diff --git a/cereal b/cereal index cf9c5cbb91..6f7102581f 160000 --- a/cereal +++ b/cereal @@ -1 +1 @@ -Subproject commit cf9c5cbb9196f80a25bd8421a9921ce4801a7561 +Subproject commit 6f7102581f57eb5074b816cc2cfd984218916773 diff --git a/selfdrive/test/process_replay/helpers.py b/selfdrive/test/process_replay/helpers.py index 8571f36c36..1b30eca103 100644 --- a/selfdrive/test/process_replay/helpers.py +++ b/selfdrive/test/process_replay/helpers.py @@ -2,12 +2,15 @@ import os import shutil import uuid +from typing import List, Optional + from common.params import Params class OpenpilotPrefix(object): - def __init__(self, prefix: str = None) -> None: + def __init__(self, prefix: str = None, clean_dirs_on_exit: bool = True): self.prefix = prefix if prefix else str(uuid.uuid4()) self.msgq_path = os.path.join('/dev/shm', self.prefix) + self.clean_dirs_on_exit = clean_dirs_on_exit def __enter__(self): os.environ['OPENPILOT_PREFIX'] = self.prefix @@ -17,10 +20,28 @@ class OpenpilotPrefix(object): pass def __exit__(self, exc_type, exc_obj, exc_tb): + if self.clean_dirs_on_exit: + self.clean_dirs() + del os.environ['OPENPILOT_PREFIX'] + return False + + def clean_dirs(self): symlink_path = Params().get_param_path() if os.path.exists(symlink_path): shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True) os.remove(symlink_path) shutil.rmtree(self.msgq_path, ignore_errors=True) - del os.environ['OPENPILOT_PREFIX'] - return False + + +class DummySocket: + def __init__(self): + self.data: List[bytes] = [] + + def receive(self, non_blocking: bool = False) -> Optional[bytes]: + if non_blocking: + return None + + return self.data.pop() + + def send(self, data: bytes): + self.data.append(data) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 9d1efe255d..6c49ba3449 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -1,11 +1,14 @@ #!/usr/bin/env python3 import os import time +import copy +import json +import heapq import signal import platform from collections import OrderedDict from dataclasses import dataclass, field -from typing import Dict, List, Optional, Callable, Union, Any +from typing import Dict, List, Optional, Callable, Union, Any, Iterable, Tuple from tqdm import tqdm import capnp @@ -19,18 +22,17 @@ from common.realtime import DT_CTRL from panda.python import ALTERNATIVE_EXPERIENCE from selfdrive.car.car_helpers import get_car, interfaces from selfdrive.manager.process_config import managed_processes -from selfdrive.test.process_replay.helpers import OpenpilotPrefix +from selfdrive.test.process_replay.helpers import OpenpilotPrefix, DummySocket from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams from selfdrive.test.process_replay.migration import migrate_all from tools.lib.logreader import LogReader # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 -CI = "CI" in os.environ -TIMEOUT = 15 PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__)) FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") + class ReplayContext: def __init__(self, cfg): self.proc_name = cfg.proc_name @@ -40,6 +42,14 @@ class ReplayContext: assert(len(self.pubs) != 0 or self.main_pub is not None) def __enter__(self): + self.open() + + return self + + def __exit__(self, exc_type, exc_obj, exc_tb): + self.close() + + def open(self): messaging.toggle_fake_events(True) 
messaging.set_fake_prefix(self.proc_name) @@ -50,9 +60,7 @@ class ReplayContext: else: self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)} - return self - - def __exit__(self, exc_type, exc_obj, exc_tb): + def close(self): del self.events messaging.toggle_fake_events(False) @@ -101,8 +109,7 @@ class ProcessConfig: init_callback: Optional[Callable] = None should_recv_callback: Optional[Callable] = None tolerance: Optional[float] = None - environ: Dict[str, str] = field(default_factory=dict) - subtest_name: str = "" + processing_time: float = 0.001 field_tolerances: Dict[str, float] = field(default_factory=dict) timeout: int = 30 simulation: bool = True @@ -112,18 +119,151 @@ class ProcessConfig: ignore_alive_pubs: List[str] = field(default_factory=list) -class DummySocket: - def __init__(self): - self.data = [] +class ProcessContainer: + def __init__(self, cfg: ProcessConfig): + self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False) + self.cfg = copy.deepcopy(cfg) + self.process = managed_processes[cfg.proc_name] + self.msg_queue: List[capnp._DynamicStructReader] = [] + self.cnt = 0 + self.pm: Optional[messaging.PubMaster] = None + self.sockets: Optional[List[messaging.SubSocket]] = None + self.rc: Optional[ReplayContext] = None + self.vipc_server: Optional[VisionIpcServer] = None + + @property + def has_empty_queue(self) -> bool: + return len(self.msg_queue) == 0 + + @property + def pubs(self) -> List[str]: + return self.cfg.pubs + + @property + def subs(self) -> List[str]: + return self.cfg.subs + + def _setup_env(self, params_config: Dict[str, Any], environ_config: Dict[str, Any]): + for k, v in environ_config.items(): + if len(v) != 0: + os.environ[k] = v + elif k in os.environ: + del os.environ[k] + + os.environ["PROC_NAME"] = self.cfg.proc_name + if self.cfg.simulation: + os.environ["SIMULATION"] = "1" + elif "SIMULATION" in os.environ: + del os.environ["SIMULATION"] + + params = Params() + for k, v in params_config.items(): + if isinstance(v, bool): + params.put_bool(k, v) + else: + params.put(k, v) + + def _setup_vision_ipc(self, all_msgs): + assert len(self.cfg.vision_pubs) != 0 + + device_type = next(msg.initData.deviceType for msg in all_msgs if msg.which() == "initData") + + vipc_server = VisionIpcServer("camerad") + streams_metas = available_streams(all_msgs) + for meta in streams_metas: + if meta.camera_state in self.cfg.vision_pubs: + vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type]) + vipc_server.start_listener() + + self.vipc_server = vipc_server + + def start( + self, params_config: Dict[str, Any], environ_config: Dict[str, Any], + all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], fingerprint: Optional[str] + ): + with self.prefix: + self._setup_env(params_config, environ_config) + + if self.cfg.config_callback is not None: + params = Params() + self.cfg.config_callback(params, self.cfg, all_msgs) + + self.rc = ReplayContext(self.cfg) + self.rc.open() + + self.pm = messaging.PubMaster(self.cfg.pubs) + self.sockets = [messaging.sub_sock(s, timeout=100) for s in self.cfg.subs] + + if len(self.cfg.vision_pubs) != 0: + self._setup_vision_ipc(all_msgs) + assert self.vipc_server is not None + + self.process.prepare() + self.process.start() + + if self.cfg.init_callback is not None: + self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) + + # wait for process to startup + with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(self.cfg.proc_name)}"): + while not 
all(self.pm.all_readers_updated(s) for s in self.cfg.pubs if s not in self.cfg.ignore_alive_pubs): + time.sleep(0) + + def stop(self): + with self.prefix: + self.process.signal(signal.SIGKILL) + self.process.stop() + self.rc.close() + self.prefix.clean_dirs() + + def run_step(self, msg: capnp._DynamicStructReader, frs: Optional[Dict[str, Any]]) -> List[capnp._DynamicStructReader]: + assert self.rc and self.pm and self.sockets and self.process.proc + + output_msgs = [] + with self.prefix, Timeout(self.cfg.timeout, error_msg=f"timed out testing process {repr(self.cfg.proc_name)}"): + end_of_cycle = True + if self.cfg.should_recv_callback is not None: + end_of_cycle = self.cfg.should_recv_callback(msg, self.cfg, self.cnt) + + self.msg_queue.append(msg) + if end_of_cycle: + self.rc.wait_for_recv_called() + + # call recv to let sub-sockets reconnect, after we know the process is ready + if self.cnt == 0: + for s in self.sockets: + messaging.recv_one_or_none(s) + + # empty recv on drained pub indicates the end of messages, only do that if there're any + trigger_empty_recv = False + if self.cfg.main_pub and self.cfg.main_pub_drained: + trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) + + for m in self.msg_queue: + self.pm.send(m.which(), m.as_builder()) + # send frames if needed + if self.vipc_server is not None and m.which() in self.cfg.vision_pubs: + camera_state = getattr(m, m.which()) + camera_meta = meta_from_camera_state(m.which()) + assert frs is not None + img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0] + self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), + camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) + self.msg_queue = [] - def receive(self, non_blocking=False): - if non_blocking: - return None + self.rc.unlock_sockets() + self.rc.wait_for_next_recv(trigger_empty_recv) - return self.data.pop() + for socket in self.sockets: + ms = messaging.drain_sock(socket) + for m in ms: + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) + output_msgs.append(m.as_reader()) + self.cnt += 1 + assert self.process.proc.is_alive() - def send(self, data): - self.data.append(data) + return output_msgs def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint): @@ -242,21 +382,38 @@ class FrequencyBasedRcvCallback: if frame % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0 ] return bool(len(resp_sockets)) + + +def controlsd_config_callback(params, cfg, lr): + controlsState = None + initialized = False + for msg in lr: + if msg.which() == "controlsState": + controlsState = msg.controlsState + if initialized: + break + elif msg.which() == "carEvents": + initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents] + assert controlsState is not None and initialized, "controlsState never initialized" + params.put("ReplayControlsState", controlsState.as_builder().to_bytes()) -def laikad_config_pubsub_callback(params, cfg): + +def laikad_config_pubsub_callback(params, cfg, lr): ublox = params.get_bool("UbloxAvailable") main_key = "ubloxGnss" if ublox else "qcomGnss" sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", }) - return set(cfg.pubs) - sub_keys, main_key, True + cfg.pubs = set(cfg.pubs) - sub_keys + cfg.main_pub = main_key + cfg.main_pub_drained = True -def locationd_config_pubsub_callback(params, cfg): +def locationd_config_pubsub_callback(params, cfg, lr): ublox = 
params.get_bool("UbloxAvailable") sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", }) - return set(cfg.pubs) - sub_keys, None, False + cfg.pubs = set(cfg.pubs) - sub_keys CONFIGS = [ @@ -270,9 +427,11 @@ CONFIGS = [ ], subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"], + config_callback=controlsd_config_callback, init_callback=controlsd_fingerprint_callback, should_recv_callback=controlsd_rcv_callback, tolerance=NUMPY_TOLERANCE, + processing_time=0.004, main_pub="can", ), ProcessConfig( @@ -327,6 +486,7 @@ CONFIGS = [ init_callback=get_car_params_callback, should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"), tolerance=NUMPY_TOLERANCE, + processing_time=0.004, ), ProcessConfig( proc_name="ubloxd", @@ -341,8 +501,9 @@ CONFIGS = [ ignore=["logMonoTime"], config_callback=laikad_config_pubsub_callback, tolerance=NUMPY_TOLERANCE, + processing_time=0.002, timeout=60*10, # first messages are blocked on internet assistance - main_pub="ubloxGnss", # config_callback will switch this to qcom if needed + main_pub="ubloxGnss", # config_callback will switch this to qcom if needed ), ProcessConfig( proc_name="torqued", @@ -360,6 +521,7 @@ CONFIGS = [ ignore=["logMonoTime", "modelV2.frameDropPerc", "modelV2.modelExecutionTime"], should_recv_callback=ModeldCameraSyncRcvCallback(), tolerance=NUMPY_TOLERANCE, + processing_time=0.020, main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("roadCameraState").stream), main_pub_drained=False, vision_pubs=["roadCameraState", "wideRoadCameraState"], @@ -372,6 +534,7 @@ CONFIGS = [ ignore=["logMonoTime", "driverStateV2.modelExecutionTime", "driverStateV2.dspExecutionTime"], should_recv_callback=dmonitoringmodeld_rcv_callback, tolerance=NUMPY_TOLERANCE, + processing_time=0.020, main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("driverCameraState").stream), main_pub_drained=False, vision_pubs=["driverCameraState"], @@ -380,27 +543,73 @@ CONFIGS = [ ] -def get_process_config(name): +def get_process_config(name: str) -> ProcessConfig: try: - return next(c for c in CONFIGS if c.proc_name == name) + return copy.deepcopy(next(c for c in CONFIGS if c.proc_name == name)) except StopIteration as ex: raise Exception(f"Cannot find process config with name: {name}") from ex -def replay_process_with_name(name, lr, *args, **kwargs): - cfg = get_process_config(name) - return replay_process(cfg, lr, *args, **kwargs) +def get_custom_params_from_lr(lr: Union[LogReader, List[capnp._DynamicStructReader]], initial_state: str = "first") -> Dict[str, Any]: + """ + Use this to get custom params dict based on provided logs. + Useful when replaying following processes: calibrationd, paramsd, torqued + The params may be based on first or last message of given type (carParams, liveCalibration, liveParameters, liveTorqueParameters) in the logs. 
+ """ + + car_params = [m for m in lr if m.which() == "carParams"] + live_calibration = [m for m in lr if m.which() == "liveCalibration"] + live_parameters = [m for m in lr if m.which() == "liveParameters"] + live_torque_parameters = [m for m in lr if m.which() == "liveTorqueParameters"] + + assert initial_state in ["first", "last"] + msg_index = 0 if initial_state == "first" else -1 + + assert len(car_params) > 0, "carParams required for initial state of liveParameters and liveTorqueCarParams" + CP = car_params[msg_index].carParams + custom_params = {} + if len(live_calibration) > 0: + custom_params["CalibrationParams"] = live_calibration[msg_index].as_builder().to_bytes() + if len(live_parameters) > 0: + lp_dict = live_parameters[msg_index].to_dict() + lp_dict["carFingerprint"] = CP.carFingerprint + custom_params["LiveParameters"] = json.dumps(lp_dict) + if len(live_torque_parameters) > 0: + custom_params["LiveTorqueCarParams"] = CP.as_builder().to_bytes() + custom_params["LiveTorqueParameters"] = live_torque_parameters[msg_index].as_builder().to_bytes() -def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False): - all_msgs = migrate_all(lr, old_logtime=True, camera_states=len(cfg.vision_pubs) != 0) - process_logs = _replay_single_process(cfg, all_msgs, frs, fingerprint, custom_params, disable_progress) + return custom_params + + +def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReader, List[capnp._DynamicStructReader]], *args, **kwargs) -> List[capnp._DynamicStructReader]: + if isinstance(name, str): + cfgs = [get_process_config(name)] + elif isinstance(name, Iterable): + cfgs = [get_process_config(n) for n in name] + else: + raise ValueError("name must be str or collections of strings") + + return replay_process(cfgs, lr, *args, **kwargs) + + +def replay_process( + cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, + fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False +) -> List[capnp._DynamicStructReader]: + if isinstance(cfg, Iterable): + cfgs = list(cfg) + else: + cfgs = [cfg] + + all_msgs = migrate_all(lr, old_logtime=True, camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs)) + process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, disable_progress) if return_all_logs: - keys = set(cfg.subs) + keys = {m.which() for m in process_logs} modified_logs = [m for m in all_msgs if m.which() not in keys] modified_logs.extend(process_logs) - modified_logs.sort(key=lambda m: m.logMonoTime) + modified_logs.sort(key=lambda m: int(m.logMonoTime)) log_msgs = modified_logs else: log_msgs = process_logs @@ -408,202 +617,131 @@ def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, c return log_msgs -def _replay_single_process( - cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], +def _replay_multi_process( + cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool -): - with OpenpilotPrefix(): - controlsState = None - initialized = False - if cfg.proc_name == "controlsd": - for msg in lr: - if msg.which() == "controlsState": - controlsState = msg.controlsState 
- if initialized: - break - elif msg.which() == "carEvents": - initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents] - - assert controlsState is not None and initialized, "controlsState never initialized" - - if fingerprint is not None: - setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint, custom_params=custom_params) - else: - CP = next((m.carParams for m in lr if m.which() == "carParams"), None) - assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it" - setup_env(cfg=cfg, CP=CP, controlsState=controlsState, lr=lr, custom_params=custom_params) - - if cfg.config_callback is not None: - params = Params() - cfg.pubs, cfg.main_pub, cfg.main_pub_drained = cfg.config_callback(params, cfg) - - all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) - pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)] - - with ReplayContext(cfg) as rc: - pm = messaging.PubMaster(cfg.pubs) - sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs} - - vipc_server = None - if len(cfg.vision_pubs) != 0: - assert frs is not None, "frs must be provided when replaying process using vision streams" - assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}" - assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}" - vipc_server = setup_vision_ipc(cfg, lr) - - managed_processes[cfg.proc_name].prepare() - managed_processes[cfg.proc_name].start() - - if cfg.init_callback is not None: - cfg.init_callback(rc, pm, all_msgs, fingerprint) - - log_msgs, msg_queue = [], [] - try: - # Wait for process to startup - with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"): - while not all(pm.all_readers_updated(s) for s in cfg.pubs if s not in cfg.ignore_alive_pubs): - time.sleep(0) - - # Do the replay - cnt = 0 - for msg in tqdm(pub_msgs, disable=disable_progress): - with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): - resp_sockets, end_of_cycle = cfg.subs, True - if cfg.should_recv_callback is not None: - end_of_cycle = cfg.should_recv_callback(msg, cfg, cnt) - - msg_queue.append(msg) - if end_of_cycle: - rc.wait_for_recv_called() - - # call recv to let sub-sockets reconnect, after we know the process is ready - if cnt == 0: - for s in sockets.values(): - messaging.recv_one_or_none(s) - - # empty recv on drained pub indicates the end of messages, only do that if there're any - trigger_empty_recv = False - if cfg.main_pub and cfg.main_pub_drained: - trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.main_pub), False) - - for m in msg_queue: - pm.send(m.which(), m.as_builder()) - # send frames if needed - if vipc_server is not None and m.which() in cfg.vision_pubs: - camera_state = getattr(m, m.which()) - camera_meta = meta_from_camera_state(m.which()) - assert frs is not None - img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0] - vipc_server.send(camera_meta.stream, img.flatten().tobytes(), - camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) - msg_queue = [] - - rc.unlock_sockets() - rc.wait_for_next_recv(trigger_empty_recv) +) -> List[capnp._DynamicStructReader]: + if fingerprint is not None: + params_config = generate_params_config(lr=lr, 
fingerprint=fingerprint, custom_params=custom_params) + env_config = generate_environ_config(fingerprint=fingerprint) + else: + CP = next((m.carParams for m in lr if m.which() == "carParams"), None) + params_config = generate_params_config(lr=lr, CP=CP, custom_params=custom_params) + env_config = generate_environ_config(CP=CP) + + # validate frs and vision pubs + for cfg in cfgs: + if len(cfg.vision_pubs) == 0: + continue + + assert frs is not None, "frs must be provided when replaying process using vision streams" + assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}" + assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}" + + all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) + log_msgs = [] + try: + containers = [] + for cfg in cfgs: + container = ProcessContainer(cfg) + container.start(params_config, env_config, all_msgs, fingerprint) + containers.append(container) + + all_pubs = set([pub for container in containers for pub in container.pubs]) + all_subs = set([sub for container in containers for sub in container.subs]) + lr_pubs = all_pubs - all_subs + pubs_to_containers = {pub: [container for container in containers if pub in container.pubs] for pub in all_pubs} + + pub_msgs = [msg for msg in all_msgs if msg.which() in lr_pubs] + # external queue for messages taken from logs; internal queue for messages generated by processes, which will be republished + external_pub_queue: List[capnp._DynamicStructReader] = pub_msgs.copy() + internal_pub_queue: List[capnp._DynamicStructReader] = [] + # heap for maintaining the order of messages generated by processes, where each element: (logMonoTime, index in internal_pub_queue) + internal_pub_index_heap: List[Tuple[int, int]] = [] + + pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) + while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): + if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): + msg = external_pub_queue.pop(0) + pbar.update(1) + else: + _, index = heapq.heappop(internal_pub_index_heap) + msg = internal_pub_queue[index] + + target_containers = pubs_to_containers[msg.which()] + for container in target_containers: + output_msgs = container.run_step(msg, frs) + for m in output_msgs: + if m.which() in all_pubs: + internal_pub_queue.append(m) + heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + log_msgs.extend(output_msgs) + finally: + for container in containers: + container.stop() - for s in resp_sockets: - ms = messaging.drain_sock(sockets[s]) - for m in ms: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime - log_msgs.append(m.as_reader()) - cnt += 1 - proc = managed_processes[cfg.proc_name].proc - assert(proc and proc.is_alive()) - finally: - managed_processes[cfg.proc_name].signal(signal.SIGKILL) - managed_processes[cfg.proc_name].stop() + return log_msgs - return log_msgs +def generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) -> Dict[str, Any]: + params_dict = { + "OpenpilotEnabledToggle": True, + "Passive": False, + "DisengageOnAccelerator": True, + "DisableLogging": False, + } + + if custom_params is not None: + params_dict.update(custom_params) + if lr is not None: + has_ublox = any(msg.which() 
== "ubloxGnss" for msg in lr) + params_dict["UbloxAvailable"] = has_ublox + is_rhd = next((msg.driverMonitoringState.isRHD for msg in lr if msg.which() == "driverMonitoringState"), False) + params_dict["IsRhdDetected"] = is_rhd -def setup_vision_ipc(cfg, lr): - assert len(cfg.vision_pubs) != 0 + if CP is not None: + if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS: + params_dict["DisengageOnAccelerator"] = False - device_type = next(msg.initData.deviceType for msg in lr if msg.which() == "initData") + if fingerprint is None: + if CP.fingerprintSource == "fw": + params_dict["CarParamsCache"] = CP.as_builder().to_bytes() - vipc_server = VisionIpcServer("camerad") - streams_metas = available_streams(lr) - for meta in streams_metas: - if meta.camera_state in cfg.vision_pubs: - vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type]) - vipc_server.start_listener() + if CP.openpilotLongitudinalControl: + params_dict["ExperimentalLongitudinalEnabled"] = True - return vipc_server + return params_dict -def setup_env(cfg=None, CP=None, controlsState=None, lr=None, fingerprint=None, custom_params=None, log_dir=None): +def generate_environ_config(CP=None, fingerprint=None, log_dir=None) -> Dict[str, Any]: + environ_dict = {} if platform.system() != "Darwin": - os.environ["PARAMS_ROOT"] = "/dev/shm/params" + environ_dict["PARAMS_ROOT"] = "/dev/shm/params" if log_dir is not None: - os.environ["LOG_ROOT"] = log_dir + environ_dict["LOG_ROOT"] = log_dir - params = Params() - params.clear_all() - params.put_bool("OpenpilotEnabledToggle", True) - params.put_bool("Passive", False) - params.put_bool("DisengageOnAccelerator", True) - params.put_bool("DisableLogging", False) - if custom_params is not None: - for k, v in custom_params.items(): - if type(v) == bool: - params.put_bool(k, v) - else: - params.put(k, v) + environ_dict["NO_RADAR_SLEEP"] = "1" + environ_dict["REPLAY"] = "1" - os.environ["NO_RADAR_SLEEP"] = "1" - os.environ["REPLAY"] = "1" - if fingerprint is not None: - os.environ['SKIP_FW_QUERY'] = "1" - os.environ['FINGERPRINT'] = fingerprint + # Regen or python process + if CP is not None and fingerprint is None: + if CP.fingerprintSource == "fw": + environ_dict['SKIP_FW_QUERY'] = "" + environ_dict['FINGERPRINT'] = "" + else: + environ_dict['SKIP_FW_QUERY'] = "1" + environ_dict['FINGERPRINT'] = CP.carFingerprint + elif fingerprint is not None: + environ_dict['SKIP_FW_QUERY'] = "1" + environ_dict['FINGERPRINT'] = fingerprint else: - os.environ["SKIP_FW_QUERY"] = "" - os.environ["FINGERPRINT"] = "" - - if lr is not None: - services = {m.which() for m in lr} - params.put_bool("UbloxAvailable", "ubloxGnss" in services) + environ_dict["SKIP_FW_QUERY"] = "" + environ_dict["FINGERPRINT"] = "" - if cfg is not None: - # Clear all custom processConfig environment variables - for config in CONFIGS: - for k, _ in config.environ.items(): - if k in os.environ: - del os.environ[k] - - os.environ.update(cfg.environ) - os.environ['PROC_NAME'] = cfg.proc_name - - if cfg is not None and cfg.simulation: - os.environ["SIMULATION"] = "1" - elif "SIMULATION" in os.environ: - del os.environ["SIMULATION"] - - # Initialize controlsd with a controlsState packet - if controlsState is not None: - params.put("ReplayControlsState", controlsState.as_builder().to_bytes()) - else: - params.remove("ReplayControlsState") - - # Regen or python process - if CP is not None: - if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS: - 
params.put_bool("DisengageOnAccelerator", False) - - if fingerprint is None: - if CP.fingerprintSource == "fw": - params.put("CarParamsCache", CP.as_builder().to_bytes()) - os.environ['SKIP_FW_QUERY'] = "" - os.environ['FINGERPRINT'] = "" - else: - os.environ['SKIP_FW_QUERY'] = "1" - os.environ['FINGERPRINT'] = CP.carFingerprint - - if CP.openpilotLongitudinalControl: - params.put_bool("ExperimentalLongitudinalEnabled", True) + return environ_dict -def check_openpilot_enabled(msgs): +def check_openpilot_enabled(msgs: Union[LogReader, List[capnp._DynamicStructReader]]) -> bool: cur_enabled_count = 0 max_enabled_count = 0 for msg in msgs: diff --git a/selfdrive/test/process_replay/regen.py b/selfdrive/test/process_replay/regen.py index 59f0fe703f..f94f2198a4 100755 --- a/selfdrive/test/process_replay/regen.py +++ b/selfdrive/test/process_replay/regen.py @@ -1,336 +1,95 @@ #!/usr/bin/env python3 -import bz2 import os -import time -import multiprocessing import argparse -from tqdm import tqdm -# run DM procs -os.environ["USE_WEBCAM"] = "1" +import time +import capnp + +from typing import Union, Iterable, Optional, List, Any, Dict, Tuple -import cereal.messaging as messaging -from cereal import car -from cereal.services import service_list -from cereal.visionipc import VisionIpcServer, VisionStreamType -from common.params import Params -from common.realtime import Ratekeeper, DT_MDL, DT_DMON, sec_since_boot -from common.transformations.camera import eon_f_frame_size, eon_d_frame_size, tici_f_frame_size, tici_d_frame_size, tici_e_frame_size -from panda.python import Panda -from selfdrive.car.toyota.values import EPS_SCALE -from selfdrive.manager.process import ensure_running -from selfdrive.manager.process_config import managed_processes -from selfdrive.test.process_replay.process_replay import CONFIGS, FAKEDATA, setup_env, check_openpilot_enabled -from selfdrive.test.process_replay.migration import migrate_all +from selfdrive.test.process_replay.process_replay import CONFIGS, FAKEDATA, replay_process, get_process_config, check_openpilot_enabled, get_custom_params_from_lr from selfdrive.test.update_ci_routes import upload_route from tools.lib.route import Route from tools.lib.framereader import FrameReader from tools.lib.logreader import LogReader +from tools.lib.helpers import save_log -def replay_panda_states(s, msgs): - pm = messaging.PubMaster([s, 'peripheralState']) - rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) - smsgs = [m for m in msgs if m.which() in ['pandaStates', 'pandaStateDEPRECATED']] - - # TODO: safety param migration should be handled automatically - safety_param_migration = { - "TOYOTA PRIUS 2017": EPS_SCALE["TOYOTA PRIUS 2017"] | Panda.FLAG_TOYOTA_STOCK_LONGITUDINAL, - "TOYOTA RAV4 2017": EPS_SCALE["TOYOTA RAV4 2017"] | Panda.FLAG_TOYOTA_ALT_BRAKE, - "KIA EV6 2022": Panda.FLAG_HYUNDAI_EV_GAS | Panda.FLAG_HYUNDAI_CANFD_HDA2, - } - - # Migrate safety param base on carState - cp = [m for m in msgs if m.which() == 'carParams'][0].carParams - if cp.carFingerprint in safety_param_migration: - safety_param = safety_param_migration[cp.carFingerprint] - elif len(cp.safetyConfigs): - safety_param = cp.safetyConfigs[0].safetyParam - if cp.safetyConfigs[0].safetyParamDEPRECATED != 0: - safety_param = cp.safetyConfigs[0].safetyParamDEPRECATED - else: - safety_param = cp.safetyParamDEPRECATED - - while True: - for m in smsgs: - if m.which() == 'pandaStateDEPRECATED': - new_m = messaging.new_message('pandaStates', 1) - new_m.pandaStates[0] = m.pandaStateDEPRECATED - 
new_m.pandaStates[0].safetyParam = safety_param - pm.send(s, new_m) - else: - new_m = m.as_builder() - new_m.pandaStates[-1].safetyParam = safety_param - new_m.logMonoTime = int(sec_since_boot() * 1e9) - pm.send(s, new_m) - - new_m = messaging.new_message('peripheralState') - pm.send('peripheralState', new_m) - - rk.keep_time() - - -def replay_manager_state(s, msgs): - pm = messaging.PubMaster([s, ]) - rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) - - while True: - new_m = messaging.new_message('managerState') - new_m.managerState.processes = [{'name': name, 'running': True} for name in managed_processes] - pm.send(s, new_m) - rk.keep_time() - - -def replay_device_state(s, msgs): - pm = messaging.PubMaster([s, ]) - rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) - smsgs = [m for m in msgs if m.which() == s] - while True: - for m in smsgs: - new_m = m.as_builder() - new_m.logMonoTime = int(sec_since_boot() * 1e9) - new_m.deviceState.freeSpacePercent = 50 - new_m.deviceState.memoryUsagePercent = 50 - pm.send(s, new_m) - rk.keep_time() - - -def replay_sensor_event(s, msgs): - pm = messaging.PubMaster([s, ]) - rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) - smsgs = [m for m in msgs if m.which() == s] - while True: - for m in smsgs: - m = m.as_builder() - m.logMonoTime = int(sec_since_boot() * 1e9) - getattr(m, m.which()).timestamp = m.logMonoTime - pm.send(m.which(), m) - rk.keep_time() - - -def replay_service(s, msgs): - pm = messaging.PubMaster([s, ]) - rk = Ratekeeper(service_list[s].frequency, print_delay_threshold=None) - smsgs = [m for m in msgs if m.which() == s] - while True: - for m in smsgs: - new_m = m.as_builder() - new_m.logMonoTime = int(sec_since_boot() * 1e9) - pm.send(s, new_m) - rk.keep_time() - - -def replay_cameras(lr, frs, disable_tqdm=False): - eon_cameras = [ - ("roadCameraState", DT_MDL, eon_f_frame_size, VisionStreamType.VISION_STREAM_ROAD), - ("driverCameraState", DT_DMON, eon_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER), - ] - tici_cameras = [ - ("roadCameraState", DT_MDL, tici_f_frame_size, VisionStreamType.VISION_STREAM_ROAD), - ("wideRoadCameraState", DT_MDL, tici_e_frame_size, VisionStreamType.VISION_STREAM_WIDE_ROAD), - ("driverCameraState", DT_DMON, tici_d_frame_size, VisionStreamType.VISION_STREAM_DRIVER), - ] - - def replay_camera(s, stream, dt, vipc_server, frames, size): - services = [(s, stream)] - pm = messaging.PubMaster([s for s, _ in services]) - rk = Ratekeeper(1 / dt, print_delay_threshold=None) - - img = b"\x00" * int(size[0] * size[1] * 3 / 2) - while True: - if frames is not None: - img = frames[rk.frame % len(frames)] - - rk.keep_time() - for s, stream in services: - m = messaging.new_message(s) - msg = getattr(m, s) - msg.frameId = rk.frame - msg.timestampSof = m.logMonoTime - msg.timestampEof = m.logMonoTime - pm.send(s, m) - - vipc_server.send(stream, img, msg.frameId, msg.timestampSof, msg.timestampEof) - - init_data = [m for m in lr if m.which() == 'initData'][0] - cameras = tici_cameras if (init_data.initData.deviceType in ['tici', 'tizi']) else eon_cameras - - # init vipc server and cameras - p = [] - vs = VisionIpcServer("camerad") - for (s, dt, size, stream) in cameras: - fr = frs.get(s, None) - - frames = None - if fr is not None: - print(f"Decompressing frames {s}") - frames = [] - for i in tqdm(range(fr.frame_count), disable=disable_tqdm): - img = fr.get(i, pix_fmt='nv12')[0] - frames.append(img.flatten().tobytes()) - - vs.create_buffers(stream, 40, 
False, size[0], size[1]) - p.append(multiprocessing.Process(target=replay_camera, - args=(s, stream, dt, vs, frames, size))) - - vs.start_listener() - return vs, p - - -def regen_segment(lr, frs=None, daemons="all", outdir=FAKEDATA, disable_tqdm=False): +def regen_segment( + lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, + daemons: Union[str, Iterable[str]] = "all", disable_tqdm: bool = False +) -> List[capnp._DynamicStructReader]: if not isinstance(daemons, str) and not hasattr(daemons, "__iter__"): raise ValueError("whitelist_proc must be a string or iterable") - lr = migrate_all(lr) - if frs is None: - frs = dict() - - # Get and setup initial state - CP = [m for m in lr if m.which() == 'carParams'][0].carParams - controlsState = [m for m in lr if m.which() == 'controlsState'][0].controlsState - liveCalibration = [m for m in lr if m.which() == 'liveCalibration'][0] - - setup_env(CP=CP, controlsState=controlsState, log_dir=outdir) - - params = Params() - params.put("CalibrationParams", liveCalibration.as_builder().to_bytes()) - - vs, cam_procs = replay_cameras(lr, frs, disable_tqdm=disable_tqdm) - fake_daemons = { - 'sensord': [ - multiprocessing.Process(target=replay_sensor_event, args=('accelerometer', lr)), - multiprocessing.Process(target=replay_sensor_event, args=('gyroscope', lr)), - multiprocessing.Process(target=replay_sensor_event, args=('magnetometer', lr)), - ], - 'pandad': [ - multiprocessing.Process(target=replay_service, args=('can', lr)), - multiprocessing.Process(target=replay_service, args=('ubloxRaw', lr)), - multiprocessing.Process(target=replay_panda_states, args=('pandaStates', lr)), - ], - 'manager': [ - multiprocessing.Process(target=replay_manager_state, args=('managerState', lr)), - ], - 'thermald': [ - multiprocessing.Process(target=replay_device_state, args=('deviceState', lr)), - ], - 'rawgpsd': [ - multiprocessing.Process(target=replay_service, args=('qcomGnss', lr)), - multiprocessing.Process(target=replay_service, args=('gpsLocation', lr)), - ], - 'camerad': [ - *cam_procs, - ], - } - # TODO add configs for modeld, dmonitoringmodeld - fakeable_daemons = {} - for config in CONFIGS: - processes = [ - multiprocessing.Process(target=replay_service, args=(msg, lr)) - for msg in config.subs - ] - fakeable_daemons[config.proc_name] = processes + all_msgs = sorted(lr, key=lambda m: m.logMonoTime) + custom_params = get_custom_params_from_lr(all_msgs) - additional_fake_daemons = {} if daemons != "all": - additional_fake_daemons = fakeable_daemons if isinstance(daemons, str): raise ValueError(f"Invalid value for daemons: {daemons}") + replayed_processes = [] for d in daemons: - if d in fake_daemons: - raise ValueError(f"Running daemon {d} is not supported!") - - if d in fakeable_daemons: - del additional_fake_daemons[d] - - all_fake_daemons = {**fake_daemons, **additional_fake_daemons} - - try: - # TODO: make first run of onnxruntime CUDA provider fast - if "modeld" not in all_fake_daemons: - managed_processes["modeld"].start() - if "dmonitoringmodeld" not in all_fake_daemons: - managed_processes["dmonitoringmodeld"].start() - time.sleep(5) - - # start procs up - ignore = list(all_fake_daemons.keys()) \ - + ['ui', 'manage_athenad', 'uploader', 'soundd', 'micd', 'navd'] - - print("Faked daemons:", ", ".join(all_fake_daemons.keys())) - print("Running daemons:", ", ".join([key for key in managed_processes.keys() if key not in ignore])) - - ensure_running(managed_processes.values(), started=True, params=Params(), 
CP=car.CarParams(), not_run=ignore) - for procs in all_fake_daemons.values(): - for p in procs: - p.start() - - for _ in tqdm(range(60), disable=disable_tqdm): - # ensure all procs are running - for d, procs in all_fake_daemons.items(): - for p in procs: - if not p.is_alive(): - raise Exception(f"{d}'s {p.name} died") - time.sleep(1) - finally: - # kill everything - for p in managed_processes.values(): - p.stop() - for procs in all_fake_daemons.values(): - for p in procs: - p.terminate() + cfg = get_process_config(d) + replayed_processes.append(cfg) + else: + replayed_processes = CONFIGS - del vs + print("Replayed processes:", [p.proc_name for p in replayed_processes]) + print("\n\n", "*"*30, "\n\n", sep="") - segment = params.get("CurrentRoute", encoding='utf-8') + "--0" - seg_path = os.path.join(outdir, segment) - # check to make sure openpilot is engaged in the route - if not check_openpilot_enabled(LogReader(os.path.join(seg_path, "rlog"))): - raise Exception(f"Route did not engage for long enough: {segment}") + output_logs = replay_process(replayed_processes, all_msgs, frs, return_all_logs=True, custom_params=custom_params, disable_progress=disable_tqdm) - return seg_path + return output_logs -def regen_and_save(route, sidx, daemons="all", upload=False, use_route_meta=False, outdir=FAKEDATA, disable_tqdm=False): +def setup_data_readers(route: str, sidx: int, use_route_meta: bool) -> Tuple[LogReader, Dict[str, Any]]: if use_route_meta: r = Route(route) lr = LogReader(r.log_paths()[sidx]) - fr = FrameReader(r.camera_paths()[sidx]) - if r.ecamera_paths()[sidx] is not None: - wfr = FrameReader(r.ecamera_paths()[sidx]) - else: - wfr = None + frs = {} + if len(r.camera_paths()) > sidx and r.camera_paths()[sidx] is not None: + frs['roadCameraState'] = FrameReader(r.camera_paths()[sidx]) + if len(r.ecamera_paths()) > sidx and r.ecamera_paths()[sidx] is not None: + frs['wideCameraState'] = FrameReader(r.ecamera_paths()[sidx]) + if len(r.dcamera_paths()) > sidx and r.dcamera_paths()[sidx] is not None: + frs['driverCameraState'] = FrameReader(r.dcamera_paths()[sidx]) else: lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2") - fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc") - device_type = next(iter(lr)).initData.deviceType - if device_type in ['tici', 'tizi']: - wfr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/ecamera.hevc") - else: - wfr = None - - frs = {'roadCameraState': fr} - if wfr is not None: - frs['wideRoadCameraState'] = wfr - rpath = regen_segment(lr, frs, daemons, outdir=outdir, disable_tqdm=disable_tqdm) + frs = { + 'roadCameraState': FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc"), + 'driverCameraState': FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/dcamera.hevc"), + } + if next((True for m in lr if m.which() == "wideRoadCameraState"), False): + frs['wideRoadCameraState'] = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/ecamera.hevc") - # compress raw rlog before uploading - with open(os.path.join(rpath, "rlog"), "rb") as f: - data = bz2.compress(f.read()) - with open(os.path.join(rpath, "rlog.bz2"), "wb") as f: - f.write(data) - os.remove(os.path.join(rpath, "rlog")) + return lr, frs - lr = LogReader(os.path.join(rpath, 'rlog.bz2')) - controls_state_active = [m.controlsState.active for m in lr if m.which() == 'controlsState'] - assert any(controls_state_active), "Segment did not engage" - relr = os.path.relpath(rpath) +def regen_and_save( + route: str, sidx: int, daemons: Union[str, Iterable[str]] = "all", 
outdir: str = FAKEDATA, + upload: bool = False, use_route_meta: bool = False, disable_tqdm: bool = False +) -> str: + lr, frs = setup_data_readers(route, sidx, use_route_meta) + output_logs = regen_segment(lr, frs, daemons, disable_tqdm=disable_tqdm) + + log_dir = os.path.join(outdir, time.strftime("%Y-%m-%d--%H-%M-%S--0", time.gmtime())) + rel_log_dir = os.path.relpath(log_dir) + rpath = os.path.join(log_dir, "rlog.bz2") + + os.makedirs(log_dir) + save_log(rpath, output_logs, compress=True) + + print("\n\n", "*"*30, "\n\n", sep="") + print("New route:", rel_log_dir, "\n") + + if not check_openpilot_enabled(output_logs): + raise Exception("Route did not engage for long enough") - print("\n\n", "*"*30, "\n\n") - print("New route:", relr, "\n") if upload: - upload_route(relr, exclude_patterns=['*.hevc', ]) - return relr + upload_route(rel_log_dir) + + return rel_log_dir if __name__ == "__main__": @@ -348,4 +107,4 @@ if __name__ == "__main__": parser.add_argument("seg", type=int, help="Segment in source route") args = parser.parse_args() - regen_and_save(args.route, args.seg, args.whitelist_procs, args.upload, outdir=args.outdir) + regen_and_save(args.route, args.seg, daemons=args.whitelist_procs, upload=args.upload, outdir=args.outdir) diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py index 1b07081cd8..1a717311bb 100755 --- a/selfdrive/test/process_replay/test_processes.py +++ b/selfdrive/test/process_replay/test_processes.py @@ -82,7 +82,7 @@ def run_test_process(data): assert os.path.exists(cur_log_fn), f"Cannot find log to upload: {cur_log_fn}" upload_file(cur_log_fn, os.path.basename(cur_log_fn)) os.remove(cur_log_fn) - return (segment, cfg.proc_name, cfg.subtest_name, res) + return (segment, cfg.proc_name, res) def get_log_data(segment): @@ -224,24 +224,24 @@ if __name__ == "__main__": if cfg.proc_name not in tested_procs: continue - cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}{cfg.subtest_name}_{cur_commit}.bz2") + cur_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{cur_commit}.bz2") if args.update_refs: # reference logs will not exist if routes were just regenerated ref_log_path = get_url(*segment.rsplit("--", 1)) else: - ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}{cfg.subtest_name}_{ref_commit}.bz2") + ref_log_fn = os.path.join(FAKEDATA, f"{segment}_{cfg.proc_name}_{ref_commit}.bz2") ref_log_path = ref_log_fn if os.path.exists(ref_log_fn) else BASE_URL + os.path.basename(ref_log_fn) dat = None if args.upload_only else log_data[segment] pool_args.append((segment, cfg, args, cur_log_fn, ref_log_path, dat)) - log_paths[segment][cfg.proc_name + cfg.subtest_name]['ref'] = ref_log_path - log_paths[segment][cfg.proc_name + cfg.subtest_name]['new'] = cur_log_fn + log_paths[segment][cfg.proc_name]['ref'] = ref_log_path + log_paths[segment][cfg.proc_name]['new'] = cur_log_fn results: Any = defaultdict(dict) p2 = pool.map(run_test_process, pool_args) - for (segment, proc, subtest_name, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)): + for (segment, proc, result) in tqdm(p2, desc="Running Tests", total=len(pool_args)): if not args.upload_only: - results[segment][proc + subtest_name] = result + results[segment][proc] = result diff1, diff2, failed = format_diff(results, log_paths, ref_commit) if not upload:
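
Usage sketch (not part of the patch): a minimal, hedged illustration of the new replay entry points added above, assuming an openpilot checkout containing this change and a locally available rlog segment; the "rlog.bz2" path is a placeholder. It shows how replay_process_with_name now accepts a collection of process names and how get_custom_params_from_lr seeds Params from the log itself; messages produced by one replayed process are re-published to the others in logMonoTime order by _replay_multi_process.

#!/usr/bin/env python3
# Minimal sketch of the new multi-process replay API (assumes an openpilot
# checkout with this patch applied; "rlog.bz2" is a placeholder path).
from tools.lib.logreader import LogReader
from selfdrive.test.process_replay.process_replay import (
  replay_process_with_name, get_custom_params_from_lr, check_openpilot_enabled)

if __name__ == "__main__":
  lr = list(LogReader("rlog.bz2"))  # hypothetical local segment log

  # seed Params (calibration, live/torque parameters) from the log itself,
  # as suggested in get_custom_params_from_lr's docstring for paramsd/torqued
  custom_params = get_custom_params_from_lr(lr, initial_state="first")

  # passing a list of names replays the processes together; their outputs are
  # merged back into the message stream via the internal heap-ordered queue
  log_msgs = replay_process_with_name(["paramsd", "torqued"], lr,
                                      custom_params=custom_params,
                                      return_all_logs=True)

  print("openpilot engaged:", check_openpilot_enabled(log_msgs))

The rewritten regen.py follows the same path: regen_segment now just calls replay_process with every config (or a whitelisted subset) over the migrated log, and regen_and_save writes the resulting messages to rlog.bz2 with save_log, replacing the old fake-daemon approach.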