@ -19,7 +19,7 @@ from panda.python import Panda
from selfdrive . car . toyota . values import EPS_SCALE
from selfdrive . manager . process import ensure_running
from selfdrive . manager . process_config import managed_processes
from selfdrive . test . process_replay . process_replay import FAKEDATA , setup_env , check_enabled
from selfdrive . test . process_replay . process_replay import CONFIGS , FAKEDATA , setup_env , check_enabled
from selfdrive . test . update_ci_routes import upload_route
from tools . lib . route import Route
from tools . lib . framereader import FrameReader
@ -233,7 +233,11 @@ def migrate_sensorEvents(lr, old_logtime=False):
return all_msgs
def regen_segment ( lr , frs = None , outdir = FAKEDATA , disable_tqdm = False ) :
def regen_segment ( lr , frs = None , daemons = " all " , outdir = FAKEDATA , disable_tqdm = False ) :
if not isinstance ( daemons , str ) and not hasattr ( daemons , " __iter__ " ) :
raise ValueError ( " whitelist_proc must be a string or iterable " )
lr = migrate_carparams ( list ( lr ) )
lr = migrate_sensorEvents ( list ( lr ) )
if frs is None :
@ -262,33 +266,68 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA, disable_tqdm=False):
multiprocessing . Process ( target = replay_service , args = ( ' ubloxRaw ' , lr ) ) ,
multiprocessing . Process ( target = replay_panda_states , args = ( ' pandaStates ' , lr ) ) ,
] ,
' managerState ' : [
' manager ' : [
multiprocessing . Process ( target = replay_manager_state , args = ( ' managerState ' , lr ) ) ,
] ,
' thermald ' : [
multiprocessing . Process ( target = replay_device_state , args = ( ' deviceState ' , lr ) ) ,
] ,
' rawgpsd ' : [
multiprocessing . Process ( target = replay_service , args = ( ' qcomGnss ' , lr ) ) ,
multiprocessing . Process ( target = replay_service , args = ( ' gpsLocation ' , lr ) ) ,
] ,
' camerad ' : [
* cam_procs ,
] ,
}
# TODO add configs for modeld, dmonitoringmodeld
fakeable_daemons = { }
for config in CONFIGS :
replayable_messages = set ( [ msg for sub in config . pub_sub . values ( ) for msg in sub ] )
processes = [
multiprocessing . Process ( target = replay_service , args = ( msg , lr ) )
for msg in replayable_messages
]
fakeable_daemons [ config . proc_name ] = processes
additional_fake_daemons = { }
if daemons != " all " :
additional_fake_daemons = fakeable_daemons
if isinstance ( daemons , str ) :
raise ValueError ( f " Invalid value for daemons: { daemons } " )
for d in daemons :
if d in fake_daemons :
raise ValueError ( f " Running daemon { d } is not supported! " )
if d in fakeable_daemons :
del additional_fake_daemons [ d ]
all_fake_daemons = { * * fake_daemons , * * additional_fake_daemons }
try :
# TODO: make first run of onnxruntime CUDA provider fast
managed_processes [ " modeld " ] . start ( )
managed_processes [ " dmonitoringmodeld " ] . start ( )
if " modeld " not in all_fake_daemons :
managed_processes [ " modeld " ] . start ( )
if " dmonitoringmodeld " not in all_fake_daemons :
managed_processes [ " dmonitoringmodeld " ] . start ( )
time . sleep ( 5 )
# start procs up
ignore = list ( fake_daemons . keys ( ) ) + [ ' ui ' , ' manage_athenad ' , ' uploader ' , ' soundd ' ]
ignore = list ( all_fake_daemons . keys ( ) ) \
+ [ ' ui ' , ' manage_athenad ' , ' uploader ' , ' soundd ' , ' micd ' , ' navd ' ]
print ( " Faked daemons: " , " , " . join ( all_fake_daemons . keys ( ) ) )
print ( " Running daemons: " , " , " . join ( [ key for key in managed_processes . keys ( ) if key not in ignore ] ) )
ensure_running ( managed_processes . values ( ) , started = True , params = Params ( ) , CP = car . CarParams ( ) , not_run = ignore )
for procs in fake_daemons . values ( ) :
for procs in all_ fake_daemons. values ( ) :
for p in procs :
p . start ( )
for _ in tqdm ( range ( 60 ) , disable = disable_tqdm ) :
# ensure all procs are running
for d , procs in fake_daemons . items ( ) :
for d , procs in all_ fake_daemons. items ( ) :
for p in procs :
if not p . is_alive ( ) :
raise Exception ( f " { d } ' s { p . name } died " )
@ -297,7 +336,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA, disable_tqdm=False):
# kill everything
for p in managed_processes . values ( ) :
p . stop ( )
for procs in fake_daemons . values ( ) :
for procs in all_ fake_daemons. values ( ) :
for p in procs :
p . terminate ( )
@ -312,7 +351,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA, disable_tqdm=False):
return seg_path
def regen_and_save ( route , sidx , upload = False , use_route_meta = False , outdir = FAKEDATA , disable_tqdm = False ) :
def regen_and_save ( route , sidx , daemons = " all " , upload = False , use_route_meta = False , outdir = FAKEDATA , disable_tqdm = False ) :
if use_route_meta :
r = Route ( route )
lr = LogReader ( r . log_paths ( ) [ sidx ] )
@ -333,7 +372,7 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False, outdir=FAKED
frs = { ' roadCameraState ' : fr }
if wfr is not None :
frs [ ' wideRoadCameraState ' ] = wfr
rpath = regen_segment ( lr , frs , outdir = outdir , disable_tqdm = disable_tqdm )
rpath = regen_segment ( lr , frs , daemons , outdir = outdir , disable_tqdm = disable_tqdm )
# compress raw rlog before uploading
with open ( os . path . join ( rpath , " rlog " ) , " rb " ) as f :
@ -356,10 +395,18 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False, outdir=FAKED
if __name__ == " __main__ " :
def comma_separated_list ( string ) :
if string == " all " :
return string
return string . split ( " , " )
parser = argparse . ArgumentParser ( description = " Generate new segments from old ones " )
parser . add_argument ( " --upload " , action = " store_true " , help = " Upload the new segment to the CI bucket " )
parser . add_argument ( " --outdir " , help = " log output dir " , default = FAKEDATA )
parser . add_argument ( " --whitelist-procs " , type = comma_separated_list , default = " all " ,
help = " Comma-separated whitelist of processes to regen (e.g. controlsd). Pass ' all ' to whitelist all processes. " )
parser . add_argument ( " route " , type = str , help = " The source route " )
parser . add_argument ( " seg " , type = int , help = " Segment in source route " )
args = parser . parse_args ( )
regen_and_save ( args . route , args . seg , args . upload , outdir = args . outdir )
regen_and_save ( args . route , args . seg , args . whitelist_procs , args . upload , outdir = args . outdir )