process_replay: custom params (#28624)

* Add custom params argument

* Fix default arguments

* Fix typing issues
pull/28630/head
Kacper Rączy 2 years ago committed by GitHub
parent ae5bfcf248
commit 3cae32a85f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 28
      selfdrive/test/process_replay/process_replay.py

@ -5,8 +5,9 @@ import signal
import platform import platform
from collections import OrderedDict from collections import OrderedDict
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable from typing import Dict, List, Optional, Callable, Union, Any
from tqdm import tqdm from tqdm import tqdm
import capnp
import cereal.messaging as messaging import cereal.messaging as messaging
from cereal import car from cereal import car
@ -19,6 +20,7 @@ from selfdrive.car.car_helpers import get_car, interfaces
from selfdrive.manager.process_config import managed_processes from selfdrive.manager.process_config import managed_processes
from selfdrive.test.process_replay.helpers import OpenpilotPrefix from selfdrive.test.process_replay.helpers import OpenpilotPrefix
from selfdrive.test.process_replay.migration import migrate_all from selfdrive.test.process_replay.migration import migrate_all
from tools.lib.logreader import LogReader
# Numpy gives different results based on CPU features after version 19 # Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7 NUMPY_TOLERANCE = 1e-7
@ -342,9 +344,9 @@ def replay_process_with_name(name, lr, *args, **kwargs):
return replay_process(cfg, lr, *args, **kwargs) return replay_process(cfg, lr, *args, **kwargs)
def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, disable_progress=False): def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False):
all_msgs = migrate_all(lr, old_logtime=True) all_msgs = migrate_all(lr, old_logtime=True)
process_logs = _replay_single_process(cfg, all_msgs, fingerprint, disable_progress) process_logs = _replay_single_process(cfg, all_msgs, fingerprint, custom_params or {}, disable_progress)
if return_all_logs: if return_all_logs:
keys = set(cfg.subs) keys = set(cfg.subs)
@ -358,7 +360,10 @@ def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, disable_pro
return log_msgs return log_msgs
def _replay_single_process(cfg, lr, fingerprint, disable_progress): def _replay_single_process(
cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]],
fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool
):
with OpenpilotPrefix(): with OpenpilotPrefix():
controlsState = None controlsState = None
initialized = False initialized = False
@ -374,11 +379,11 @@ def _replay_single_process(cfg, lr, fingerprint, disable_progress):
assert controlsState is not None and initialized, "controlsState never initialized" assert controlsState is not None and initialized, "controlsState never initialized"
if fingerprint is not None: if fingerprint is not None:
setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint) setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint, custom_params=custom_params)
else: else:
CP = next((m.carParams for m in lr if m.which() == "carParams"), None) CP = next((m.carParams for m in lr if m.which() == "carParams"), None)
assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it" assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it"
setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr) setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr, custom_params=custom_params)
if cfg.config_callback is not None: if cfg.config_callback is not None:
params = Params() params = Params()
@ -440,7 +445,8 @@ def _replay_single_process(cfg, lr, fingerprint, disable_progress):
m.logMonoTime = msg.logMonoTime m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader()) log_msgs.append(m.as_reader())
cnt += 1 cnt += 1
assert(managed_processes[cfg.proc_name].proc.is_alive()) proc = managed_processes[cfg.proc_name].proc
assert(proc and proc.is_alive())
finally: finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL) managed_processes[cfg.proc_name].signal(signal.SIGKILL)
managed_processes[cfg.proc_name].stop() managed_processes[cfg.proc_name].stop()
@ -448,7 +454,7 @@ def _replay_single_process(cfg, lr, fingerprint, disable_progress):
return log_msgs return log_msgs
def setup_env(CP=None, cfg=None, controlsState=None, lr=None, fingerprint=None, log_dir=None): def setup_env(cfg=None, CP=None, controlsState=None, lr=None, fingerprint=None, custom_params=None, log_dir=None):
if platform.system() != "Darwin": if platform.system() != "Darwin":
os.environ["PARAMS_ROOT"] = "/dev/shm/params" os.environ["PARAMS_ROOT"] = "/dev/shm/params"
if log_dir is not None: if log_dir is not None:
@ -460,6 +466,12 @@ def setup_env(CP=None, cfg=None, controlsState=None, lr=None, fingerprint=None,
params.put_bool("Passive", False) params.put_bool("Passive", False)
params.put_bool("DisengageOnAccelerator", True) params.put_bool("DisengageOnAccelerator", True)
params.put_bool("DisableLogging", False) params.put_bool("DisableLogging", False)
if custom_params is not None:
for k, v in custom_params.items():
if type(v) == bool:
params.put_bool(k, v)
else:
params.put(k, v)
os.environ["NO_RADAR_SLEEP"] = "1" os.environ["NO_RADAR_SLEEP"] = "1"
os.environ["REPLAY"] = "1" os.environ["REPLAY"] = "1"

Loading…
Cancel
Save