# distutils: language = c++
# cython: c_string_encoding=ascii, language_level=3
import numpy as np
cimport numpy as cnp
from libc.string cimport memcpy
from libc.stdint cimport uintptr_t
from msgq.visionipc.visionipc cimport cl_mem
from msgq.visionipc.visionipc_pyx cimport VisionBuf , CLContext as BaseCLContext
from .commonmodel cimport CL_DEVICE_TYPE_DEFAULT , cl_get_device_id , cl_create_context
from .commonmodel cimport mat3 , ModelFrame as cppModelFrame , DrivingModelFrame as cppDrivingModelFrame , MonitoringModelFrame as cppMonitoringModelFrame
cdef class CLContext ( BaseCLContext ) :
def __cinit__ ( self ) :
self . device_id = cl_get_device_id ( CL_DEVICE_TYPE_DEFAULT )
self . context = cl_create_context ( self . device_id )
cdef class CLMem :
@staticmethod
cdef create ( void * cmem ) :
mem = CLMem ( )
mem . mem = < cl_mem * > cmem
return mem
@property
def mem_address ( self ) :
return < uintptr_t > ( self . mem )
def cl_from_visionbuf ( VisionBuf buf ) :
return CLMem . create ( < void * > & buf . buf . buf_cl )
cdef class ModelFrame :
cdef cppModelFrame * frame
cdef int buf_size
def __dealloc__ ( self ) :
del self . frame
def prepare ( self , VisionBuf buf , float [ : ] projection , CLMem output ) :
cdef mat3 cprojection
memcpy ( cprojection . v , & projection [ 0 ] , 9 * sizeof ( float ) )
cdef unsigned char * data
if output is None :
data = self . frame . prepare ( buf . buf . buf_cl , buf . width , buf . height , buf . stride , buf . uv_offset , cprojection , NULL )
else :
data = self . frame . prepare ( buf . buf . buf_cl , buf . width , buf . height , buf . stride , buf . uv_offset , cprojection , output . mem )
if not data :
return None
return np . asarray ( < cnp . uint8_t [ : self . buf_size ] > data )
# return CLMem.create(data)
# def buffer_from_cl(self, CLMem in_frames):
# cdef unsigned char * data2
# data2 = self.frame.buffer_from_cl(in_frames.mem, self.buf_size)
# return np.asarray(<cnp.uint8_t[:self.buf_size]> data2)
cdef class DrivingModelFrame ( ModelFrame ) :
cdef cppDrivingModelFrame * _frame
def __cinit__ ( self , CLContext context ) :
self . _frame = new cppDrivingModelFrame ( context . device_id , context . context )
self . frame = < cppModelFrame * > ( self . _frame )
self . buf_size = self . _frame . buf_size
cdef class MonitoringModelFrame ( ModelFrame ) :
cdef cppMonitoringModelFrame * _frame
def __cinit__ ( self , CLContext context ) :
self . _frame = new cppMonitoringModelFrame ( context . device_id , context . context )
self . frame = < cppModelFrame * > ( self . _frame )
self . buf_size = self . _frame . buf_size