@ -24,6 +24,8 @@ MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS
MAX_VEL_ANGLE_STD = np . radians ( 0.25 )
MAX_YAW_RATE_FILTER = np . radians ( 2 ) # per second
MAX_HEIGHT_STD = np . exp ( - 3.5 )
# This is at model frequency, blocks needed for efficiency
SMOOTH_CYCLES = 10
BLOCK_SIZE = 100
@ -32,6 +34,7 @@ INPUTS_WANTED = 50 # We want a little bit more than we need for stability
MAX_ALLOWED_SPREAD = np . radians ( 2 )
RPY_INIT = np . array ( [ 0.0 , 0.0 , 0.0 ] )
WIDE_FROM_DEVICE_EULER_INIT = np . array ( [ 0.0 , 0.0 , 0.0 ] )
HEIGHT_INIT = np . array ( [ 1.22 ] )
# These values are needed to accommodate the model frame in the narrow cam of the C3
PITCH_LIMITS = np . array ( [ - 0.09074112085129739 , 0.17 ] )
@ -50,6 +53,8 @@ def sanity_clip(rpy: np.ndarray) -> np.ndarray:
np . clip ( rpy [ 1 ] , PITCH_LIMITS [ 0 ] - .005 , PITCH_LIMITS [ 1 ] + .005 ) ,
np . clip ( rpy [ 2 ] , YAW_LIMITS [ 0 ] - .005 , YAW_LIMITS [ 1 ] + .005 ) ] )
def moving_avg_with_linear_decay ( prev_mean : np . ndarray , new_val : np . ndarray , idx : int , block_size : float ) - > np . ndarray :
return ( idx * prev_mean + ( block_size - idx ) * new_val ) / block_size
class Calibrator :
def __init__ ( self , param_put : bool = False ) :
@ -62,6 +67,7 @@ class Calibrator:
calibration_params = params . get ( " CalibrationParams " )
rpy_init = RPY_INIT
wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
height = HEIGHT_INIT
valid_blocks = 0
self . cal_status = log . LiveCalibrationData . Status . uncalibrated
@ -71,21 +77,28 @@ class Calibrator:
rpy_init = np . array ( msg . liveCalibration . rpyCalib )
valid_blocks = msg . liveCalibration . validBlocks
wide_from_device_euler = np . array ( msg . liveCalibration . wideFromDeviceEuler )
height = np . array ( msg . liveCalibration . height )
except Exception :
cloudlog . exception ( " Error reading cached CalibrationParams " )
self . reset ( rpy_init , valid_blocks , wide_from_device_euler )
self . reset ( rpy_init , valid_blocks , wide_from_device_euler , height )
self . update_status ( )
def reset ( self , rpy_init : np . ndarray = RPY_INIT ,
valid_blocks : int = 0 ,
wide_from_device_euler_init : np . ndarray = WIDE_FROM_DEVICE_EULER_INIT ,
height_init : np . ndarray = HEIGHT_INIT ,
smooth_from : Optional [ np . ndarray ] = None ) - > None :
if not np . isfinite ( rpy_init ) . all ( ) :
self . rpy = RPY_INIT . copy ( )
else :
self . rpy = rpy_init . copy ( )
if not np . isfinite ( height_init ) . all ( ) or len ( height_init ) != 1 :
self . height = HEIGHT_INIT . copy ( )
else :
self . height = height_init . copy ( )
if not np . isfinite ( wide_from_device_euler_init ) . all ( ) or len ( wide_from_device_euler_init ) != 3 :
self . wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT . copy ( )
else :
@ -98,6 +111,7 @@ class Calibrator:
self . rpys = np . tile ( self . rpy , ( INPUTS_WANTED , 1 ) )
self . wide_from_device_eulers = np . tile ( self . wide_from_device_euler , ( INPUTS_WANTED , 1 ) )
self . heights = np . tile ( self . height , ( INPUTS_WANTED , 1 ) )
self . idx = 0
self . block_idx = 0
@ -120,6 +134,7 @@ class Calibrator:
valid_idxs = self . get_valid_idxs ( )
if valid_idxs :
self . wide_from_device_euler = np . mean ( self . wide_from_device_eulers [ valid_idxs ] , axis = 0 )
self . height = np . mean ( self . heights [ valid_idxs ] , axis = 0 )
rpys = self . rpys [ valid_idxs ]
self . rpy = np . mean ( rpys , axis = 0 )
max_rpy_calib = np . array ( np . max ( rpys , axis = 0 ) )
@ -140,6 +155,7 @@ class Calibrator:
# If spread is too high, assume mounting was changed and reset to last block.
# Make the transition smooth. Abrupt transitions are not good for feedback loop through supercombo model.
# TODO: add height spread check with smooth transition too
if max ( self . calib_spread ) > MAX_ALLOWED_SPREAD and self . cal_status == log . LiveCalibrationData . Status . calibrated :
self . reset ( self . rpys [ self . block_idx - 1 ] , valid_blocks = 1 , smooth_from = self . rpy )
self . cal_status = log . LiveCalibrationData . Status . recalibrating
@ -160,13 +176,21 @@ class Calibrator:
def handle_cam_odom ( self , trans : List [ float ] ,
rot : List [ float ] ,
wide_from_device_euler : List [ float ] ,
trans_std : List [ float ] ) - > Optional [ np . ndarray ] :
trans_std : List [ float ] ,
road_transform_trans : List [ float ] ,
road_transform_trans_std : List [ float ] ) - > Optional [ np . ndarray ] :
self . old_rpy_weight = max ( 0.0 , self . old_rpy_weight - 1 / SMOOTH_CYCLES )
straight_and_fast = ( ( self . v_ego > MIN_SPEED_FILTER ) and ( trans [ 0 ] > MIN_SPEED_FILTER ) and ( abs ( rot [ 2 ] ) < MAX_YAW_RATE_FILTER ) )
angle_std_threshold = MAX_VEL_ANGLE_STD
certain_if_calib = ( ( np . arctan2 ( trans_std [ 1 ] , trans [ 0 ] ) < angle_std_threshold ) or
( self . valid_blocks < INPUTS_NEEDED ) )
height_std_threshold = MAX_HEIGHT_STD
rpy_certain = np . arctan2 ( trans_std [ 1 ] , trans [ 0 ] ) < angle_std_threshold
if len ( road_transform_trans_std ) == 3 :
height_certain = road_transform_trans_std [ 2 ] < height_std_threshold
else :
height_certain = True
certain_if_calib = ( rpy_certain and height_certain ) or ( self . valid_blocks < INPUTS_NEEDED )
if not ( straight_and_fast and certain_if_calib ) :
return None
@ -180,10 +204,16 @@ class Calibrator:
new_wide_from_device_euler = np . array ( wide_from_device_euler )
else :
new_wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
self . rpys [ self . block_idx ] = ( self . idx * self . rpys [ self . block_idx ] +
( BLOCK_SIZE - self . idx ) * new_rpy ) / float ( BLOCK_SIZE )
self . wide_from_device_eulers [ self . block_idx ] = ( self . idx * self . wide_from_device_eulers [ self . block_idx ] +
( BLOCK_SIZE - self . idx ) * new_wide_from_device_euler ) / float ( BLOCK_SIZE )
if ( len ( road_transform_trans ) == 3 ) :
new_height = np . array ( [ road_transform_trans [ 2 ] ] )
else :
new_height = HEIGHT_INIT
self . rpys [ self . block_idx ] = moving_avg_with_linear_decay ( self . rpys [ self . block_idx ] , new_rpy , self . idx , float ( BLOCK_SIZE ) )
self . wide_from_device_eulers [ self . block_idx ] = moving_avg_with_linear_decay ( self . wide_from_device_eulers [ self . block_idx ] , new_wide_from_device_euler , self . idx , float ( BLOCK_SIZE ) )
self . heights [ self . block_idx ] = moving_avg_with_linear_decay ( self . heights [ self . block_idx ] , new_height , self . idx , float ( BLOCK_SIZE ) )
self . idx = ( self . idx + 1 ) % BLOCK_SIZE
if self . idx == 0 :
self . block_idx + = 1
@ -206,6 +236,7 @@ class Calibrator:
liveCalibration . rpyCalib = smooth_rpy . tolist ( )
liveCalibration . rpyCalibSpread = self . calib_spread . tolist ( )
liveCalibration . wideFromDeviceEuler = self . wide_from_device_euler . tolist ( )
liveCalibration . height = self . height . tolist ( )
if self . not_car :
liveCalibration . validBlocks = INPUTS_NEEDED
@ -243,7 +274,9 @@ def calibrationd_thread(sm: Optional[messaging.SubMaster] = None, pm: Optional[m
new_rpy = calibrator . handle_cam_odom ( sm [ ' cameraOdometry ' ] . trans ,
sm [ ' cameraOdometry ' ] . rot ,
sm [ ' cameraOdometry ' ] . wideFromDeviceEuler ,
sm [ ' cameraOdometry ' ] . transStd )
sm [ ' cameraOdometry ' ] . transStd ,
sm [ ' cameraOdometry ' ] . roadTransformTrans ,
sm [ ' cameraOdometry ' ] . roadTransformTransStd )
if DEBUG and new_rpy is not None :
print ( ' got new rpy ' , new_rpy )