You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							135 lines
						
					
					
						
							4.3 KiB
						
					
					
				
			
		
		
	
	
							135 lines
						
					
					
						
							4.3 KiB
						
					
					
				| import os
 | |
| import subprocess
 | |
| from cffi import FFI
 | |
| from common.basedir import BASEDIR
 | |
| 
 | |
| # Initialize visiontest. Ignore output.
 | |
| _visiond_dir = os.path.dirname(os.path.abspath(__file__))
 | |
| _libvisiontest = "libvisiontest.so"
 | |
| try:  # bacause this crashes somtimes when running pipeline
 | |
|   subprocess.check_output(["make", "-C", _visiond_dir, "-f",
 | |
|                          os.path.join(_visiond_dir, "visiontest.mk"),
 | |
|                          _libvisiontest])
 | |
| except:
 | |
|   pass
 | |
| 
 | |
| class VisionTest():
 | |
|   """A version of the vision model that can be run on a desktop.
 | |
| 
 | |
|      WARNING: This class is not thread safe. VisionTest objects cannot be
 | |
|               created or used on multiple threads simultaneously.
 | |
|   """
 | |
| 
 | |
|   ffi = FFI()
 | |
|   ffi.cdef("""
 | |
|   typedef unsigned char uint8_t;
 | |
| 
 | |
|   struct VisionTest;
 | |
|   typedef struct VisionTest VisionTest;
 | |
| 
 | |
|   VisionTest* visiontest_create(int temporal_model, int disable_model,
 | |
|                                 int input_width, int input_height,
 | |
|                                 int model_input_width, int model_input_height);
 | |
|   void visiontest_destroy(VisionTest* visiontest);
 | |
| 
 | |
|   void visiontest_transform(VisionTest* vt, const uint8_t* yuv_data,
 | |
|                             uint8_t* out_y, uint8_t* out_u, uint8_t* out_v,
 | |
|                             const float* transform);
 | |
|   """)
 | |
| 
 | |
|   clib = ffi.dlopen(os.path.join(_visiond_dir, _libvisiontest))
 | |
| 
 | |
|   def __init__(self, input_size, model_input_size, model):
 | |
|     """Create a wrapper around visiond for off-device python code.
 | |
| 
 | |
|        Inputs:
 | |
|         input_size: The size of YUV images passed to transform.
 | |
|         model_input_size: The size of YUV images passed to the model.
 | |
|         model: The name of the model to use. "temporal", "yuv", or None to disable the
 | |
|                model (used to disable OpenCL).
 | |
|     """
 | |
|     self._input_size = input_size
 | |
|     self._model_input_size = model_input_size
 | |
| 
 | |
|     if model is None:
 | |
|       disable_model = 1
 | |
|       temporal_model = 0
 | |
|     elif model == "yuv":
 | |
|       disable_model = 0
 | |
|       temporal_model = 0
 | |
|     elif model == "temporal":
 | |
|       disable_model = 0
 | |
|       temporal_model = 1
 | |
|     else:
 | |
|       raise ValueError("Bad model name: {}".format(model))
 | |
| 
 | |
|     prevdir = os.getcwd()
 | |
|     os.chdir(_visiond_dir) # tmp hack to find kernels
 | |
|     os.environ['BASEDIR'] = BASEDIR
 | |
|     self._visiontest_c = self.clib.visiontest_create(
 | |
|       temporal_model, disable_model, self._input_size[0], self._input_size[1],
 | |
|       self._model_input_size[0], self._model_input_size[1])
 | |
|     os.chdir(prevdir)
 | |
| 
 | |
|   @property
 | |
|   def input_size(self):
 | |
|     return self._input_size
 | |
| 
 | |
|   @property
 | |
|   def model_input_size(self):
 | |
|     return self._model_input_size
 | |
| 
 | |
|   def transform(self, yuv_data, transform):
 | |
|     y_len = self.model_input_size[0] * self.model_input_size[1]
 | |
|     t_y_ptr = bytearray(y_len)
 | |
|     t_u_ptr = bytearray(y_len // 4)
 | |
|     t_v_ptr = bytearray(y_len // 4)
 | |
| 
 | |
|     self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
 | |
|                                  transform)
 | |
| 
 | |
|     return t_y_ptr, t_u_ptr, t_v_ptr
 | |
| 
 | |
|   def transform_contiguous(self, yuv_data, transform):
 | |
|     y_ol = self.model_input_size[0] * self.model_input_size[1]
 | |
|     uv_ol = y_ol // 4
 | |
|     result = bytearray(y_ol * 3 // 2)
 | |
|     result_view = memoryview(result)
 | |
|     t_y_ptr = result_view[:y_ol]
 | |
|     t_u_ptr = result_view[y_ol:y_ol + uv_ol]
 | |
|     t_v_ptr = result_view[y_ol + uv_ol:]
 | |
| 
 | |
|     self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
 | |
|                                  transform)
 | |
|     return result
 | |
| 
 | |
| 
 | |
|   def transform_output_buffer(self, yuv_data, y_out, u_out, v_out,
 | |
|                               transform):
 | |
|     assert len(yuv_data) == self.input_size[0] * self.input_size[1] * 3/2
 | |
| 
 | |
|     cast = self.ffi.cast
 | |
|     from_buffer = self.ffi.from_buffer
 | |
|     yuv_ptr = cast("unsigned char*", from_buffer(yuv_data))
 | |
|     transform_ptr = self.ffi.new("float[]", transform)
 | |
| 
 | |
|     y_out_ptr = cast("unsigned char*", from_buffer(y_out))
 | |
|     u_out_ptr = cast("unsigned char*", from_buffer(u_out))
 | |
|     v_out_ptr = cast("unsigned char*", from_buffer(v_out))
 | |
| 
 | |
|     self.clib.visiontest_transform(self._visiontest_c, yuv_ptr, y_out_ptr,
 | |
|                                    u_out_ptr, v_out_ptr, transform_ptr)
 | |
| 
 | |
|   def close(self):
 | |
|     self.clib.visiontest_destroy(self._visiontest_c)
 | |
|     self._visiontest_c = None
 | |
| 
 | |
|   def __enter__(self):
 | |
|     return self
 | |
| 
 | |
|   def __exit__(self, type, value, traceback):
 | |
|     self.close()
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   VisionTest((560, 304), (320, 160), "temporal")
 | |
| 
 |