@ -1,11 +1,14 @@
#!/usr/bin/env python3
#!/usr/bin/env python3
import os
import os
import time
import time
import copy
import json
import heapq
import signal
import signal
import platform
import platform
from collections import OrderedDict
from collections import OrderedDict
from dataclasses import dataclass , field
from dataclasses import dataclass , field
from typing import Dict , List , Optional , Callable , Union , Any
from typing import Dict , List , Optional , Callable , Union , Any , Iterable , Tuple
from tqdm import tqdm
from tqdm import tqdm
import capnp
import capnp
@ -19,18 +22,17 @@ from common.realtime import DT_CTRL
from panda . python import ALTERNATIVE_EXPERIENCE
from panda . python import ALTERNATIVE_EXPERIENCE
from selfdrive . car . car_helpers import get_car , interfaces
from selfdrive . car . car_helpers import get_car , interfaces
from selfdrive . manager . process_config import managed_processes
from selfdrive . manager . process_config import managed_processes
from selfdrive . test . process_replay . helpers import OpenpilotPrefix
from selfdrive . test . process_replay . helpers import OpenpilotPrefix , DummySocket
from selfdrive . test . process_replay . vision_meta import meta_from_camera_state , available_streams
from selfdrive . test . process_replay . vision_meta import meta_from_camera_state , available_streams
from selfdrive . test . process_replay . migration import migrate_all
from selfdrive . test . process_replay . migration import migrate_all
from tools . lib . logreader import LogReader
from tools . lib . logreader import LogReader
# Numpy gives different results based on CPU features after version 19
# Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7
NUMPY_TOLERANCE = 1e-7
CI = " CI " in os . environ
TIMEOUT = 15
PROC_REPLAY_DIR = os . path . dirname ( os . path . abspath ( __file__ ) )
PROC_REPLAY_DIR = os . path . dirname ( os . path . abspath ( __file__ ) )
FAKEDATA = os . path . join ( PROC_REPLAY_DIR , " fakedata/ " )
FAKEDATA = os . path . join ( PROC_REPLAY_DIR , " fakedata/ " )
class ReplayContext :
class ReplayContext :
def __init__ ( self , cfg ) :
def __init__ ( self , cfg ) :
self . proc_name = cfg . proc_name
self . proc_name = cfg . proc_name
@ -40,6 +42,14 @@ class ReplayContext:
assert ( len ( self . pubs ) != 0 or self . main_pub is not None )
assert ( len ( self . pubs ) != 0 or self . main_pub is not None )
def __enter__ ( self ) :
def __enter__ ( self ) :
self . open ( )
return self
def __exit__ ( self , exc_type , exc_obj , exc_tb ) :
self . close ( )
def open ( self ) :
messaging . toggle_fake_events ( True )
messaging . toggle_fake_events ( True )
messaging . set_fake_prefix ( self . proc_name )
messaging . set_fake_prefix ( self . proc_name )
@ -50,9 +60,7 @@ class ReplayContext:
else :
else :
self . events = { self . main_pub : messaging . fake_event_handle ( self . main_pub , enable = True ) }
self . events = { self . main_pub : messaging . fake_event_handle ( self . main_pub , enable = True ) }
return self
def close ( self ) :
def __exit__ ( self , exc_type , exc_obj , exc_tb ) :
del self . events
del self . events
messaging . toggle_fake_events ( False )
messaging . toggle_fake_events ( False )
@ -101,8 +109,7 @@ class ProcessConfig:
init_callback : Optional [ Callable ] = None
init_callback : Optional [ Callable ] = None
should_recv_callback : Optional [ Callable ] = None
should_recv_callback : Optional [ Callable ] = None
tolerance : Optional [ float ] = None
tolerance : Optional [ float ] = None
environ : Dict [ str , str ] = field ( default_factory = dict )
processing_time : float = 0.001
subtest_name : str = " "
field_tolerances : Dict [ str , float ] = field ( default_factory = dict )
field_tolerances : Dict [ str , float ] = field ( default_factory = dict )
timeout : int = 30
timeout : int = 30
simulation : bool = True
simulation : bool = True
@ -112,18 +119,151 @@ class ProcessConfig:
ignore_alive_pubs : List [ str ] = field ( default_factory = list )
ignore_alive_pubs : List [ str ] = field ( default_factory = list )
class DummySocket :
class ProcessContainer :
def __init__ ( self ) :
def __init__ ( self , cfg : ProcessConfig ) :
self . data = [ ]
self . prefix = OpenpilotPrefix ( clean_dirs_on_exit = False )
self . cfg = copy . deepcopy ( cfg )
self . process = managed_processes [ cfg . proc_name ]
self . msg_queue : List [ capnp . _DynamicStructReader ] = [ ]
self . cnt = 0
self . pm : Optional [ messaging . PubMaster ] = None
self . sockets : Optional [ List [ messaging . SubSocket ] ] = None
self . rc : Optional [ ReplayContext ] = None
self . vipc_server : Optional [ VisionIpcServer ] = None
@property
def has_empty_queue ( self ) - > bool :
return len ( self . msg_queue ) == 0
@property
def pubs ( self ) - > List [ str ] :
return self . cfg . pubs
@property
def subs ( self ) - > List [ str ] :
return self . cfg . subs
def _setup_env ( self , params_config : Dict [ str , Any ] , environ_config : Dict [ str , Any ] ) :
for k , v in environ_config . items ( ) :
if len ( v ) != 0 :
os . environ [ k ] = v
elif k in os . environ :
del os . environ [ k ]
os . environ [ " PROC_NAME " ] = self . cfg . proc_name
if self . cfg . simulation :
os . environ [ " SIMULATION " ] = " 1 "
elif " SIMULATION " in os . environ :
del os . environ [ " SIMULATION " ]
params = Params ( )
for k , v in params_config . items ( ) :
if isinstance ( v , bool ) :
params . put_bool ( k , v )
else :
params . put ( k , v )
def _setup_vision_ipc ( self , all_msgs ) :
assert len ( self . cfg . vision_pubs ) != 0
device_type = next ( msg . initData . deviceType for msg in all_msgs if msg . which ( ) == " initData " )
vipc_server = VisionIpcServer ( " camerad " )
streams_metas = available_streams ( all_msgs )
for meta in streams_metas :
if meta . camera_state in self . cfg . vision_pubs :
vipc_server . create_buffers ( meta . stream , 2 , False , * meta . frame_sizes [ device_type ] )
vipc_server . start_listener ( )
self . vipc_server = vipc_server
def start (
self , params_config : Dict [ str , Any ] , environ_config : Dict [ str , Any ] ,
all_msgs : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , fingerprint : Optional [ str ]
) :
with self . prefix :
self . _setup_env ( params_config , environ_config )
if self . cfg . config_callback is not None :
params = Params ( )
self . cfg . config_callback ( params , self . cfg , all_msgs )
self . rc = ReplayContext ( self . cfg )
self . rc . open ( )
self . pm = messaging . PubMaster ( self . cfg . pubs )
self . sockets = [ messaging . sub_sock ( s , timeout = 100 ) for s in self . cfg . subs ]
if len ( self . cfg . vision_pubs ) != 0 :
self . _setup_vision_ipc ( all_msgs )
assert self . vipc_server is not None
self . process . prepare ( )
self . process . start ( )
if self . cfg . init_callback is not None :
self . cfg . init_callback ( self . rc , self . pm , all_msgs , fingerprint )
# wait for process to startup
with Timeout ( 10 , error_msg = f " timed out waiting for process to start: { repr ( self . cfg . proc_name ) } " ) :
while not all ( self . pm . all_readers_updated ( s ) for s in self . cfg . pubs if s not in self . cfg . ignore_alive_pubs ) :
time . sleep ( 0 )
def stop ( self ) :
with self . prefix :
self . process . signal ( signal . SIGKILL )
self . process . stop ( )
self . rc . close ( )
self . prefix . clean_dirs ( )
def run_step ( self , msg : capnp . _DynamicStructReader , frs : Optional [ Dict [ str , Any ] ] ) - > List [ capnp . _DynamicStructReader ] :
assert self . rc and self . pm and self . sockets and self . process . proc
output_msgs = [ ]
with self . prefix , Timeout ( self . cfg . timeout , error_msg = f " timed out testing process { repr ( self . cfg . proc_name ) } " ) :
end_of_cycle = True
if self . cfg . should_recv_callback is not None :
end_of_cycle = self . cfg . should_recv_callback ( msg , self . cfg , self . cnt )
self . msg_queue . append ( msg )
if end_of_cycle :
self . rc . wait_for_recv_called ( )
# call recv to let sub-sockets reconnect, after we know the process is ready
if self . cnt == 0 :
for s in self . sockets :
messaging . recv_one_or_none ( s )
# empty recv on drained pub indicates the end of messages, only do that if there're any
trigger_empty_recv = False
if self . cfg . main_pub and self . cfg . main_pub_drained :
trigger_empty_recv = next ( ( True for m in self . msg_queue if m . which ( ) == self . cfg . main_pub ) , False )
for m in self . msg_queue :
self . pm . send ( m . which ( ) , m . as_builder ( ) )
# send frames if needed
if self . vipc_server is not None and m . which ( ) in self . cfg . vision_pubs :
camera_state = getattr ( m , m . which ( ) )
camera_meta = meta_from_camera_state ( m . which ( ) )
assert frs is not None
img = frs [ m . which ( ) ] . get ( camera_state . frameId , pix_fmt = " nv12 " ) [ 0 ]
self . vipc_server . send ( camera_meta . stream , img . flatten ( ) . tobytes ( ) ,
camera_state . frameId , camera_state . timestampSof , camera_state . timestampEof )
self . msg_queue = [ ]
def receive ( self , non_blocking = False ) :
self . rc . unlock_sockets ( )
if non_blocking :
self . rc . wait_for_next_recv ( trigger_empty_recv )
return None
return self . data . pop ( )
for socket in self . sockets :
ms = messaging . drain_sock ( socket )
for m in ms :
m = m . as_builder ( )
m . logMonoTime = msg . logMonoTime + int ( self . cfg . processing_time * 1e9 )
output_msgs . append ( m . as_reader ( ) )
self . cnt + = 1
assert self . process . proc . is_alive ( )
def send ( self , data ) :
return output_msgs
self . data . append ( data )
def controlsd_fingerprint_callback ( rc , pm , msgs , fingerprint ) :
def controlsd_fingerprint_callback ( rc , pm , msgs , fingerprint ) :
@ -242,21 +382,38 @@ class FrequencyBasedRcvCallback:
if frame % max ( 1 , int ( service_list [ msg . which ( ) ] . frequency / service_list [ s ] . frequency ) ) == 0
if frame % max ( 1 , int ( service_list [ msg . which ( ) ] . frequency / service_list [ s ] . frequency ) ) == 0
]
]
return bool ( len ( resp_sockets ) )
return bool ( len ( resp_sockets ) )
def controlsd_config_callback ( params , cfg , lr ) :
controlsState = None
initialized = False
for msg in lr :
if msg . which ( ) == " controlsState " :
controlsState = msg . controlsState
if initialized :
break
elif msg . which ( ) == " carEvents " :
initialized = car . CarEvent . EventName . controlsInitializing not in [ e . name for e in msg . carEvents ]
assert controlsState is not None and initialized , " controlsState never initialized "
params . put ( " ReplayControlsState " , controlsState . as_builder ( ) . to_bytes ( ) )
def laikad_config_pubsub_callback ( params , cfg ) :
def laikad_config_pubsub_callback ( params , cfg , lr ) :
ublox = params . get_bool ( " UbloxAvailable " )
ublox = params . get_bool ( " UbloxAvailable " )
main_key = " ubloxGnss " if ublox else " qcomGnss "
main_key = " ubloxGnss " if ublox else " qcomGnss "
sub_keys = ( { " qcomGnss " , } if ublox else { " ubloxGnss " , } )
sub_keys = ( { " qcomGnss " , } if ublox else { " ubloxGnss " , } )
return set ( cfg . pubs ) - sub_keys , main_key , True
cfg . pubs = set ( cfg . pubs ) - sub_keys
cfg . main_pub = main_key
cfg . main_pub_drained = True
def locationd_config_pubsub_callback ( params , cfg ) :
def locationd_config_pubsub_callback ( params , cfg , lr ) :
ublox = params . get_bool ( " UbloxAvailable " )
ublox = params . get_bool ( " UbloxAvailable " )
sub_keys = ( { " gpsLocation " , } if ublox else { " gpsLocationExternal " , } )
sub_keys = ( { " gpsLocation " , } if ublox else { " gpsLocationExternal " , } )
return set ( cfg . pubs ) - sub_keys , None , False
cfg . pubs = set ( cfg . pubs ) - sub_keys
CONFIGS = [
CONFIGS = [
@ -270,9 +427,11 @@ CONFIGS = [
] ,
] ,
subs = [ " controlsState " , " carState " , " carControl " , " sendcan " , " carEvents " , " carParams " ] ,
subs = [ " controlsState " , " carState " , " carControl " , " sendcan " , " carEvents " , " carParams " ] ,
ignore = [ " logMonoTime " , " valid " , " controlsState.startMonoTime " , " controlsState.cumLagMs " ] ,
ignore = [ " logMonoTime " , " valid " , " controlsState.startMonoTime " , " controlsState.cumLagMs " ] ,
config_callback = controlsd_config_callback ,
init_callback = controlsd_fingerprint_callback ,
init_callback = controlsd_fingerprint_callback ,
should_recv_callback = controlsd_rcv_callback ,
should_recv_callback = controlsd_rcv_callback ,
tolerance = NUMPY_TOLERANCE ,
tolerance = NUMPY_TOLERANCE ,
processing_time = 0.004 ,
main_pub = " can " ,
main_pub = " can " ,
) ,
) ,
ProcessConfig (
ProcessConfig (
@ -327,6 +486,7 @@ CONFIGS = [
init_callback = get_car_params_callback ,
init_callback = get_car_params_callback ,
should_recv_callback = FrequencyBasedRcvCallback ( " liveLocationKalman " ) ,
should_recv_callback = FrequencyBasedRcvCallback ( " liveLocationKalman " ) ,
tolerance = NUMPY_TOLERANCE ,
tolerance = NUMPY_TOLERANCE ,
processing_time = 0.004 ,
) ,
) ,
ProcessConfig (
ProcessConfig (
proc_name = " ubloxd " ,
proc_name = " ubloxd " ,
@ -341,8 +501,9 @@ CONFIGS = [
ignore = [ " logMonoTime " ] ,
ignore = [ " logMonoTime " ] ,
config_callback = laikad_config_pubsub_callback ,
config_callback = laikad_config_pubsub_callback ,
tolerance = NUMPY_TOLERANCE ,
tolerance = NUMPY_TOLERANCE ,
processing_time = 0.002 ,
timeout = 60 * 10 , # first messages are blocked on internet assistance
timeout = 60 * 10 , # first messages are blocked on internet assistance
main_pub = " ubloxGnss " , # config_callback will switch this to qcom if needed
main_pub = " ubloxGnss " , # config_callback will switch this to qcom if needed
) ,
) ,
ProcessConfig (
ProcessConfig (
proc_name = " torqued " ,
proc_name = " torqued " ,
@ -360,6 +521,7 @@ CONFIGS = [
ignore = [ " logMonoTime " , " modelV2.frameDropPerc " , " modelV2.modelExecutionTime " ] ,
ignore = [ " logMonoTime " , " modelV2.frameDropPerc " , " modelV2.modelExecutionTime " ] ,
should_recv_callback = ModeldCameraSyncRcvCallback ( ) ,
should_recv_callback = ModeldCameraSyncRcvCallback ( ) ,
tolerance = NUMPY_TOLERANCE ,
tolerance = NUMPY_TOLERANCE ,
processing_time = 0.020 ,
main_pub = vipc_get_endpoint_name ( " camerad " , meta_from_camera_state ( " roadCameraState " ) . stream ) ,
main_pub = vipc_get_endpoint_name ( " camerad " , meta_from_camera_state ( " roadCameraState " ) . stream ) ,
main_pub_drained = False ,
main_pub_drained = False ,
vision_pubs = [ " roadCameraState " , " wideRoadCameraState " ] ,
vision_pubs = [ " roadCameraState " , " wideRoadCameraState " ] ,
@ -372,6 +534,7 @@ CONFIGS = [
ignore = [ " logMonoTime " , " driverStateV2.modelExecutionTime " , " driverStateV2.dspExecutionTime " ] ,
ignore = [ " logMonoTime " , " driverStateV2.modelExecutionTime " , " driverStateV2.dspExecutionTime " ] ,
should_recv_callback = dmonitoringmodeld_rcv_callback ,
should_recv_callback = dmonitoringmodeld_rcv_callback ,
tolerance = NUMPY_TOLERANCE ,
tolerance = NUMPY_TOLERANCE ,
processing_time = 0.020 ,
main_pub = vipc_get_endpoint_name ( " camerad " , meta_from_camera_state ( " driverCameraState " ) . stream ) ,
main_pub = vipc_get_endpoint_name ( " camerad " , meta_from_camera_state ( " driverCameraState " ) . stream ) ,
main_pub_drained = False ,
main_pub_drained = False ,
vision_pubs = [ " driverCameraState " ] ,
vision_pubs = [ " driverCameraState " ] ,
@ -380,27 +543,73 @@ CONFIGS = [
]
]
def get_process_config ( name ) :
def get_process_config ( name : str ) - > ProcessConfig :
try :
try :
return next ( c for c in CONFIGS if c . proc_name == name )
return copy . deepcopy ( next ( c for c in CONFIGS if c . proc_name == name ) )
except StopIteration as ex :
except StopIteration as ex :
raise Exception ( f " Cannot find process config with name: { name } " ) from ex
raise Exception ( f " Cannot find process config with name: { name } " ) from ex
def replay_process_with_name ( name , lr , * args , * * kwargs ) :
def get_custom_params_from_lr ( lr : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , initial_state : str = " first " ) - > Dict [ str , Any ] :
cfg = get_process_config ( name )
"""
return replay_process ( cfg , lr , * args , * * kwargs )
Use this to get custom params dict based on provided logs .
Useful when replaying following processes : calibrationd , paramsd , torqued
The params may be based on first or last message of given type ( carParams , liveCalibration , liveParameters , liveTorqueParameters ) in the logs .
"""
car_params = [ m for m in lr if m . which ( ) == " carParams " ]
live_calibration = [ m for m in lr if m . which ( ) == " liveCalibration " ]
live_parameters = [ m for m in lr if m . which ( ) == " liveParameters " ]
live_torque_parameters = [ m for m in lr if m . which ( ) == " liveTorqueParameters " ]
assert initial_state in [ " first " , " last " ]
msg_index = 0 if initial_state == " first " else - 1
assert len ( car_params ) > 0 , " carParams required for initial state of liveParameters and liveTorqueCarParams "
CP = car_params [ msg_index ] . carParams
custom_params = { }
if len ( live_calibration ) > 0 :
custom_params [ " CalibrationParams " ] = live_calibration [ msg_index ] . as_builder ( ) . to_bytes ( )
if len ( live_parameters ) > 0 :
lp_dict = live_parameters [ msg_index ] . to_dict ( )
lp_dict [ " carFingerprint " ] = CP . carFingerprint
custom_params [ " LiveParameters " ] = json . dumps ( lp_dict )
if len ( live_torque_parameters ) > 0 :
custom_params [ " LiveTorqueCarParams " ] = CP . as_builder ( ) . to_bytes ( )
custom_params [ " LiveTorqueParameters " ] = live_torque_parameters [ msg_index ] . as_builder ( ) . to_bytes ( )
def replay_process ( cfg , lr , frs = None , fingerprint = None , return_all_logs = False , custom_params = None , disable_progress = False ) :
return custom_params
all_msgs = migrate_all ( lr , old_logtime = True , camera_states = len ( cfg . vision_pubs ) != 0 )
process_logs = _replay_single_process ( cfg , all_msgs , frs , fingerprint , custom_params , disable_progress )
def replay_process_with_name ( name : Union [ str , Iterable [ str ] ] , lr : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , * args , * * kwargs ) - > List [ capnp . _DynamicStructReader ] :
if isinstance ( name , str ) :
cfgs = [ get_process_config ( name ) ]
elif isinstance ( name , Iterable ) :
cfgs = [ get_process_config ( n ) for n in name ]
else :
raise ValueError ( " name must be str or collections of strings " )
return replay_process ( cfgs , lr , * args , * * kwargs )
def replay_process (
cfg : Union [ ProcessConfig , Iterable [ ProcessConfig ] ] , lr : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , frs : Optional [ Dict [ str , Any ] ] = None ,
fingerprint : Optional [ str ] = None , return_all_logs : bool = False , custom_params : Optional [ Dict [ str , Any ] ] = None , disable_progress : bool = False
) - > List [ capnp . _DynamicStructReader ] :
if isinstance ( cfg , Iterable ) :
cfgs = list ( cfg )
else :
cfgs = [ cfg ]
all_msgs = migrate_all ( lr , old_logtime = True , camera_states = any ( len ( cfg . vision_pubs ) != 0 for cfg in cfgs ) )
process_logs = _replay_multi_process ( cfgs , all_msgs , frs , fingerprint , custom_params , disable_progress )
if return_all_logs :
if return_all_logs :
keys = set ( cfg . subs )
keys = { m . which ( ) for m in process_logs }
modified_logs = [ m for m in all_msgs if m . which ( ) not in keys ]
modified_logs = [ m for m in all_msgs if m . which ( ) not in keys ]
modified_logs . extend ( process_logs )
modified_logs . extend ( process_logs )
modified_logs . sort ( key = lambda m : m . logMonoTime )
modified_logs . sort ( key = lambda m : int ( m . logMonoTime ) )
log_msgs = modified_logs
log_msgs = modified_logs
else :
else :
log_msgs = process_logs
log_msgs = process_logs
@ -408,202 +617,131 @@ def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, c
return log_msgs
return log_msgs
def _replay_single _process (
def _replay_multi _process (
cfg : ProcessConfig , lr : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , frs : Optional [ Dict [ str , Any ] ] ,
cfgs : List [ ProcessConfig ] , lr : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] , frs : Optional [ Dict [ str , Any ] ] ,
fingerprint : Optional [ str ] , custom_params : Optional [ Dict [ str , Any ] ] , disable_progress : bool
fingerprint : Optional [ str ] , custom_params : Optional [ Dict [ str , Any ] ] , disable_progress : bool
) :
) - > List [ capnp . _DynamicStructReader ] :
with OpenpilotPrefix ( ) :
if fingerprint is not None :
controlsState = None
params_config = generate_params_config ( lr = lr , fingerprint = fingerprint , custom_params = custom_params )
initialized = False
env_config = generate_environ_config ( fingerprint = fingerprint )
if cfg . proc_name == " controlsd " :
else :
for msg in lr :
CP = next ( ( m . carParams for m in lr if m . which ( ) == " carParams " ) , None )
if msg . which ( ) == " controlsState " :
params_config = generate_params_config ( lr = lr , CP = CP , custom_params = custom_params )
controlsState = msg . controlsState
env_config = generate_environ_config ( CP = CP )
if initialized :
break
# validate frs and vision pubs
elif msg . which ( ) == " carEvents " :
for cfg in cfgs :
initialized = car . CarEvent . EventName . controlsInitializing not in [ e . name for e in msg . carEvents ]
if len ( cfg . vision_pubs ) == 0 :
continue
assert controlsState is not None and initialized , " controlsState never initialized "
assert frs is not None , " frs must be provided when replaying process using vision streams "
if fingerprint is not None :
assert all ( meta_from_camera_state ( st ) is not None for st in cfg . vision_pubs ) , f " undefined vision stream spotted, probably misconfigured process: { cfg . vision_pubs } "
setup_env ( cfg = cfg , controlsState = controlsState , lr = lr , fingerprint = fingerprint , custom_params = custom_params )
assert all ( st in frs for st in cfg . vision_pubs ) , f " frs for this process must contain following vision streams: { cfg . vision_pubs } "
else :
CP = next ( ( m . carParams for m in lr if m . which ( ) == " carParams " ) , None )
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
assert CP is not None or " carParams " not in cfg . pubs , " carParams are missing and process needs it "
log_msgs = [ ]
setup_env ( cfg = cfg , CP = CP , controlsState = controlsState , lr = lr , custom_params = custom_params )
try :
containers = [ ]
if cfg . config_callback is not None :
for cfg in cfgs :
params = Params ( )
container = ProcessContainer ( cfg )
cfg . pubs , cfg . main_pub , cfg . main_pub_drained = cfg . config_callback ( params , cfg )
container . start ( params_config , env_config , all_msgs , fingerprint )
containers . append ( container )
all_msgs = sorted ( lr , key = lambda msg : msg . logMonoTime )
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in set ( cfg . pubs ) ]
all_pubs = set ( [ pub for container in containers for pub in container . pubs ] )
all_subs = set ( [ sub for container in containers for sub in container . subs ] )
with ReplayContext ( cfg ) as rc :
lr_pubs = all_pubs - all_subs
pm = messaging . PubMaster ( cfg . pubs )
pubs_to_containers = { pub : [ container for container in containers if pub in container . pubs ] for pub in all_pubs }
sockets = { s : messaging . sub_sock ( s , timeout = 100 ) for s in cfg . subs }
pub_msgs = [ msg for msg in all_msgs if msg . which ( ) in lr_pubs ]
vipc_server = None
# external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished
if len ( cfg . vision_pubs ) != 0 :
external_pub_queue : List [ capnp . _DynamicStructReader ] = pub_msgs . copy ( )
assert frs is not None , " frs must be provided when replaying process using vision streams "
internal_pub_queue : List [ capnp . _DynamicStructReader ] = [ ]
assert all ( meta_from_camera_state ( st ) is not None for st in cfg . vision_pubs ) , f " undefined vision stream spotted, probably misconfigured process: { cfg . vision_pubs } "
# heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue)
assert all ( st in frs for st in cfg . vision_pubs ) , f " frs for this process must contain following vision streams: { cfg . vision_pubs } "
internal_pub_index_heap : List [ Tuple [ int , int ] ] = [ ]
vipc_server = setup_vision_ipc ( cfg , lr )
pbar = tqdm ( total = len ( external_pub_queue ) , disable = disable_progress )
managed_processes [ cfg . proc_name ] . prepare ( )
while len ( external_pub_queue ) != 0 or ( len ( internal_pub_index_heap ) != 0 and not all ( c . has_empty_queue for c in containers ) ) :
managed_processes [ cfg . proc_name ] . start ( )
if len ( internal_pub_index_heap ) == 0 or ( len ( external_pub_queue ) != 0 and external_pub_queue [ 0 ] . logMonoTime < internal_pub_index_heap [ 0 ] [ 0 ] ) :
msg = external_pub_queue . pop ( 0 )
if cfg . init_callback is not None :
pbar . update ( 1 )
cfg . init_callback ( rc , pm , all_msgs , fingerprint )
else :
_ , index = heapq . heappop ( internal_pub_index_heap )
log_msgs , msg_queue = [ ] , [ ]
msg = internal_pub_queue [ index ]
try :
# Wait for process to startup
target_containers = pubs_to_containers [ msg . which ( ) ]
with Timeout ( 10 , error_msg = f " timed out waiting for process to start: { repr ( cfg . proc_name ) } " ) :
for container in target_containers :
while not all ( pm . all_readers_updated ( s ) for s in cfg . pubs if s not in cfg . ignore_alive_pubs ) :
output_msgs = container . run_step ( msg , frs )
time . sleep ( 0 )
for m in output_msgs :
if m . which ( ) in all_pubs :
# Do the replay
internal_pub_queue . append ( m )
cnt = 0
heapq . heappush ( internal_pub_index_heap , ( m . logMonoTime , len ( internal_pub_queue ) - 1 ) )
for msg in tqdm ( pub_msgs , disable = disable_progress ) :
log_msgs . extend ( output_msgs )
with Timeout ( cfg . timeout , error_msg = f " timed out testing process { repr ( cfg . proc_name ) } , { cnt } / { len ( pub_msgs ) } msgs done " ) :
finally :
resp_sockets , end_of_cycle = cfg . subs , True
for container in containers :
if cfg . should_recv_callback is not None :
container . stop ( )
end_of_cycle = cfg . should_recv_callback ( msg , cfg , cnt )
msg_queue . append ( msg )
if end_of_cycle :
rc . wait_for_recv_called ( )
# call recv to let sub-sockets reconnect, after we know the process is ready
if cnt == 0 :
for s in sockets . values ( ) :
messaging . recv_one_or_none ( s )
# empty recv on drained pub indicates the end of messages, only do that if there're any
trigger_empty_recv = False
if cfg . main_pub and cfg . main_pub_drained :
trigger_empty_recv = next ( ( True for m in msg_queue if m . which ( ) == cfg . main_pub ) , False )
for m in msg_queue :
pm . send ( m . which ( ) , m . as_builder ( ) )
# send frames if needed
if vipc_server is not None and m . which ( ) in cfg . vision_pubs :
camera_state = getattr ( m , m . which ( ) )
camera_meta = meta_from_camera_state ( m . which ( ) )
assert frs is not None
img = frs [ m . which ( ) ] . get ( camera_state . frameId , pix_fmt = " nv12 " ) [ 0 ]
vipc_server . send ( camera_meta . stream , img . flatten ( ) . tobytes ( ) ,
camera_state . frameId , camera_state . timestampSof , camera_state . timestampEof )
msg_queue = [ ]
rc . unlock_sockets ( )
rc . wait_for_next_recv ( trigger_empty_recv )
for s in resp_sockets :
return log_msgs
ms = messaging . drain_sock ( sockets [ s ] )
for m in ms :
m = m . as_builder ( )
m . logMonoTime = msg . logMonoTime
log_msgs . append ( m . as_reader ( ) )
cnt + = 1
proc = managed_processes [ cfg . proc_name ] . proc
assert ( proc and proc . is_alive ( ) )
finally :
managed_processes [ cfg . proc_name ] . signal ( signal . SIGKILL )
managed_processes [ cfg . proc_name ] . stop ( )
return log_msgs
def generate_params_config ( lr = None , CP = None , fingerprint = None , custom_params = None ) - > Dict [ str , Any ] :
params_dict = {
" OpenpilotEnabledToggle " : True ,
" Passive " : False ,
" DisengageOnAccelerator " : True ,
" DisableLogging " : False ,
}
if custom_params is not None :
params_dict . update ( custom_params )
if lr is not None :
has_ublox = any ( msg . which ( ) == " ubloxGnss " for msg in lr )
params_dict [ " UbloxAvailable " ] = has_ublox
is_rhd = next ( ( msg . driverMonitoringState . isRHD for msg in lr if msg . which ( ) == " driverMonitoringState " ) , False )
params_dict [ " IsRhdDetected " ] = is_rhd
def setup_vision_ipc ( cfg , lr ) :
if CP is not None :
assert len ( cfg . vision_pubs ) != 0
if CP . alternativeExperience == ALTERNATIVE_EXPERIENCE . DISABLE_DISENGAGE_ON_GAS :
params_dict [ " DisengageOnAccelerator " ] = False
device_type = next ( msg . initData . deviceType for msg in lr if msg . which ( ) == " initData " )
if fingerprint is None :
if CP . fingerprintSource == " fw " :
params_dict [ " CarParamsCache " ] = CP . as_builder ( ) . to_bytes ( )
vipc_server = VisionIpcServer ( " camerad " )
if CP . openpilotLongitudinalControl :
streams_metas = available_streams ( lr )
params_dict [ " ExperimentalLongitudinalEnabled " ] = True
for meta in streams_metas :
if meta . camera_state in cfg . vision_pubs :
vipc_server . create_buffers ( meta . stream , 2 , False , * meta . frame_sizes [ device_type ] )
vipc_server . start_listener ( )
return vipc_server
return params_dict
def setup_env ( cfg = None , CP = None , controlsState = None , lr = None , fingerprint = None , custom_params = None , log_dir = None ) :
def generate_environ_config ( CP = None , fingerprint = None , log_dir = None ) - > Dict [ str , Any ] :
environ_dict = { }
if platform . system ( ) != " Darwin " :
if platform . system ( ) != " Darwin " :
os . environ [ " PARAMS_ROOT " ] = " /dev/shm/params "
environ_dict [ " PARAMS_ROOT " ] = " /dev/shm/params "
if log_dir is not None :
if log_dir is not None :
os . environ [ " LOG_ROOT " ] = log_dir
environ_dict [ " LOG_ROOT " ] = log_dir
params = Params ( )
environ_dict [ " NO_RADAR_SLEEP " ] = " 1 "
params . clear_all ( )
environ_dict [ " REPLAY " ] = " 1 "
params . put_bool ( " OpenpilotEnabledToggle " , True )
params . put_bool ( " Passive " , False )
params . put_bool ( " DisengageOnAccelerator " , True )
params . put_bool ( " DisableLogging " , False )
if custom_params is not None :
for k , v in custom_params . items ( ) :
if type ( v ) == bool :
params . put_bool ( k , v )
else :
params . put ( k , v )
os . environ [ " NO_RADAR_SLEEP " ] = " 1 "
# Regen or python process
os . environ [ " REPLAY " ] = " 1 "
if CP is not None and fingerprint is None :
if fingerprint is not None :
if CP . fingerprintSource == " fw " :
os . environ [ ' SKIP_FW_QUERY ' ] = " 1 "
environ_dict [ ' SKIP_FW_QUERY ' ] = " "
os . environ [ ' FINGERPRINT ' ] = fingerprint
environ_dict [ ' FINGERPRINT ' ] = " "
else :
environ_dict [ ' SKIP_FW_QUERY ' ] = " 1 "
environ_dict [ ' FINGERPRINT ' ] = CP . carFingerprint
elif fingerprint is not None :
environ_dict [ ' SKIP_FW_QUERY ' ] = " 1 "
environ_dict [ ' FINGERPRINT ' ] = fingerprint
else :
else :
os . environ [ " SKIP_FW_QUERY " ] = " "
environ_dict [ " SKIP_FW_QUERY " ] = " "
os . environ [ " FINGERPRINT " ] = " "
environ_dict [ " FINGERPRINT " ] = " "
if lr is not None :
services = { m . which ( ) for m in lr }
params . put_bool ( " UbloxAvailable " , " ubloxGnss " in services )
if cfg is not None :
return environ_dict
# Clear all custom processConfig environment variables
for config in CONFIGS :
for k , _ in config . environ . items ( ) :
if k in os . environ :
del os . environ [ k ]
os . environ . update ( cfg . environ )
os . environ [ ' PROC_NAME ' ] = cfg . proc_name
if cfg is not None and cfg . simulation :
os . environ [ " SIMULATION " ] = " 1 "
elif " SIMULATION " in os . environ :
del os . environ [ " SIMULATION " ]
# Initialize controlsd with a controlsState packet
if controlsState is not None :
params . put ( " ReplayControlsState " , controlsState . as_builder ( ) . to_bytes ( ) )
else :
params . remove ( " ReplayControlsState " )
# Regen or python process
if CP is not None :
if CP . alternativeExperience == ALTERNATIVE_EXPERIENCE . DISABLE_DISENGAGE_ON_GAS :
params . put_bool ( " DisengageOnAccelerator " , False )
if fingerprint is None :
if CP . fingerprintSource == " fw " :
params . put ( " CarParamsCache " , CP . as_builder ( ) . to_bytes ( ) )
os . environ [ ' SKIP_FW_QUERY ' ] = " "
os . environ [ ' FINGERPRINT ' ] = " "
else :
os . environ [ ' SKIP_FW_QUERY ' ] = " 1 "
os . environ [ ' FINGERPRINT ' ] = CP . carFingerprint
if CP . openpilotLongitudinalControl :
params . put_bool ( " ExperimentalLongitudinalEnabled " , True )
def check_openpilot_enabled ( msgs ) :
def check_openpilot_enabled ( msgs : Union [ LogReader , List [ capnp . _DynamicStructReader ] ] ) - > bool :
cur_enabled_count = 0
cur_enabled_count = 0
max_enabled_count = 0
max_enabled_count = 0
for msg in msgs :
for msg in msgs :