@ -432,32 +432,38 @@ def python_replay_process(cfg, lr, fingerprint=None):
def cpp_replay_process ( cfg , lr , fingerprint = None ) :
def cpp_replay_process ( cfg , lr , fingerprint = None ) :
sub_sockets = [ s for _ , sub in cfg . pub_sub . items ( ) for s in sub ] # We get responses here
sub_sockets = [ s for _ , sub in cfg . pub_sub . items ( ) for s in sub ] # We get responses here
pm = messaging . PubMaster ( cfg . pub_sub . keys ( ) )
pm = messaging . PubMaster ( cfg . pub_sub . keys ( ) )
sockets = { s : messaging . sub_sock ( s , timeout = 1000 ) for s in sub_sockets }
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in list ( cfg . pub_sub . keys ( ) ) ]
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in list ( cfg . pub_sub . keys ( ) ) ]
os . environ [ " SIMULATION " ] = " 1 " # Disable submaster alive checks
managed_processes [ cfg . proc_name ] . prepare ( )
managed_processes [ cfg . proc_name ] . prepare ( )
managed_processes [ cfg . proc_name ] . start ( )
managed_processes [ cfg . proc_name ] . start ( )
time . sleep ( 1 ) # We give the process time to start
while not all ( pm . all_readers_updated ( s ) for s in cfg . pub_sub . keys ( ) ) :
time . sleep ( 0 )
log_msgs = [ ]
# Make sure all subscribers are connected
sockets = { s : messaging . sub_sock ( s , timeout = 1000 ) for s in sub_sockets }
for s in sub_sockets :
for s in sub_sockets :
messaging . recv_one_or_none ( sockets [ s ] )
messaging . recv_one_or_none ( sockets [ s ] )
for msg in tqdm ( pub_msgs , disable = CI ) :
log_msgs = [ ]
for i , msg in enumerate ( tqdm ( pub_msgs , disable = CI ) ) :
pm . send ( msg . which ( ) , msg . as_builder ( ) )
pm . send ( msg . which ( ) , msg . as_builder ( ) )
resp_sockets = cfg . pub_sub [ msg . which ( ) ] if cfg . should_recv_callback is None else cfg . should_recv_callback ( msg )
resp_sockets = cfg . pub_sub [ msg . which ( ) ] if cfg . should_recv_callback is None else cfg . should_recv_callback ( msg )
for s in resp_sockets :
for s in resp_sockets :
response = messaging . recv_one ( sockets [ s ] )
response = messaging . recv_one ( sockets [ s ] )
if response is not None :
if response is None :
print ( f " Warning, no response received { i } " )
else :
log_msgs . append ( response )
log_msgs . append ( response )
if not len ( resp_sockets ) :
if not len ( resp_sockets ) : # We only need to wait if we didn't already wait for a response
while not pm . all_readers_updated ( msg . which ( ) ) :
while not pm . all_readers_updated ( msg . which ( ) ) :
time . sleep ( 0 )
time . sleep ( 0 )
time . sleep ( 0.0001 )
managed_processes [ cfg . proc_name ] . stop ( )
managed_processes [ cfg . proc_name ] . stop ( )
return log_msgs
return log_msgs