openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

136 lines
4.3 KiB

import os
import subprocess
from cffi import FFI
from common.basedir import BASEDIR
# Initialize visiontest. Ignore output.
_visiond_dir = os.path.dirname(os.path.abspath(__file__))
_libvisiontest = "libvisiontest.so"
try: # bacause this crashes somtimes when running pipeline
subprocess.check_output(["make", "-C", _visiond_dir, "-f",
os.path.join(_visiond_dir, "visiontest.mk"),
_libvisiontest])
except:
pass
class VisionTest():
"""A version of the vision model that can be run on a desktop.
WARNING: This class is not thread safe. VisionTest objects cannot be
created or used on multiple threads simultaneously.
"""
ffi = FFI()
ffi.cdef("""
typedef unsigned char uint8_t;
struct VisionTest;
typedef struct VisionTest VisionTest;
VisionTest* visiontest_create(int temporal_model, int disable_model,
int input_width, int input_height,
int model_input_width, int model_input_height);
void visiontest_destroy(VisionTest* visiontest);
void visiontest_transform(VisionTest* vt, const uint8_t* yuv_data,
uint8_t* out_y, uint8_t* out_u, uint8_t* out_v,
const float* transform);
""")
clib = ffi.dlopen(os.path.join(_visiond_dir, _libvisiontest))
def __init__(self, input_size, model_input_size, model):
"""Create a wrapper around visiond for off-device python code.
Inputs:
input_size: The size of YUV images passed to transform.
model_input_size: The size of YUV images passed to the model.
model: The name of the model to use. "temporal", "yuv", or None to disable the
model (used to disable OpenCL).
"""
self._input_size = input_size
self._model_input_size = model_input_size
if model is None:
disable_model = 1
temporal_model = 0
elif model == "yuv":
disable_model = 0
temporal_model = 0
elif model == "temporal":
disable_model = 0
temporal_model = 1
else:
raise ValueError("Bad model name: {}".format(model))
prevdir = os.getcwd()
os.chdir(_visiond_dir) # tmp hack to find kernels
os.environ['BASEDIR'] = BASEDIR
self._visiontest_c = self.clib.visiontest_create(
temporal_model, disable_model, self._input_size[0], self._input_size[1],
self._model_input_size[0], self._model_input_size[1])
os.chdir(prevdir)
@property
def input_size(self):
return self._input_size
@property
def model_input_size(self):
return self._model_input_size
def transform(self, yuv_data, transform):
y_len = self.model_input_size[0] * self.model_input_size[1]
t_y_ptr = bytearray(y_len)
t_u_ptr = bytearray(y_len // 4)
t_v_ptr = bytearray(y_len // 4)
self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
transform)
return t_y_ptr, t_u_ptr, t_v_ptr
def transform_contiguous(self, yuv_data, transform):
y_ol = self.model_input_size[0] * self.model_input_size[1]
uv_ol = y_ol // 4
result = bytearray(y_ol * 3 // 2)
result_view = memoryview(result)
t_y_ptr = result_view[:y_ol]
t_u_ptr = result_view[y_ol:y_ol + uv_ol]
t_v_ptr = result_view[y_ol + uv_ol:]
self.transform_output_buffer(yuv_data, t_y_ptr, t_u_ptr, t_v_ptr,
transform)
return result
def transform_output_buffer(self, yuv_data, y_out, u_out, v_out,
transform):
assert len(yuv_data) == self.input_size[0] * self.input_size[1] * 3/2
cast = self.ffi.cast
from_buffer = self.ffi.from_buffer
yuv_ptr = cast("unsigned char*", from_buffer(yuv_data))
transform_ptr = self.ffi.new("float[]", transform)
y_out_ptr = cast("unsigned char*", from_buffer(y_out))
u_out_ptr = cast("unsigned char*", from_buffer(u_out))
v_out_ptr = cast("unsigned char*", from_buffer(v_out))
self.clib.visiontest_transform(self._visiontest_c, yuv_ptr, y_out_ptr,
u_out_ptr, v_out_ptr, transform_ptr)
def close(self):
self.clib.visiontest_destroy(self._visiontest_c)
self._visiontest_c = None
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()
if __name__ == "__main__":
VisionTest((560, 304), (320, 160), "temporal")