|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | import os
 | 
					
						
							|  |  |  | from cffi import FFI
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | gf_dir = os.path.dirname(os.path.abspath(__file__))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | ffi = FFI()
 | 
					
						
							|  |  |  | ffi.cdef("""
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef enum VisionStreamType {
 | 
					
						
							|  |  |  |   VISION_STREAM_RGB_BACK,
 | 
					
						
							|  |  |  |   VISION_STREAM_RGB_FRONT,
 | 
					
						
							|  |  |  |   VISION_STREAM_YUV,
 | 
					
						
							|  |  |  |   VISION_STREAM_YUV_FRONT,
 | 
					
						
							|  |  |  |   VISION_STREAM_MAX,
 | 
					
						
							|  |  |  | } VisionStreamType;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef struct VisionUIInfo {
 | 
					
						
							|  |  |  |   int big_box_x, big_box_y;
 | 
					
						
							|  |  |  |   int big_box_width, big_box_height;
 | 
					
						
							|  |  |  |   int transformed_width, transformed_height;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   int front_box_x, front_box_y;
 | 
					
						
							|  |  |  |   int front_box_width, front_box_height;
 | 
					
						
							|  |  |  | } VisionUIInfo;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef struct VisionStreamBufs {
 | 
					
						
							|  |  |  |   VisionStreamType type;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   int width, height, stride;
 | 
					
						
							|  |  |  |   size_t buf_len;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   union {
 | 
					
						
							|  |  |  |     VisionUIInfo ui_info;
 | 
					
						
							|  |  |  |   } buf_info;
 | 
					
						
							|  |  |  | } VisionStreamBufs;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef struct VIPCBuf {
 | 
					
						
							|  |  |  |   int fd;
 | 
					
						
							|  |  |  |   size_t len;
 | 
					
						
							|  |  |  |   void* addr;
 | 
					
						
							|  |  |  | } VIPCBuf;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef struct VIPCBufExtra {
 | 
					
						
							|  |  |  |   // only for yuv
 | 
					
						
							|  |  |  |   uint32_t frame_id;
 | 
					
						
							|  |  |  |   uint64_t timestamp_eof;
 | 
					
						
							|  |  |  | } VIPCBufExtra;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | typedef struct VisionStream {
 | 
					
						
							|  |  |  |   int ipc_fd;
 | 
					
						
							|  |  |  |   int last_idx;
 | 
					
						
							|  |  |  |   int last_type;
 | 
					
						
							|  |  |  |   int num_bufs;
 | 
					
						
							|  |  |  |   VisionStreamBufs bufs_info;
 | 
					
						
							|  |  |  |   VIPCBuf *bufs;
 | 
					
						
							|  |  |  | } VisionStream;
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | int visionstream_init(VisionStream *s, VisionStreamType type, bool tbuffer, VisionStreamBufs *out_bufs_info);
 | 
					
						
							|  |  |  | VIPCBuf* visionstream_get(VisionStream *s, VIPCBufExtra *out_extra);
 | 
					
						
							|  |  |  | void visionstream_destroy(VisionStream *s);
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | )
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class VisionIPCError(Exception):
 | 
					
						
							|  |  |  |   pass
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class VisionIPC():
 | 
					
						
							|  |  |  |   def __init__(self, front=False):
 | 
					
						
							|  |  |  |     self.clib = ffi.dlopen(os.path.join(gf_dir, "libvisionipc.so"))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.s = ffi.new("VisionStream*")
 | 
					
						
							|  |  |  |     self.buf_info = ffi.new("VisionStreamBufs*")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     err = self.clib.visionstream_init(self.s, self.clib.VISION_STREAM_RGB_FRONT if front else self.clib.VISION_STREAM_RGB_BACK, True, self.buf_info)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if err != 0:
 | 
					
						
							|  |  |  |       self.clib.visionstream_destroy(self.s)
 | 
					
						
							|  |  |  |       raise VisionIPCError
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def __del__(self):
 | 
					
						
							|  |  |  |     self.clib.visionstream_destroy(self.s)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get(self):
 | 
					
						
							|  |  |  |     buf = self.clib.visionstream_get(self.s, ffi.NULL)
 | 
					
						
							|  |  |  |     pbuf = ffi.buffer(buf.addr, buf.len)
 | 
					
						
							|  |  |  |     ret = np.frombuffer(pbuf, dtype=np.uint8).reshape((-1, self.buf_info.stride//3, 3))
 | 
					
						
							|  |  |  |     return ret[:self.buf_info.height, :self.buf_info.width, [2, 1, 0]]
 |