@ -164,7 +164,7 @@ class Calibrator:
write_this_cycle = ( self . idx == 0 ) and ( self . block_idx % ( INPUTS_WANTED / / 5 ) == 5 )
if self . param_put and write_this_cycle :
put_nonblocking ( " CalibrationParams " , self . get_msg ( ) . to_bytes ( ) )
put_nonblocking ( " CalibrationParams " , self . get_msg ( True ) . to_bytes ( ) )
def handle_v_ego ( self , v_ego : float ) - > None :
self . v_ego = v_ego
@ -227,12 +227,13 @@ class Calibrator:
return new_rpy
def get_msg ( self ) - > capnp . lib . capnp . _DynamicStructBuilder :
def get_msg ( self , valid : bool ) - > capnp . lib . capnp . _DynamicStructBuilder :
smooth_rpy = self . get_smooth_rpy ( )
msg = messaging . new_message ( ' liveCalibration ' )
liveCalibration = msg . liveCalibration
msg . valid = valid
liveCalibration = msg . liveCalibration
liveCalibration . validBlocks = self . valid_blocks
liveCalibration . calStatus = self . cal_status
liveCalibration . calPerc = min ( 100 * ( self . valid_blocks * BLOCK_SIZE + self . idx ) / / ( INPUTS_NEEDED * BLOCK_SIZE ) , 100 )
@ -250,19 +251,16 @@ class Calibrator:
return msg
def send_data ( self , pm : messaging . PubMaster ) - > None :
pm . send ( ' liveCalibration ' , self . get_msg ( ) )
def send_data ( self , pm : messaging . PubMaster , valid : bool ) - > None :
pm . send ( ' liveCalibration ' , self . get_msg ( valid ) )
def calibrationd_thread ( sm : Optional [ messaging . SubMaster ] = None , pm : Optional [ messaging . PubMaster ] = None ) - > NoReturn :
def main ( ) - > NoReturn :
gc . disable ( )
set_realtime_priority ( 1 )
if sm is None :
sm = messaging . SubMaster ( [ ' cameraOdometry ' , ' carState ' , ' carParams ' ] , poll = [ ' cameraOdometry ' ] )
if pm is None :
pm = messaging . PubMaster ( [ ' liveCalibration ' ] )
sm = messaging . SubMaster ( [ ' cameraOdometry ' , ' carState ' , ' carParams ' ] , poll = [ ' cameraOdometry ' ] )
calibrator = Calibrator ( param_put = True )
@ -286,11 +284,7 @@ def calibrationd_thread(sm: Optional[messaging.SubMaster] = None, pm: Optional[m
# 4Hz driven by cameraOdometry
if sm . frame % 5 == 0 :
calibrator . send_data ( pm )
def main ( sm : Optional [ messaging . SubMaster ] = None , pm : Optional [ messaging . PubMaster ] = None ) - > NoReturn :
calibrationd_thread ( sm , pm )
calibrator . send_data ( pm , sm . all_checks ( ) )
if __name__ == " __main__ " :