@ -385,7 +385,7 @@ def replay_process(cfg, lr, fingerprint=None):
return cpp_replay_process ( cfg , lr , fingerprint )
def setup_env ( simulation = False , CP = None , cfg = None ) :
def setup_env ( simulation = False , CP = None , cfg = None , controlsState = None ) :
params = Params ( )
params . clear_all ( )
params . put_bool ( " OpenpilotEnabledToggle " , True )
@ -414,6 +414,12 @@ def setup_env(simulation=False, CP=None, cfg=None):
elif " SIMULATION " in os . environ :
del os . environ [ " SIMULATION " ]
# Initialize controlsd with a controlsState packet
if controlsState is not None :
params . put ( " ReplayControlsState " , controlsState . as_builder ( ) . to_bytes ( ) )
else :
params . delete ( " ReplayControlsState " )
# Regen or python process
if CP is not None :
if CP . alternativeExperience == ALTERNATIVE_EXPERIENCE . DISABLE_DISENGAGE_ON_GAS :
@ -440,13 +446,25 @@ def python_replay_process(cfg, lr, fingerprint=None):
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in list ( cfg . pub_sub . keys ( ) ) ]
controlsState = None
initialized = False
for msg in lr :
if msg . which ( ) == ' controlsState ' :
controlsState = msg . controlsState
if initialized :
break
elif msg . which ( ) == ' carEvents ' :
initialized = car . CarEvent . EventName . controlsInitializing not in [ e . name for e in msg . carEvents ]
assert controlsState is not None and initialized , " controlsState never initialized "
if fingerprint is not None :
os . environ [ ' SKIP_FW_QUERY ' ] = " 1 "
os . environ [ ' FINGERPRINT ' ] = fingerprint
setup_env ( cfg = cfg )
setup_env ( cfg = cfg , controlsState = controlsState )
else :
CP = [ m for m in lr if m . which ( ) == ' carParams ' ] [ 0 ] . carParams
setup_env ( CP = CP , cfg = cfg )
setup_env ( CP = CP , cfg = cfg , controlsState = controlsState )
assert ( type ( managed_processes [ cfg . proc_name ] ) is PythonProcess )
managed_processes [ cfg . proc_name ] . prepare ( )