diff --git a/selfdrive/test/process_replay/README.md b/selfdrive/test/process_replay/README.md index 1174dbab83..6992d46f1d 100644 --- a/selfdrive/test/process_replay/README.md +++ b/selfdrive/test/process_replay/README.md @@ -52,10 +52,10 @@ Then, check in the new logs using git-lfs. Make sure to also update the `ref_com Process replay test suite exposes programmatic APIs for simultaneously running processes or groups of processes on provided logs. ```py -def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]: +def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, *args, **kwargs) -> List[capnp._DynamicStructReader]: def replay_process( - cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, + cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, Any]] = None, fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False ) -> List[capnp._DynamicStructReader]: ``` diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 22a150a2ea..9aab541991 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -26,7 +26,7 @@ from openpilot.selfdrive.manager.process_config import managed_processes from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams from openpilot.selfdrive.test.process_replay.migration import migrate_all from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture -from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.logreader import LogIterable # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 @@ -224,7 +224,7 @@ class ProcessContainer: def start( self, params_config: Dict[str, Any], environ_config: Dict[str, Any], - all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], + all_msgs: LogIterable, fingerprint: Optional[str], capture_output: bool ): with self.prefix as p: @@ -599,7 +599,7 @@ def get_process_config(name: str) -> ProcessConfig: raise Exception(f"Cannot find process config with name: {name}") from ex -def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructReader]], initial_state: str = "first") -> Dict[str, Any]: +def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> Dict[str, Any]: """ Use this to get custom params dict based on provided logs. Useful when replaying following processes: calibrationd, paramsd, torqued @@ -631,8 +631,7 @@ def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructRead return custom_params -def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, - List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]: +def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, *args, **kwargs) -> List[capnp._DynamicStructReader]: if isinstance(name, str): cfgs = [get_process_config(name)] elif isinstance(name, Iterable): @@ -644,7 +643,7 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReade def replay_process( - cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, + cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, Any]] = None, fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False ) -> List[capnp._DynamicStructReader]: @@ -672,7 +671,7 @@ def replay_process( def _replay_multi_process( - cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str], + cfgs: List[ProcessConfig], lr: LogIterable, frs: Optional[Dict[str, Any]], fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool ) -> List[capnp._DynamicStructReader]: if fingerprint is not None: @@ -799,7 +798,7 @@ def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> Dict[str return environ_dict -def check_openpilot_enabled(msgs: Union[LogReader, List[capnp._DynamicStructReader]]) -> bool: +def check_openpilot_enabled(msgs: LogIterable) -> bool: cur_enabled_count = 0 max_enabled_count = 0 for msg in msgs: diff --git a/selfdrive/test/process_replay/regen.py b/selfdrive/test/process_replay/regen.py index 51182bf8af..5024ecaee8 100755 --- a/selfdrive/test/process_replay/regen.py +++ b/selfdrive/test/process_replay/regen.py @@ -11,12 +11,12 @@ from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, FAKE from openpilot.selfdrive.test.update_ci_routes import upload_route from openpilot.tools.lib.route import Route from openpilot.tools.lib.framereader import FrameReader -from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.logreader import LogReader, LogIterable from openpilot.tools.lib.helpers import save_log def regen_segment( - lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, + lr: LogIterable, frs: Optional[Dict[str, Any]] = None, processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False ) -> List[capnp._DynamicStructReader]: all_msgs = sorted(lr, key=lambda m: m.logMonoTime) diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index e528996f32..4af922c774 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -6,11 +6,14 @@ import urllib.parse import capnp import warnings +from typing import Iterable, Iterator from cereal import log as capnp_log from openpilot.tools.lib.filereader import FileReader from openpilot.tools.lib.route import Route, SegmentName +LogIterable = Iterable[capnp._DynamicStructReader] + # this is an iterator itself, and uses private variables from LogReader class MultiLogIterator: def __init__(self, log_paths, sort_by_time=False): @@ -30,7 +33,7 @@ class MultiLogIterator: return self._log_readers[i] - def __iter__(self): + def __iter__(self) -> Iterator[capnp._DynamicStructReader]: return self def _inc(self): @@ -107,7 +110,7 @@ class LogReader: def from_bytes(cls, dat): return cls("", dat=dat) - def __iter__(self): + def __iter__(self) -> Iterator[capnp._DynamicStructReader]: for ent in self._ents: if self._only_union_types: try: