Proc Replay: cleanup iterable log typing (#30261)

* cleanup log typing

* use iterable

* fix those

* Cleanup

* cleanup

* missed that one
old-commit-hash: 1820b5f259
testing-closet
Justin Newberry 2 years ago committed by GitHub
parent 2b5ad3fe86
commit ef1acd4e6c
  1. 4
      selfdrive/test/process_replay/README.md
  2. 15
      selfdrive/test/process_replay/process_replay.py
  3. 4
      selfdrive/test/process_replay/regen.py
  4. 7
      tools/lib/logreader.py

@ -52,10 +52,10 @@ Then, check in the new logs using git-lfs. Make sure to also update the `ref_com
Process replay test suite exposes programmatic APIs for simultaneously running processes or groups of processes on provided logs. Process replay test suite exposes programmatic APIs for simultaneously running processes or groups of processes on provided logs.
```py ```py
def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]: def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, *args, **kwargs) -> List[capnp._DynamicStructReader]:
def replay_process( def replay_process(
cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, Any]] = None,
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False
) -> List[capnp._DynamicStructReader]: ) -> List[capnp._DynamicStructReader]:
``` ```

@ -26,7 +26,7 @@ from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
from openpilot.selfdrive.test.process_replay.migration import migrate_all from openpilot.selfdrive.test.process_replay.migration import migrate_all
from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogIterable
# Numpy gives different results based on CPU features after version 19 # Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7 NUMPY_TOLERANCE = 1e-7
@ -224,7 +224,7 @@ class ProcessContainer:
def start( def start(
self, params_config: Dict[str, Any], environ_config: Dict[str, Any], self, params_config: Dict[str, Any], environ_config: Dict[str, Any],
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], all_msgs: LogIterable,
fingerprint: Optional[str], capture_output: bool fingerprint: Optional[str], capture_output: bool
): ):
with self.prefix as p: with self.prefix as p:
@ -599,7 +599,7 @@ def get_process_config(name: str) -> ProcessConfig:
raise Exception(f"Cannot find process config with name: {name}") from ex raise Exception(f"Cannot find process config with name: {name}") from ex
def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructReader]], initial_state: str = "first") -> Dict[str, Any]: def get_custom_params_from_lr(lr: LogIterable, initial_state: str = "first") -> Dict[str, Any]:
""" """
Use this to get custom params dict based on provided logs. Use this to get custom params dict based on provided logs.
Useful when replaying following processes: calibrationd, paramsd, torqued Useful when replaying following processes: calibrationd, paramsd, torqued
@ -631,8 +631,7 @@ def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructRead
return custom_params return custom_params
def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, def replay_process_with_name(name: Union[str, Iterable[str]], lr: LogIterable, *args, **kwargs) -> List[capnp._DynamicStructReader]:
List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]:
if isinstance(name, str): if isinstance(name, str):
cfgs = [get_process_config(name)] cfgs = [get_process_config(name)]
elif isinstance(name, Iterable): elif isinstance(name, Iterable):
@ -644,7 +643,7 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReade
def replay_process( def replay_process(
cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: LogIterable, frs: Optional[Dict[str, Any]] = None,
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None,
captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False
) -> List[capnp._DynamicStructReader]: ) -> List[capnp._DynamicStructReader]:
@ -672,7 +671,7 @@ def replay_process(
def _replay_multi_process( def _replay_multi_process(
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str], cfgs: List[ProcessConfig], lr: LogIterable, frs: Optional[Dict[str, Any]], fingerprint: Optional[str],
custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool
) -> List[capnp._DynamicStructReader]: ) -> List[capnp._DynamicStructReader]:
if fingerprint is not None: if fingerprint is not None:
@ -799,7 +798,7 @@ def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> Dict[str
return environ_dict return environ_dict
def check_openpilot_enabled(msgs: Union[LogReader, List[capnp._DynamicStructReader]]) -> bool: def check_openpilot_enabled(msgs: LogIterable) -> bool:
cur_enabled_count = 0 cur_enabled_count = 0
max_enabled_count = 0 max_enabled_count = 0
for msg in msgs: for msg in msgs:

@ -11,12 +11,12 @@ from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, FAKE
from openpilot.selfdrive.test.update_ci_routes import upload_route from openpilot.selfdrive.test.update_ci_routes import upload_route
from openpilot.tools.lib.route import Route from openpilot.tools.lib.route import Route
from openpilot.tools.lib.framereader import FrameReader from openpilot.tools.lib.framereader import FrameReader
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogReader, LogIterable
from openpilot.tools.lib.helpers import save_log from openpilot.tools.lib.helpers import save_log
def regen_segment( def regen_segment(
lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, lr: LogIterable, frs: Optional[Dict[str, Any]] = None,
processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False processes: Iterable[ProcessConfig] = CONFIGS, disable_tqdm: bool = False
) -> List[capnp._DynamicStructReader]: ) -> List[capnp._DynamicStructReader]:
all_msgs = sorted(lr, key=lambda m: m.logMonoTime) all_msgs = sorted(lr, key=lambda m: m.logMonoTime)

@ -6,11 +6,14 @@ import urllib.parse
import capnp import capnp
import warnings import warnings
from typing import Iterable, Iterator
from cereal import log as capnp_log from cereal import log as capnp_log
from openpilot.tools.lib.filereader import FileReader from openpilot.tools.lib.filereader import FileReader
from openpilot.tools.lib.route import Route, SegmentName from openpilot.tools.lib.route import Route, SegmentName
LogIterable = Iterable[capnp._DynamicStructReader]
# this is an iterator itself, and uses private variables from LogReader # this is an iterator itself, and uses private variables from LogReader
class MultiLogIterator: class MultiLogIterator:
def __init__(self, log_paths, sort_by_time=False): def __init__(self, log_paths, sort_by_time=False):
@ -30,7 +33,7 @@ class MultiLogIterator:
return self._log_readers[i] return self._log_readers[i]
def __iter__(self): def __iter__(self) -> Iterator[capnp._DynamicStructReader]:
return self return self
def _inc(self): def _inc(self):
@ -107,7 +110,7 @@ class LogReader:
def from_bytes(cls, dat): def from_bytes(cls, dat):
return cls("", dat=dat) return cls("", dat=dat)
def __iter__(self): def __iter__(self) -> Iterator[capnp._DynamicStructReader]:
for ent in self._ents: for ent in self._ents:
if self._only_union_types: if self._only_union_types:
try: try:

Loading…
Cancel
Save