import os
import subprocess
from cffi import FFI
from common . basedir import BASEDIR
# Locate this module's directory and (re)build the visiontest shared library
# there before it gets loaded. The build output is ignored.
_visiond_dir = os.path.dirname(os.path.abspath(__file__))
_libvisiontest = " libvisiontest.so "


def _build_libvisiontest():
    """Run make for the visiontest library on a best-effort basis.

    Invokes make in this module's directory using the visiontest.mk makefile
    with the shared library as the target. Any failure is deliberately
    ignored, because this step sometimes crashes when running the pipeline.
    """
    makefile = os.path.join(_visiond_dir, " visiontest.mk ")
    command = [" make ", " -C ", _visiond_dir, " -f ", makefile, _libvisiontest]
    try:
        subprocess.check_output(command)
    except Exception:
        # Best-effort only: a failed build must not prevent importing this module.
        pass


_build_libvisiontest()
class VisionTest:
    """A version of the vision model that can be run on a desktop.

    WARNING: This class is not thread safe. VisionTest objects cannot be
    created or used on multiple threads simultaneously.

    Each instance owns a native handle created by ``visiontest_create``.
    Release it with ``close()`` or by using the instance as a context manager.
    """

    ffi = FFI()
    # C declarations for the native entry points used by this wrapper. The
    # text is handed to cffi verbatim.
    ffi.cdef("""
typedef unsigned char uint8_t ;
struct VisionTest ;
typedef struct VisionTest VisionTest ;
VisionTest * visiontest_create ( int temporal_model , int disable_model ,
int input_width , int input_height ,
int model_input_width , int model_input_height ) ;
void visiontest_destroy ( VisionTest * visiontest ) ;
void visiontest_transform ( VisionTest * vt , const uint8_t * yuv_data ,
uint8_t * out_y , uint8_t * out_u , uint8_t * out_v ,
const float * transform ) ;
""")
    # Opened once when the class body executes; shared by every instance.
    clib = ffi.dlopen(os.path.join(_visiond_dir, _libvisiontest))

    def __init__(self, input_size, model_input_size, model):
        """Create a wrapper around visiond for off-device python code.

        Inputs:
          input_size: The size of YUV images passed to transform, indexed as
            (width, height).
          model_input_size: The size of YUV images passed to the model,
            indexed as (width, height).
          model: The name of the model to use. " temporal ", " yuv ", or None
            to disable the model (used to disable OpenCL).

        Raises:
          ValueError: If ``model`` is not one of the accepted values.
        """
        # Set first so close() is safe even if construction fails below.
        self._visiontest_c = None
        self._input_size = input_size
        self._model_input_size = model_input_size

        if model is None:
            disable_model = 1
            temporal_model = 0
        elif model == " yuv ":
            disable_model = 0
            temporal_model = 0
        elif model == " temporal ":
            disable_model = 0
            temporal_model = 1
        else:
            raise ValueError(" Bad model name: {} ".format(model))

        prevdir = os.getcwd()
        os.chdir(_visiond_dir)  # tmp hack to find kernels
        try:
            os.environ[' BASEDIR '] = BASEDIR
            self._visiontest_c = self.clib.visiontest_create(
                temporal_model, disable_model,
                self._input_size[0], self._input_size[1],
                self._model_input_size[0], self._model_input_size[1])
        finally:
            # Always restore the caller's working directory, even when the
            # native constructor raises.
            os.chdir(prevdir)

    @property
    def input_size(self):
        """The input image size given at construction."""
        return self._input_size

    @property
    def model_input_size(self):
        """The model input image size given at construction."""
        return self._model_input_size

    def transform(self, yuv_data, transform):
        """Transform ``yuv_data`` into three freshly allocated planes.

        Inputs:
          yuv_data: Buffer holding one input image; see
            transform_output_buffer for the size requirement.
          transform: Sequence of floats forwarded to the native transform.

        Returns:
          A (y, u, v) tuple of bytearrays. ``y`` has
          model_input_size[0] * model_input_size[1] bytes; ``u`` and ``v``
          each have a quarter of that.
        """
        y_len = self.model_input_size[0] * self.model_input_size[1]
        t_y_ptr = bytearray(y_len)
        t_u_ptr = bytearray(y_len // 4)
        t_v_ptr = bytearray(y_len // 4)
        self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
                                     transform)
        return t_y_ptr, t_u_ptr, t_v_ptr

    def transform_contiguous(self, yuv_data, transform):
        """Transform ``yuv_data`` into a single contiguous output buffer.

        The Y, U and V planes are written back to back into one bytearray of
        model_input_size[0] * model_input_size[1] * 3 // 2 bytes.

        Inputs:
          yuv_data: Buffer holding one input image; see
            transform_output_buffer for the size requirement.
          transform: Sequence of floats forwarded to the native transform.

        Returns:
          The bytearray containing the Y plane followed by U, then V.
        """
        y_ol = self.model_input_size[0] * self.model_input_size[1]
        uv_ol = y_ol // 4
        result = bytearray(y_ol * 3 // 2)
        # Slice through a memoryview so the native code writes straight into
        # ``result`` instead of into copies.
        result_view = memoryview(result)
        t_y_ptr = result_view[:y_ol]
        t_u_ptr = result_view[y_ol:y_ol + uv_ol]
        t_v_ptr = result_view[y_ol + uv_ol:]
        self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
                                     transform)
        return result

    def transform_output_buffer(self, yuv_data, y_out, u_out, v_out,
                                transform):
        """Run the native transform, writing into caller-supplied buffers.

        Inputs:
          yuv_data: Buffer-protocol object whose length must equal
            input_size[0] * input_size[1] * 3 // 2.
          y_out, u_out, v_out: Buffer-protocol objects that receive the output
            planes. NOTE(review): their sizes are not checked here; the other
            methods of this class pass w*h, w*h//4 and w*h//4 bytes of the
            model input size - presumably what the native code writes; confirm.
          transform: Sequence of floats copied into a C ``float[]``. Its
            expected length and meaning are defined by the native library.

        Raises:
          AssertionError: If ``yuv_data`` does not have the expected length.
        """
        # Integer division keeps the comparison in ints, matching the size
        # arithmetic used by the other methods of this class.
        expected_len = self.input_size[0] * self.input_size[1] * 3 // 2
        assert len(yuv_data) == expected_len

        cast = self.ffi.cast
        from_buffer = self.ffi.from_buffer
        yuv_ptr = cast(" unsigned char* ", from_buffer(yuv_data))
        transform_ptr = self.ffi.new(" float[] ", transform)
        y_out_ptr = cast(" unsigned char* ", from_buffer(y_out))
        u_out_ptr = cast(" unsigned char* ", from_buffer(u_out))
        v_out_ptr = cast(" unsigned char* ", from_buffer(v_out))
        self.clib.visiontest_transform(self._visiontest_c, yuv_ptr, y_out_ptr,
                                       u_out_ptr, v_out_ptr, transform_ptr)

    def close(self):
        """Destroy the native handle. Safe to call more than once."""
        handle = self._visiontest_c
        if handle is None:
            return
        # Clear the attribute before destroying so the handle can never be
        # passed to visiontest_destroy twice.
        self._visiontest_c = None
        self.clib.visiontest_destroy(handle)

    def __enter__(self):
        """Return the instance itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the instance on leaving the ``with`` block."""
        self.close()
if __name__ == " __main__ ":
    # Smoke test: construct a temporal-model instance with a 560x304 input
    # and a 320x160 model input, then release the native handle on exit
    # instead of leaking it.
    with VisionTest((560, 304), (320, 160), " temporal "):
        pass