import os
import numpy as np
import hashlib
import pyopencl as cl # install with `PYOPENCL_CL_PRETEND_VERSION=2.0 pip install pyopencl`
from openpilot . system . hardware import PC , TICI
from openpilot . common . basedir import BASEDIR
from openpilot . common . transformations . camera import DEVICE_CAMERAS
from openpilot . system . camerad . snapshot . snapshot import yuv_to_rgb
from openpilot . tools . lib . logreader import LogReader
# TODO: check all sensors
TEST_ROUTE = " 8345e3b82948d454|2022-05-04--13-45-33/0 "
cam = DEVICE_CAMERAS [ ( " tici " , " ar0231 " ) ]
FRAME_WIDTH , FRAME_HEIGHT = ( cam . dcam . width , cam . dcam . height )
FRAME_STRIDE = FRAME_WIDTH * 12 / / 8 + 4
UV_WIDTH = FRAME_WIDTH / / 2
UV_HEIGHT = FRAME_HEIGHT / / 2
UV_SIZE = UV_WIDTH * UV_HEIGHT
def init_kernels ( frame_offset = 0 ) :
ctx = cl . create_some_context ( interactive = False )
with open ( os . path . join ( BASEDIR , ' system/camerad/cameras/process_raw.cl ' ) ) as f :
build_args = f ' -cl-fast-relaxed-math -cl-denorms-are-zero -cl-single-precision-constant -I { BASEDIR } /system/camerad/sensors ' + \
f ' -DFRAME_WIDTH= { FRAME_WIDTH } -DFRAME_HEIGHT= { FRAME_WIDTH } -DFRAME_STRIDE= { FRAME_STRIDE } -DFRAME_OFFSET= { frame_offset } ' + \
f ' -DRGB_WIDTH= { FRAME_WIDTH } -DRGB_HEIGHT= { FRAME_HEIGHT } -DYUV_STRIDE= { FRAME_WIDTH } -DUV_OFFSET= { FRAME_WIDTH * FRAME_HEIGHT } ' + \
' -DSENSOR_ID=1 -DVIGNETTING=0 '
if PC :
build_args + = ' -DHALF_AS_FLOAT=1 -cl-std=CL2.0 '
imgproc_prg = cl . Program ( ctx , f . read ( ) ) . build ( options = build_args )
return ctx , imgproc_prg
def proc_frame ( ctx , imgproc_prg , data , rgb = False ) :
q = cl . CommandQueue ( ctx )
yuv_buff = np . empty ( FRAME_WIDTH * FRAME_HEIGHT + UV_SIZE * 2 , dtype = np . uint8 )
cam_g = cl . Buffer ( ctx , cl . mem_flags . READ_ONLY | cl . mem_flags . COPY_HOST_PTR , hostbuf = data )
yuv_g = cl . Buffer ( ctx , cl . mem_flags . WRITE_ONLY , FRAME_WIDTH * FRAME_HEIGHT + UV_SIZE * 2 )
krn = imgproc_prg . process_raw
krn . set_scalar_arg_dtypes ( [ None , None , np . int32 ] )
local_worksize = ( 20 , 20 ) if TICI else ( 4 , 4 )
ev1 = krn ( q , ( FRAME_WIDTH / / 2 , FRAME_HEIGHT / / 2 ) , local_worksize , cam_g , yuv_g , 1 )
cl . enqueue_copy ( q , yuv_buff , yuv_g , wait_for = [ ev1 ] ) . wait ( )
cl . enqueue_barrier ( q )
y = yuv_buff [ : FRAME_WIDTH * FRAME_HEIGHT ] . reshape ( ( FRAME_HEIGHT , FRAME_WIDTH ) )
u = yuv_buff [ FRAME_WIDTH * FRAME_HEIGHT : : 2 ] . reshape ( ( UV_HEIGHT , UV_WIDTH ) )
v = yuv_buff [ FRAME_WIDTH * FRAME_HEIGHT + 1 : : 2 ] . reshape ( ( UV_HEIGHT , UV_WIDTH ) )
if rgb :
return yuv_to_rgb ( y , u , v )
else :
return y , u , v
def imgproc_replay ( lr ) :
ctx , imgproc_prg = init_kernels ( )
frames = [ ]
for m in lr :
if m . which ( ) == ' roadCameraState ' :
cs = m . roadCameraState
if cs . image :
data = np . frombuffer ( cs . image , dtype = np . uint8 )
img = proc_frame ( ctx , imgproc_prg , data )
frames . append ( img )
return frames
if __name__ == " __main__ " :
# load logs
lr = list ( LogReader ( TEST_ROUTE ) )
# run replay
out_frames = imgproc_replay ( lr )
all_pix = np . concatenate ( [ np . concatenate ( [ d . flatten ( ) for d in f ] ) for f in out_frames ] )
pix_hash = hashlib . sha1 ( all_pix ) . hexdigest ( )
with open ( ' imgproc_replay_ref_hash ' ) as f :
ref_hash = f . read ( )
if pix_hash != ref_hash :
print ( " result changed! please check kernel " )
print ( f " ref: { ref_hash } " )
print ( f " new: { pix_hash } " )
else :
print ( " test passed " )