diff --git a/selfdrive/test/process_replay/compare_logs.py b/selfdrive/test/process_replay/compare_logs.py index 821429a70f..48752d2222 100755 --- a/selfdrive/test/process_replay/compare_logs.py +++ b/selfdrive/test/process_replay/compare_logs.py @@ -83,7 +83,6 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non diff = [] for msg1, msg2 in zip(log1, log2): if msg1.which() != msg2.which(): - print(msg1.which(), msg2.which()) raise Exception("msgs not aligned between logs") msg1 = remove_ignored_fields(msg1, ignore_fields) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 4ab2d30315..65415b4151 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -5,6 +5,7 @@ import sys import threading import time import signal +from collections import defaultdict from dataclasses import dataclass, field from typing import Dict, List, Optional, Callable @@ -43,6 +44,8 @@ class ProcessConfig: subtest_name: str = "" field_tolerances: Dict[str, float] = field(default_factory=dict) timeout: int = 30 + iter_wait_time: float = 0.0 + drain_sockets: bool = False def wait_for_event(evt): @@ -70,7 +73,7 @@ class FakeSocket: self.recv_called.set() wait_for_event(self.recv_ready) self.recv_ready.clear() - return self.data.pop() + return self.data.pop(0) def send(self, data): if self.wait: @@ -137,35 +140,24 @@ class FakeSubMaster(messaging.SubMaster): class FakePubMaster(messaging.PubMaster): def __init__(self, services): # pylint: disable=super-init-not-called - self.data = {} + self.data = defaultdict(list) self.sock = {} self.last_updated = None for s in services: - try: - data = messaging.new_message(s) - except capnp.lib.capnp.KjException: - data = messaging.new_message(s, 0) - self.data[s] = data.as_reader() self.sock[s] = DumbSocket() - self.send_called = threading.Event() - self.get_called = threading.Event() def send(self, s, dat): self.last_updated = s if isinstance(dat, bytes): - self.data[s] = log.Event.from_bytes(dat) + self.data[s].append(log.Event.from_bytes(dat)) else: - self.data[s] = dat.as_reader() - self.send_called.set() - wait_for_event(self.get_called) - self.get_called.clear() + self.data[s].append(dat.as_reader()) + + def drain(self, s): + msgs = self.data[s] + self.data[s] = [] - def wait_for_msg(self): - wait_for_event(self.send_called) - self.send_called.clear() - dat = self.data[self.last_updated] - self.get_called.set() - return dat + return msgs def fingerprint(msgs, fsm, can_sock, fingerprint): @@ -252,14 +244,12 @@ def torqued_rcv_callback(msg, CP, cfg, fsm): return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman' -def ublox_rcv_callback(msg, CP, cfg, fsm): - msg_class, msg_id = msg.ubloxRaw[2:4] - if (msg_class, msg_id) in {(1, 7 * 16)}: - return ["gpsLocationExternal"], True - elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}: - return ["ubloxGnss"], True - else: - return [], False +def locationd_rcv_callback(msg, CP, cfg, fsm): + trigger_msg = "accelerometer" if CP is not None and CP.notCar else "cameraOdometry" + if msg.which() == trigger_msg: + return ["liveLocationKalman"], True + + return [], False CONFIGS = [ @@ -286,7 +276,7 @@ CONFIGS = [ proc_name="radard", pub_sub={ "can": ["radarState", "liveTracks"], - "liveParameters": [], "carState": [], "modelV2": [], + "carState": [], "modelV2": [], }, ignore=["logMonoTime", "valid", "radarState.cumLagMs"], init_callback=get_car_params, @@ -309,8 +299,8 @@ CONFIGS = [ ProcessConfig( proc_name="calibrationd", pub_sub={ - "carState": ["liveCalibration"], - "cameraOdometry": [], + "carState": [], + "cameraOdometry": ["liveCalibration"], "carParams": [], }, ignore=["logMonoTime", "valid"], @@ -335,12 +325,13 @@ CONFIGS = [ proc_name="locationd", pub_sub={ "cameraOdometry": ["liveLocationKalman"], - "accelerometer": [], "gyroscope": [], - "gpsLocationExternal": [], "liveCalibration": [], "carState": [], "gpsLocation": [], + "accelerometer": ["liveLocationKalman"], "gyroscope": [], + "gpsLocationExternal": [], "liveCalibration": [], + "carParams": [], "carState": [], "gpsLocation": [], }, ignore=["logMonoTime", "valid"], init_callback=get_car_params, - should_recv_callback=None, + should_recv_callback=locationd_rcv_callback, tolerance=NUMPY_TOLERANCE, fake_pubsubmaster=False, ), @@ -363,9 +354,11 @@ CONFIGS = [ }, ignore=["logMonoTime"], init_callback=None, - should_recv_callback=ublox_rcv_callback, + should_recv_callback=None, tolerance=None, fake_pubsubmaster=False, + iter_wait_time=0.01, + drain_sockets=True ), ProcessConfig( proc_name="laikad", @@ -383,8 +376,8 @@ CONFIGS = [ ProcessConfig( proc_name="torqued", pub_sub={ + "carControl": [], "carState": [], "liveLocationKalman": ["liveTorqueParameters"], - "carState": [], "controlsState": [], }, ignore=["logMonoTime"], init_callback=get_car_params, @@ -463,6 +456,7 @@ def python_replay_process(cfg, lr, fingerprint=None): fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config) fpm = FakePubMaster(sub_sockets) + can_sock = None args = (fsm, fpm) if 'can' in list(cfg.pub_sub.keys()): can_sock = FakeSocket() @@ -514,12 +508,13 @@ def python_replay_process(cfg, lr, fingerprint=None): log_msgs, msg_queue = [], [] for msg in pub_msgs: + recv_socks = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is not None: - recv_socks, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm) + _, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm) else: - recv_socks = [s for s in cfg.pub_sub[msg.which()] if - (fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0] - should_recv = bool(len(recv_socks)) + socks = [s for s in cfg.pub_sub[msg.which()] if + (fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0] + should_recv = bool(len(socks)) if msg.which() == 'can': can_sock.send(msg.as_builder().to_bytes()) @@ -530,14 +525,18 @@ def python_replay_process(cfg, lr, fingerprint=None): fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue) msg_queue = [] - recv_cnt = len(recv_socks) - while recv_cnt > 0: - m = fpm.wait_for_msg().as_builder() - m.logMonoTime = msg.logMonoTime - m = m.as_reader() + if can_sock is not None: + can_sock.recv_called.wait() + else: + fsm.update_called.wait() + + for s in recv_socks: + ms = fpm.drain(s) + for m in ms: + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + log_msgs.append(m.as_reader()) - log_msgs.append(m) - recv_cnt -= m.which() in recv_socks return log_msgs @@ -550,7 +549,8 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None): pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] # We need to fake SubMaster alive since we can't inject a fake clock - setup_env(simulation=True, cfg=cfg, lr=lr) + CP = [m for m in lr if m.which() == 'carParams'][0].carParams + setup_env(simulation=True, CP=CP, cfg=cfg, lr=lr) if cfg.proc_name == "laikad": ublox = Params().get_bool("UbloxAvailable") @@ -560,6 +560,10 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None): managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() + if cfg.init_callback is not None: + cfg.init_callback(all_msgs, None, None, fingerprint) + CP = car.CarParams.from_bytes(Params().get("CarParams", block=True)) + log_msgs = [] try: # Wait for process to startup @@ -572,11 +576,15 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None): # Do the replay cnt = 0 + curr_CP = None for msg in pub_msgs: with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): + if msg.which() == 'carParams': + curr_CP = msg.carParams + resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is not None: - resp_sockets, _ = cfg.should_recv_callback(msg, None, None, None) + resp_sockets, _ = cfg.should_recv_callback(msg, curr_CP, cfg, None) # Make sure all subscribers are connected if len(log_msgs) == 0 and len(resp_sockets) > 0: @@ -587,11 +595,18 @@ def replay_process_with_sockets(cfg, lr, fingerprint=None): while not pm.all_readers_updated(msg.which()): time.sleep(0) + time.sleep(cfg.iter_wait_time) + for s in resp_sockets: - m = messaging.recv_one_retry(sockets[s]) - m = m.as_builder() - m.logMonoTime = msg.logMonoTime - log_msgs.append(m.as_reader()) + msgs = [] + if cfg.drain_sockets: + msgs.extend(messaging.drain_sock(sockets[s], wait_for_one=True)) + else: + msgs = [messaging.recv_one_retry(sockets[s])] + for m in msgs: + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + log_msgs.append(m.as_reader()) cnt += 1 finally: managed_processes[cfg.proc_name].signal(signal.SIGKILL) diff --git a/selfdrive/test/process_replay/ref_commit b/selfdrive/test/process_replay/ref_commit index 310d6300d2..bff328d31c 100644 --- a/selfdrive/test/process_replay/ref_commit +++ b/selfdrive/test/process_replay/ref_commit @@ -1 +1 @@ -c67a0959201829ef07a7d68d5fb59603b3983587 +89ead2916988a97a938f5e2e10855c37fcbf5d65