diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 0cb8ac9cac..a520b3d740 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -27,6 +27,7 @@ from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera from openpilot.selfdrive.test.process_replay.migration import migrate_all from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture from openpilot.tools.lib.logreader import LogIterable +from openpilot.tools.lib.framereader import BaseFrameReader # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 @@ -201,16 +202,15 @@ class ProcessContainer: self.environ_config = environ_config - def _setup_vision_ipc(self, all_msgs): + def _setup_vision_ipc(self, all_msgs: LogIterable, frs: Dict[str, Any]): assert len(self.cfg.vision_pubs) != 0 - device_type = next(str(msg.initData.deviceType) for msg in all_msgs if msg.which() == "initData") - vipc_server = VisionIpcServer("camerad") streams_metas = available_streams(all_msgs) for meta in streams_metas: if meta.camera_state in self.cfg.vision_pubs: - vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type]) + frame_size = (frs[meta.camera_state].w, frs[meta.camera_state].h) + vipc_server.create_buffers(meta.stream, 2, False, *frame_size) vipc_server.start_listener() self.vipc_server = vipc_server @@ -224,7 +224,7 @@ class ProcessContainer: def start( self, params_config: Dict[str, Any], environ_config: Dict[str, Any], - all_msgs: LogIterable, + all_msgs: LogIterable, frs: Optional[Dict[str, BaseFrameReader]], fingerprint: Optional[str], capture_output: bool ): with self.prefix as p: @@ -241,7 +241,8 @@ class ProcessContainer: self.sockets = [messaging.sub_sock(s, timeout=100) for s in self.cfg.subs] if len(self.cfg.vision_pubs) != 0: - self._setup_vision_ipc(all_msgs) + assert frs is not None + self._setup_vision_ipc(all_msgs, frs) assert self.vipc_server is not None if capture_output: @@ -265,7 +266,7 @@ class ProcessContainer: self.prefix.clean_dirs() self._clean_env() - def run_step(self, msg: capnp._DynamicStructReader, frs: Optional[Dict[str, Any]]) -> List[capnp._DynamicStructReader]: + def run_step(self, msg: capnp._DynamicStructReader, frs: Optional[Dict[str, BaseFrameReader]]) -> List[capnp._DynamicStructReader]: assert self.rc and self.pm and self.sockets and self.process.proc output_msgs = [] @@ -622,7 +623,7 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, * def replay_process( - cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, Any]] = None, + cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, BaseFrameReader]] = None, fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False ) -> List[capnp._DynamicStructReader]: @@ -650,7 +651,7 @@ def replay_process( def _replay_multi_process( - cfgs: List[ProcessConfig], lr: LogIterable, frs: Optional[Dict[str, Any]], fingerprint: Optional[str], + cfgs: List[ProcessConfig], lr: LogIterable, frs: Optional[Dict[str, BaseFrameReader]], fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool ) -> List[capnp._DynamicStructReader]: if fingerprint is not None: @@ -677,7 +678,7 @@ def _replay_multi_process( for cfg in cfgs: container = ProcessContainer(cfg) containers.append(container) - container.start(params_config, env_config, all_msgs, fingerprint, captured_output_store is not None) + container.start(params_config, env_config, all_msgs, frs, fingerprint, captured_output_store is not None) all_pubs = {pub for container in containers for pub in container.pubs} all_subs = {sub for container in containers for sub in container.subs}