#!/usr/bin/env python3
"""cffi wrapper around ``libvisionipc.so`` for reading camera frames.

The shared library is loaded from the directory containing this file.
"""
import os

from cffi import FFI
import numpy as np

# Directory of this file; libvisionipc.so is looked up next to it.
gf_dir = os.path.dirname(os.path.abspath(__file__))

ffi = FFI()
ffi.cdef("""
typedef enum VisionStreamType {
  VISION_STREAM_RGB_BACK,
  VISION_STREAM_RGB_FRONT,
  VISION_STREAM_YUV,
  VISION_STREAM_YUV_FRONT,
  VISION_STREAM_MAX,
} VisionStreamType;

typedef struct VisionUIInfo {
  int big_box_x, big_box_y;
  int big_box_width, big_box_height;
  int transformed_width, transformed_height;

  int front_box_x, front_box_y;
  int front_box_width, front_box_height;
} VisionUIInfo;

typedef struct VisionStreamBufs {
  VisionStreamType type;

  int width, height, stride;
  size_t buf_len;

  union {
    VisionUIInfo ui_info;
  } buf_info;
} VisionStreamBufs;

typedef struct VIPCBuf {
  int fd;
  size_t len;
  void* addr;
} VIPCBuf;

typedef struct VIPCBufExtra {
  // only for yuv
  uint32_t frame_id;
  uint64_t timestamp_eof;
} VIPCBufExtra;

typedef struct VisionStream {
  int ipc_fd;
  int last_idx;
  int last_type;
  int num_bufs;
  VisionStreamBufs bufs_info;
  VIPCBuf *bufs;
} VisionStream;

int visionstream_init(VisionStream *s, VisionStreamType type, bool tbuffer, VisionStreamBufs *out_bufs_info);
VIPCBuf* visionstream_get(VisionStream *s, VIPCBufExtra *out_extra);
void visionstream_destroy(VisionStream *s);
""")


class VisionIPCError(Exception):
    """Raised when the native vision stream cannot be opened or read."""


class VisionIPC():
    """Client for one RGB vision stream exposed by ``libvisionipc.so``.

    Attributes:
        clib: The dlopen'ed library handle.
        s: cffi-owned ``VisionStream*`` passed to every native call.
        buf_info: cffi-owned ``VisionStreamBufs*`` filled by
            ``visionstream_init``; ``get`` reads width/height/stride from it.
    """

    def __init__(self, front=False):
        """Open the stream.

        Args:
            front: If true, request ``VISION_STREAM_RGB_FRONT``; otherwise
                ``VISION_STREAM_RGB_BACK``.

        Raises:
            VisionIPCError: If ``visionstream_init`` returns non-zero.
        """
        # Set before anything can raise: __del__ runs even when __init__
        # fails part-way, and must not touch a stream that was never
        # opened or has already been destroyed.
        self._stream_open = False

        self.clib = ffi.dlopen(os.path.join(gf_dir, "libvisionipc.so"))

        self.s = ffi.new("VisionStream*")
        self.buf_info = ffi.new("VisionStreamBufs*")

        if front:
            stream_type = self.clib.VISION_STREAM_RGB_FRONT
        else:
            stream_type = self.clib.VISION_STREAM_RGB_BACK

        err = self.clib.visionstream_init(self.s, stream_type, True, self.buf_info)
        if err != 0:
            # Same cleanup call the failure path has always made; the flag
            # stays False so __del__ will not destroy the stream again.
            self.clib.visionstream_destroy(self.s)
            raise VisionIPCError("visionstream_init failed with code {}".format(err))

        self._stream_open = True

    def _destroy(self):
        """Destroy the native stream at most once."""
        if getattr(self, "_stream_open", False):
            self._stream_open = False
            self.clib.visionstream_destroy(self.s)

    def __del__(self):
        self._destroy()

    def get(self):
        """Fetch the next frame as a ``uint8`` numpy array.

        The raw buffer is viewed as rows of ``stride // 3`` three-byte
        pixels, cropped to ``height`` x ``width``, and the last axis is
        reordered with ``[2, 1, 0]`` (presumably BGR in memory to RGB --
        confirm against the producer). The index-list selection makes numpy
        return a copy rather than a view of the native buffer.

        Returns:
            Array of shape ``(height, width, 3)``, assuming the native
            buffer holds at least ``height`` rows.

        Raises:
            VisionIPCError: If ``visionstream_get`` returns a NULL pointer.
        """
        buf = self.clib.visionstream_get(self.s, ffi.NULL)
        if buf == ffi.NULL:
            # Reading buf.addr / buf.len through a NULL pointer would be an
            # invalid memory access, so fail with a Python exception instead.
            raise VisionIPCError("visionstream_get returned NULL")

        pbuf = ffi.buffer(buf.addr, buf.len)
        pixels_per_row = self.buf_info.stride // 3
        frame = np.frombuffer(pbuf, dtype=np.uint8).reshape((-1, pixels_per_row, 3))
        return frame[:self.buf_info.height, :self.buf_info.width, [2, 1, 0]]