|  |  |  | @ -8,7 +8,8 @@ import signal | 
			
		
	
		
			
				
					|  |  |  |  | import platform | 
			
		
	
		
			
				
					|  |  |  |  | from collections import OrderedDict | 
			
		
	
		
			
				
					|  |  |  |  | from dataclasses import dataclass, field | 
			
		
	
		
			
				
					|  |  |  |  | from typing import Dict, List, Optional, Callable, Union, Any, Iterable, Tuple | 
			
		
	
		
			
				
					|  |  |  |  | from typing import Any | 
			
		
	
		
			
				
					|  |  |  |  | from collections.abc import Callable, Iterable | 
			
		
	
		
			
				
					|  |  |  |  | from tqdm import tqdm | 
			
		
	
		
			
				
					|  |  |  |  | import capnp | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -36,9 +37,9 @@ FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | class DummySocket: | 
			
		
	
		
			
				
					|  |  |  |  |   def __init__(self): | 
			
		
	
		
			
				
					|  |  |  |  |     self.data: List[bytes] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     self.data: list[bytes] = [] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def receive(self, non_blocking: bool = False) -> Optional[bytes]: | 
			
		
	
		
			
				
					|  |  |  |  |   def receive(self, non_blocking: bool = False) -> bytes | None: | 
			
		
	
		
			
				
					|  |  |  |  |     if non_blocking: | 
			
		
	
		
			
				
					|  |  |  |  |       return None | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -128,21 +129,21 @@ class ReplayContext: | 
			
		
	
		
			
				
					|  |  |  |  | @dataclass | 
			
		
	
		
			
				
					|  |  |  |  | class ProcessConfig: | 
			
		
	
		
			
				
					|  |  |  |  |   proc_name: str | 
			
		
	
		
			
				
					|  |  |  |  |   pubs: List[str] | 
			
		
	
		
			
				
					|  |  |  |  |   subs: List[str] | 
			
		
	
		
			
				
					|  |  |  |  |   ignore: List[str] | 
			
		
	
		
			
				
					|  |  |  |  |   config_callback: Optional[Callable] = None | 
			
		
	
		
			
				
					|  |  |  |  |   init_callback: Optional[Callable] = None | 
			
		
	
		
			
				
					|  |  |  |  |   should_recv_callback: Optional[Callable] = None | 
			
		
	
		
			
				
					|  |  |  |  |   tolerance: Optional[float] = None | 
			
		
	
		
			
				
					|  |  |  |  |   pubs: list[str] | 
			
		
	
		
			
				
					|  |  |  |  |   subs: list[str] | 
			
		
	
		
			
				
					|  |  |  |  |   ignore: list[str] | 
			
		
	
		
			
				
					|  |  |  |  |   config_callback: Callable | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   init_callback: Callable | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   should_recv_callback: Callable | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   tolerance: float | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   processing_time: float = 0.001 | 
			
		
	
		
			
				
					|  |  |  |  |   timeout: int = 30 | 
			
		
	
		
			
				
					|  |  |  |  |   simulation: bool = True | 
			
		
	
		
			
				
					|  |  |  |  |   main_pub: Optional[str] = None | 
			
		
	
		
			
				
					|  |  |  |  |   main_pub: str | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   main_pub_drained: bool = True | 
			
		
	
		
			
				
					|  |  |  |  |   vision_pubs: List[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  |   ignore_alive_pubs: List[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  |   unlocked_pubs: List[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  |   vision_pubs: list[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  |   ignore_alive_pubs: list[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  |   unlocked_pubs: list[str] = field(default_factory=list) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | class ProcessContainer: | 
			
		
	
	
		
			
				
					|  |  |  | @ -150,25 +151,25 @@ class ProcessContainer: | 
			
		
	
		
			
				
					|  |  |  |  |     self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False) | 
			
		
	
		
			
				
					|  |  |  |  |     self.cfg = copy.deepcopy(cfg) | 
			
		
	
		
			
				
					|  |  |  |  |     self.process = copy.deepcopy(managed_processes[cfg.proc_name]) | 
			
		
	
		
			
				
					|  |  |  |  |     self.msg_queue: List[capnp._DynamicStructReader] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     self.msg_queue: list[capnp._DynamicStructReader] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     self.cnt = 0 | 
			
		
	
		
			
				
					|  |  |  |  |     self.pm: Optional[messaging.PubMaster] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.sockets: Optional[List[messaging.SubSocket]] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.rc: Optional[ReplayContext] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.vipc_server: Optional[VisionIpcServer] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.environ_config: Optional[Dict[str, Any]] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.capture: Optional[ProcessOutputCapture] = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.pm: messaging.PubMaster | None = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.sockets: list[messaging.SubSocket] | None = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.rc: ReplayContext | None = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.vipc_server: VisionIpcServer | None = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.environ_config: dict[str, Any] | None = None | 
			
		
	
		
			
				
					|  |  |  |  |     self.capture: ProcessOutputCapture | None = None | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   @property | 
			
		
	
		
			
				
					|  |  |  |  |   def has_empty_queue(self) -> bool: | 
			
		
	
		
			
				
					|  |  |  |  |     return len(self.msg_queue) == 0 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   @property | 
			
		
	
		
			
				
					|  |  |  |  |   def pubs(self) -> List[str]: | 
			
		
	
		
			
				
					|  |  |  |  |   def pubs(self) -> list[str]: | 
			
		
	
		
			
				
					|  |  |  |  |     return self.cfg.pubs | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   @property | 
			
		
	
		
			
				
					|  |  |  |  |   def subs(self) -> List[str]: | 
			
		
	
		
			
				
					|  |  |  |  |   def subs(self) -> list[str]: | 
			
		
	
		
			
				
					|  |  |  |  |     return self.cfg.subs | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def _clean_env(self): | 
			
		
	
	
		
			
				
					|  |  |  | @ -180,7 +181,7 @@ class ProcessContainer: | 
			
		
	
		
			
				
					|  |  |  |  |       if k in os.environ: | 
			
		
	
		
			
				
					|  |  |  |  |         del os.environ[k] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def _setup_env(self, params_config: Dict[str, Any], environ_config: Dict[str, Any]): | 
			
		
	
		
			
				
					|  |  |  |  |   def _setup_env(self, params_config: dict[str, Any], environ_config: dict[str, Any]): | 
			
		
	
		
			
				
					|  |  |  |  |     for k, v in environ_config.items(): | 
			
		
	
		
			
				
					|  |  |  |  |       if len(v) != 0: | 
			
		
	
		
			
				
					|  |  |  |  |         os.environ[k] = v | 
			
		
	
	
		
			
				
					|  |  |  | @ -202,7 +203,7 @@ class ProcessContainer: | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     self.environ_config = environ_config | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def _setup_vision_ipc(self, all_msgs: LogIterable, frs: Dict[str, Any]): | 
			
		
	
		
			
				
					|  |  |  |  |   def _setup_vision_ipc(self, all_msgs: LogIterable, frs: dict[str, Any]): | 
			
		
	
		
			
				
					|  |  |  |  |     assert len(self.cfg.vision_pubs) != 0 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     vipc_server = VisionIpcServer("camerad") | 
			
		
	
	
		
			
				
					|  |  |  | @ -223,9 +224,9 @@ class ProcessContainer: | 
			
		
	
		
			
				
					|  |  |  |  |     self.process.start() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def start( | 
			
		
	
		
			
				
					|  |  |  |  |     self, params_config: Dict[str, Any], environ_config: Dict[str, Any], | 
			
		
	
		
			
				
					|  |  |  |  |     all_msgs: LogIterable, frs: Optional[Dict[str, BaseFrameReader]], | 
			
		
	
		
			
				
					|  |  |  |  |     fingerprint: Optional[str], capture_output: bool | 
			
		
	
		
			
				
					|  |  |  |  |     self, params_config: dict[str, Any], environ_config: dict[str, Any], | 
			
		
	
		
			
				
					|  |  |  |  |     all_msgs: LogIterable, frs: dict[str, BaseFrameReader] | None, | 
			
		
	
		
			
				
					|  |  |  |  |     fingerprint: str | None, capture_output: bool | 
			
		
	
		
			
				
					|  |  |  |  |   ): | 
			
		
	
		
			
				
					|  |  |  |  |     with self.prefix as p: | 
			
		
	
		
			
				
					|  |  |  |  |       self._setup_env(params_config, environ_config) | 
			
		
	
	
		
			
				
					|  |  |  | @ -266,7 +267,7 @@ class ProcessContainer: | 
			
		
	
		
			
				
					|  |  |  |  |       self.prefix.clean_dirs() | 
			
		
	
		
			
				
					|  |  |  |  |       self._clean_env() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   def run_step(self, msg: capnp._DynamicStructReader, frs: Optional[Dict[str, BaseFrameReader]]) -> List[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, BaseFrameReader] | None) -> list[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |     assert self.rc and self.pm and self.sockets and self.process.proc | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     output_msgs = [] | 
			
		
	
	
		
			
				
					|  |  |  | @ -580,7 +581,7 @@ def get_process_config(name: str) -> ProcessConfig: | 
			
		
	
		
			
				
					|  |  |  |  |     raise Exception(f"Cannot find process config with name: {name}") from ex | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> Dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  | def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  |   """ | 
			
		
	
		
			
				
					|  |  |  |  |   Use this to get custom params dict based on provided logs. | 
			
		
	
		
			
				
					|  |  |  |  |   Useful when replaying following processes: calibrationd, paramsd, torqued | 
			
		
	
	
		
			
				
					|  |  |  | @ -614,7 +615,7 @@ def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> | 
			
		
	
		
			
				
					|  |  |  |  |   return custom_params | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, *args, **kwargs) -> List[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  | def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args, **kwargs) -> list[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   if isinstance(name, str): | 
			
		
	
		
			
				
					|  |  |  |  |     cfgs = [get_process_config(name)] | 
			
		
	
		
			
				
					|  |  |  |  |   elif isinstance(name, Iterable): | 
			
		
	
	
		
			
				
					|  |  |  | @ -626,10 +627,10 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, * | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def replay_process( | 
			
		
	
		
			
				
					|  |  |  |  |   cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, BaseFrameReader]] = None, | 
			
		
	
		
			
				
					|  |  |  |  |   fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, | 
			
		
	
		
			
				
					|  |  |  |  |   captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False | 
			
		
	
		
			
				
					|  |  |  |  | ) -> List[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None = None, | 
			
		
	
		
			
				
					|  |  |  |  |   fingerprint: str | None = None, return_all_logs: bool = False, custom_params: dict[str, Any] | None = None, | 
			
		
	
		
			
				
					|  |  |  |  |   captured_output_store: dict[str, dict[str, str]] | None = None, disable_progress: bool = False | 
			
		
	
		
			
				
					|  |  |  |  | ) -> list[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   if isinstance(cfg, Iterable): | 
			
		
	
		
			
				
					|  |  |  |  |     cfgs = list(cfg) | 
			
		
	
		
			
				
					|  |  |  |  |   else: | 
			
		
	
	
		
			
				
					|  |  |  | @ -654,9 +655,9 @@ def replay_process( | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def _replay_multi_process( | 
			
		
	
		
			
				
					|  |  |  |  |   cfgs: List[ProcessConfig], lr: LogIterable, frs: Optional[Dict[str, BaseFrameReader]], fingerprint: Optional[str], | 
			
		
	
		
			
				
					|  |  |  |  |   custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool | 
			
		
	
		
			
				
					|  |  |  |  | ) -> List[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   cfgs: list[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None, fingerprint: str | None, | 
			
		
	
		
			
				
					|  |  |  |  |   custom_params: dict[str, Any] | None, captured_output_store: dict[str, dict[str, str]] | None, disable_progress: bool | 
			
		
	
		
			
				
					|  |  |  |  | ) -> list[capnp._DynamicStructReader]: | 
			
		
	
		
			
				
					|  |  |  |  |   if fingerprint is not None: | 
			
		
	
		
			
				
					|  |  |  |  |     params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params) | 
			
		
	
		
			
				
					|  |  |  |  |     env_config = generate_environ_config(fingerprint=fingerprint) | 
			
		
	
	
		
			
				
					|  |  |  | @ -690,10 +691,10 @@ def _replay_multi_process( | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     pub_msgs = [msg for msg in all_msgs if msg.which() in lr_pubs] | 
			
		
	
		
			
				
					|  |  |  |  |     # external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished | 
			
		
	
		
			
				
					|  |  |  |  |     external_pub_queue: List[capnp._DynamicStructReader] = pub_msgs.copy() | 
			
		
	
		
			
				
					|  |  |  |  |     internal_pub_queue: List[capnp._DynamicStructReader] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     external_pub_queue: list[capnp._DynamicStructReader] = pub_msgs.copy() | 
			
		
	
		
			
				
					|  |  |  |  |     internal_pub_queue: list[capnp._DynamicStructReader] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     # heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue) | 
			
		
	
		
			
				
					|  |  |  |  |     internal_pub_index_heap: List[Tuple[int, int]] = [] | 
			
		
	
		
			
				
					|  |  |  |  |     internal_pub_index_heap: list[tuple[int, int]] = [] | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) | 
			
		
	
		
			
				
					|  |  |  |  |     while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): | 
			
		
	
	
		
			
				
					|  |  |  | @ -723,7 +724,7 @@ def _replay_multi_process( | 
			
		
	
		
			
				
					|  |  |  |  |   return log_msgs | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) -> Dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  | def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) -> dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  |   params_dict = { | 
			
		
	
		
			
				
					|  |  |  |  |     "OpenpilotEnabledToggle": True, | 
			
		
	
		
			
				
					|  |  |  |  |     "DisengageOnAccelerator": True, | 
			
		
	
	
		
			
				
					|  |  |  | @ -755,7 +756,7 @@ def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=Non | 
			
		
	
		
			
				
					|  |  |  |  |   return params_dict | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> Dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  | def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> dict[str, Any]: | 
			
		
	
		
			
				
					|  |  |  |  |   environ_dict = {} | 
			
		
	
		
			
				
					|  |  |  |  |   if platform.system() != "Darwin": | 
			
		
	
		
			
				
					|  |  |  |  |     environ_dict["PARAMS_ROOT"] = "/dev/shm/params" | 
			
		
	
	
		
			
				
					|  |  |  | 
 |