@ -238,14 +238,14 @@ def torqued_rcv_callback(msg, CP, cfg, fsm):
return recv_socks , fsm . frame == 0 or msg . which ( ) == ' liveLocationKalman '
def ublox_rcv_callback ( msg ) :
def ublox_rcv_callback ( msg , CP , cfg , fsm ) :
msg_class , msg_id = msg . ubloxRaw [ 2 : 4 ]
if ( msg_class , msg_id ) in { ( 1 , 7 * 16 ) } :
return [ " gpsLocationExternal " ]
return [ " gpsLocationExternal " ] , True
elif ( msg_class , msg_id ) in { ( 2 , 1 * 16 + 5 ) , ( 10 , 9 ) } :
return [ " ubloxGnss " ]
return [ " ubloxGnss " ] , True
else :
return [ ]
return [ ] , False
CONFIGS = [
@ -364,7 +364,7 @@ CONFIGS = [
init_callback = get_car_params ,
should_recv_callback = None ,
tolerance = NUMPY_TOLERANCE ,
fake_pubsubmaster = Tru e,
fake_pubsubmaster = Fals e,
) ,
ProcessConfig (
proc_name = " torqued " ,
@ -386,7 +386,7 @@ def replay_process(cfg, lr, fingerprint=None):
if cfg . fake_pubsubmaster :
return python_replay_process ( cfg , lr , fingerprint )
else :
return cpp_ replay_process( cfg , lr , fingerprint )
return replay_process_with_socket s ( cfg , lr , fingerprint )
def setup_env ( simulation = False , CP = None , cfg = None , controlsState = None , lr = None ) :
@ -401,8 +401,12 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
os . environ [ " NO_RADAR_SLEEP " ] = " 1 "
os . environ [ " REPLAY " ] = " 1 "
os . environ [ ' SKIP_FW_QUERY ' ] = " "
os . environ [ ' FINGERPRINT ' ] = " "
os . environ [ " SKIP_FW_QUERY " ] = " "
os . environ [ " FINGERPRINT " ] = " "
if lr is not None :
services = { m . which ( ) for m in lr }
params . put_bool ( " UbloxAvailable " , " ubloxGnss " in services )
if lr is not None :
services = { m . which ( ) for m in lr }
@ -458,12 +462,6 @@ def python_replay_process(cfg, lr, fingerprint=None):
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in list ( cfg . pub_sub . keys ( ) ) ]
# laikad needs decision between submaster ubloxGnss and qcomGnss, prio given to ubloxGnss
if cfg . proc_name == " laikad " :
args = ( * args , not any ( m . which ( ) == " ubloxGnss " for m in pub_msgs ) )
service = " qcomGnss " if args [ 2 ] else " ubloxGnss "
pub_msgs = [ m for m in pub_msgs if m . which ( ) == service or m . which ( ) == ' clocks ' ]
controlsState = None
initialized = False
for msg in lr :
@ -534,22 +532,28 @@ def python_replay_process(cfg, lr, fingerprint=None):
return log_msgs
def cpp_ replay_process( cfg , lr , fingerprint = None ) :
sub_sockets = [ s for _ , sub in cfg . pub_sub . items ( ) for s in sub ] # We get responses here
def replay_process_with_socket s ( cfg , lr , fingerprint = None ) :
sub_sockets = [ s for _ , sub in cfg . pub_sub . items ( ) for s in sub ]
pm = messaging . PubMaster ( cfg . pub_sub . keys ( ) )
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in list ( cfg . pub_sub . keys ( ) ) ]
log_msgs = [ ]
# We need to fake SubMaster alive since we can't inject a fake clock
setup_env ( simulation = True , cfg = cfg , lr = lr )
if cfg . proc_name == " laikad " :
ublox = Params ( ) . get_bool ( " UbloxAvailable " )
keys = set ( cfg . pub_sub . keys ( ) ) - ( { " qcomGnss " , } if ublox else { " ubloxGnss " , } )
pub_msgs = [ msg for msg in pub_msgs if msg . which ( ) in keys ]
managed_processes [ cfg . proc_name ] . prepare ( )
managed_processes [ cfg . proc_name ] . start ( )
log_msgs = [ ]
try :
with Timeout ( TIMEOUT , error_msg = f " timed out testing process { repr ( cfg . proc_name ) } " ) :
# Wait for process to startup
with Timeout ( 10 , error_msg = f " timed out waiting for process to start: { repr ( cfg . proc_name ) } " ) :
while not any ( pm . all_readers_updated ( s ) for s in cfg . pub_sub . keys ( ) ) :
time . sleep ( 0 )
@ -558,25 +562,23 @@ def cpp_replay_process(cfg, lr, fingerprint=None):
for s in sub_sockets :
messaging . recv_one_or_none ( sockets [ s ] )
for i , msg in enumerate ( pub_msgs ) :
# Do the replay
cnt = 0
for msg in pub_msgs :
with Timeout ( TIMEOUT , error_msg = f " timed out testing process { repr ( cfg . proc_name ) } , { cnt } / { len ( pub_msgs ) } msgs done " ) :
pm . send ( msg . which ( ) , msg . as_builder ( ) )
resp_sockets = cfg . pub_sub [ msg . which ( ) ] if cfg . should_recv_callback is None else cfg . should_recv_callback ( msg )
for s in resp_sockets :
response = messaging . recv_one_retry ( sockets [ s ] )
if response is None :
print ( f " Warning, no response received { i } " )
else :
response = response . as_builder ( )
response . logMonoTime = msg . logMonoTime
response = response . as_reader ( )
log_msgs . append ( response )
if not len ( resp_sockets ) : # We only need to wait if we didn't already wait for a response
while not pm . all_readers_updated ( msg . which ( ) ) :
time . sleep ( 0 )
resp_sockets = cfg . pub_sub [ msg . which ( ) ]
if cfg . should_recv_callback is not None :
resp_sockets , _ = cfg . should_recv_callback ( msg , None , None , None )
for s in resp_sockets :
m = messaging . recv_one_retry ( sockets [ s ] )
m = m . as_builder ( )
m . logMonoTime = msg . logMonoTime
log_msgs . append ( m . as_reader ( ) )
cnt + = 1
finally :
managed_processes [ cfg . proc_name ] . signal ( signal . SIGKILL )
managed_processes [ cfg . proc_name ] . stop ( )