@@ -59,14 +59,14 @@ class ModelState:
  def __init__(self, context: CLContext):
    self.frames = {'input_imgs': DrivingModelFrame(context), 'big_input_imgs': DrivingModelFrame(context)}
    self.prev_desire = np.zeros(ModelConstants.DESIRE_LEN, dtype=np.float32)
    self.full_features_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32)
    self.desire_20Hz = np.zeros((ModelConstants.FULL_HISTORY_BUFFER_LEN + 1, ModelConstants.DESIRE_LEN), dtype=np.float32)
    # img buffers are managed in openCL transform code
    self.numpy_inputs = {
      'desire': np.zeros((1, (ModelConstants.HISTORY_BUFFER_LEN + 1), ModelConstants.DESIRE_LEN), dtype=np.float32),
      'desire': np.zeros((1, (ModelConstants.FULL_HISTORY_BUFFER_LEN + 1), ModelConstants.DESIRE_LEN), dtype=np.float32),
      'traffic_convention': np.zeros((1, ModelConstants.TRAFFIC_CONVENTION_LEN), dtype=np.float32),
      'features_buffer': np.zeros((1, ModelConstants.HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32),
      'lateral_control_params': np.zeros((1, ModelConstants.LATERAL_CONTROL_PARAMS_LEN), dtype=np.float32),
      'prev_desired_curv': np.zeros((1, (ModelConstants.FULL_HISTORY_BUFFER_LEN + 1), ModelConstants.PREV_DESIRED_CURV_LEN), dtype=np.float32),
      'features_buffer': np.zeros((1, ModelConstants.FULL_HISTORY_BUFFER_LEN, ModelConstants.FEATURE_LEN), dtype=np.float32),
    }
    with open(METADATA_PATH, 'rb') as f:
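# Reviewer note, not part of the patch: a minimal sketch of how the two history lengths relate,
# assuming FULL_HISTORY_BUFFER_LEN = 99 (20Hz history) and HISTORY_BUFFER_LEN = 24 (5Hz history),
# values derived from the reshape((1, 25, 4, -1)) and np.arange(-4, -100, -4) seen below.
# The patch switches 'desire' and 'features_buffer' to the full 20Hz length, so the separate
# *_20Hz staging buffers are no longer needed.
FULL_HISTORY_BUFFER_LEN = 99  # assumed value
HISTORY_BUFFER_LEN = 24       # assumed value
assert (FULL_HISTORY_BUFFER_LEN + 1) // 4 == HISTORY_BUFFER_LEN + 1  # 100 frames at 20Hz ~ 25 frames at 5Hz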
@@ -98,11 +98,11 @@ class ModelState:
    new_desire = np.where(inputs['desire'] - self.prev_desire > .99, inputs['desire'], 0)
    self.prev_desire[:] = inputs['desire']
    self.desire_20Hz[:-1] = self.desire_20Hz[1:]
    self.desire_20Hz[-1] = new_desire
    self.numpy_inputs['desire'][:] = self.desire_20Hz.reshape((1, 25, 4, -1)).max(axis=2)
    self.numpy_inputs['desire'][0, :-1] = self.numpy_inputs['desire'][0, 1:]
    self.numpy_inputs['desire'][0, -1] = new_desire
    self.numpy_inputs['traffic_convention'][:] = inputs['traffic_convention']
    self.numpy_inputs['lateral_control_params'][:] = inputs['lateral_control_params']
    imgs_cl = {'input_imgs': self.frames['input_imgs'].prepare(buf, transform.flatten()),
               'big_input_imgs': self.frames['big_input_imgs'].prepare(wbuf, transform_wide.flatten())}
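# Reviewer note, not part of the patch: a standalone sketch of the 'desire' update above, with an
# assumed DESIRE_LEN = 8. np.where keeps only rising edges (a bit that was 0 last frame and is 1 now),
# so a single pulse is recorded per new command, and the history is rolled by copying: shift every
# slot left by one, append the newest value at the end.
import numpy as np

DESIRE_LEN = 8  # assumed value
prev = np.zeros(DESIRE_LEN, dtype=np.float32)
cur = np.zeros(DESIRE_LEN, dtype=np.float32)
cur[3] = 1.0                                                    # some desire bit raised this frame
pulse = np.where(cur - prev > .99, cur, 0)                      # nonzero only on the frame the bit first flips on
desire_buf = np.zeros((1, 100, DESIRE_LEN), dtype=np.float32)   # (1, FULL_HISTORY_BUFFER_LEN + 1, DESIRE_LEN), assumed 100
desire_buf[0, :-1] = desire_buf[0, 1:]                          # drop the oldest entry
desire_buf[0, -1] = pulse                                       # append the newest pulse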
@@ -113,7 +113,7 @@ class ModelState:
        self.tensor_inputs[key] = qcom_tensor_from_opencl_address(imgs_cl[key].mem_address, self.input_shapes[key], dtype=dtypes.uint8)
    else:
      for key in imgs_cl:
        self.numpy_inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key])
        self.numpy_inputs[key] = self.frames[key].buffer_from_cl(imgs_cl[key]).reshape(self.input_shapes[key]).astype(dtype=np.float32)
    if prepare_only:
      return None
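# Reviewer note, not part of the patch: on the non-QCOM path the frame read back from OpenCL is a
# uint8 buffer, and the numpy-fed runner presumably expects float32 like the other inputs, hence the
# added .astype(dtype=np.float32); the QCOM path keeps uint8 and wraps the OpenCL memory directly
# via qcom_tensor_from_opencl_address.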
@@ -125,11 +125,13 @@ class ModelState:
    outputs = self.parser.parse_outputs(self.slice_outputs(self.output))
    self.full_features_20Hz[:-1] = self.full_features_20Hz[1:]
    self.full_features_20Hz[-1] = outputs['hidden_state'][0, :]
    self.numpy_inputs['features_buffer'][0, :-1] = self.numpy_inputs['features_buffer'][0, 1:]
    self.numpy_inputs['features_buffer'][0, -1] = outputs['hidden_state'][0, :]
    idxs = np.arange(-4, -100, -4)[::-1]
    self.numpy_inputs['features_buffer'][:] = self.full_features_20Hz[idxs]
    # TODO model only uses last value now
    self.numpy_inputs['prev_desired_curv'][0, :-1] = self.numpy_inputs['prev_desired_curv'][0, 1:]
    self.numpy_inputs['prev_desired_curv'][0, -1, :] = outputs['desired_curvature'][0, :]
    return outputs
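# Reviewer note, not part of the patch: what the removed downsampling did, for reference.
# np.arange(-4, -100, -4)[::-1] selects every 4th row of the 20Hz feature history (24 rows, i.e. 5Hz,
# oldest first); the new code drops that selection and rolls the full 20Hz features_buffer in place,
# using the same shift-and-append pattern as 'desire' above.
import numpy as np
idxs = np.arange(-4, -100, -4)[::-1]
assert len(idxs) == 24 and idxs[0] == -96 and idxs[-1] == -4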
@@ -240,6 +242,7 @@ def main(demo=False):
    is_rhd = sm["driverMonitoringState"].isRHD
    frame_id = sm["roadCameraState"].frameId
    v_ego = max(sm["carState"].vEgo, 0.)
    lateral_control_params = np.array([v_ego, steer_delay], dtype=np.float32)
    if sm.updated["liveCalibration"] and sm.seen['roadCameraState'] and sm.seen['deviceState']:
      device_from_calib_euler = np.array(sm["liveCalibration"].rpyCalib, dtype=np.float32)
      dc = DEVICE_CAMERAS[(str(sm['deviceState'].deviceType), str(sm['roadCameraState'].sensor))]
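# Reviewer note, not part of the patch: lateral_control_params packs the two scalars the model takes
# for lateral control, the non-negative ego speed and (presumably) the steering actuator delay,
# which is consistent with LATERAL_CONTROL_PARAMS_LEN == 2 (assumed).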
@@ -270,6 +273,7 @@ def main(demo=False):
    inputs: dict[str, np.ndarray] = {
      'desire': vec_desire,
      'traffic_convention': traffic_convention,
      'lateral_control_params': lateral_control_params,
    }
    mt1 = time.perf_counter()