diff --git a/selfdrive/test/process_replay/README.md b/selfdrive/test/process_replay/README.md index 7e92717f9d..3add95f0a0 100644 --- a/selfdrive/test/process_replay/README.md +++ b/selfdrive/test/process_replay/README.md @@ -12,6 +12,7 @@ Currently the following processes are tested: * radard * plannerd * calibrationd +* ubloxd ## Forks @@ -19,6 +20,6 @@ openpilot forks can use this test with their own reference logs To generate new logs: -`./update-refs.py --no-upload` +`./update_refs.py --no-upload` Then, check in the new logs using git-lfs. Make sure to also include the updated `ref_commit` file. diff --git a/selfdrive/test/process_replay/compare_logs.py b/selfdrive/test/process_replay/compare_logs.py index 8cb4b2453f..397e6de564 100755 --- a/selfdrive/test/process_replay/compare_logs.py +++ b/selfdrive/test/process_replay/compare_logs.py @@ -3,7 +3,6 @@ import bz2 import os import sys import numbers - import dictdiffer if "CI" in os.environ: @@ -59,6 +58,7 @@ def compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=Non ignore_msgs = [] log1, log2 = [list(filter(lambda m: m.which() not in ignore_msgs, log)) for log in (log1, log2)] + if len(log1) != len(log2): raise Exception(f"logs are not same length: {len(log1)} VS {len(log2)}") diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 3b05b3871f..3676f76fe3 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -4,6 +4,7 @@ import os import sys import threading import importlib +import time if "CI" in os.environ: def tqdm(x): @@ -18,13 +19,12 @@ import cereal.messaging as messaging from common.params import Params from cereal.services import service_list from collections import namedtuple - +from selfdrive.manager import managed_processes # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'init_callback', 'should_recv_callback', 'tolerance']) - def wait_for_event(evt): if not evt.wait(15): if threading.currentThread().getName() == "MainThread": @@ -206,6 +206,15 @@ def calibration_rcv_callback(msg, CP, cfg, fsm): recv_socks = ["liveCalibration"] return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry' +def ublox_rcv_callback(msg): + msg_class, msg_id = msg.ubloxRaw[2:4] + if (msg_class, msg_id) in {(1, 7 * 16)}: + return ["gpsLocationExternal"] + elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}: + return ["ubloxGnss"] + else: + return [] + CONFIGS = [ ProcessConfig( proc_name="controlsd", @@ -285,9 +294,27 @@ CONFIGS = [ should_recv_callback=None, tolerance=NUMPY_TOLERANCE, ), + ProcessConfig( + proc_name="ubloxd", + pub_sub={ + "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], + }, + ignore=["logMonoTime"], + init_callback=None, + should_recv_callback=ublox_rcv_callback, + tolerance=None, + ), ] def replay_process(cfg, lr): + proc = managed_processes[cfg.proc_name] + if isinstance(proc, str): + return python_replay_process(cfg, lr) + else: + return cpp_replay_process(cfg, lr) + + +def python_replay_process(cfg, lr): sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] @@ -353,3 +380,31 @@ def replay_process(cfg, lr): recv_cnt -= m.which() in recv_socks return log_msgs + +def cpp_replay_process(cfg, lr): + sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here + pm = messaging.PubMaster(cfg.pub_sub.keys()) + sockets = {s : messaging.sub_sock(s, timeout=1000) for s in sub_sockets} + + all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) + pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] + + manager.prepare_managed_process(cfg.proc_name) + manager.start_managed_process(cfg.proc_name) + + time.sleep(1) # We give the process time to start + + log_msgs = [] + for s in sub_sockets: + messaging.recv_one_or_none(sockets[s]) + + for msg in tqdm(pub_msgs): + pm.send(msg.which(), msg.as_builder()) + resp_sockets = sub_sockets if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) + for s in resp_sockets: + response = messaging.recv_one(sockets[s]) + if response is not None: + log_msgs.append(response) + + manager.kill_managed_process(cfg.proc_name) + return log_msgs diff --git a/selfdrive/test/process_replay/ref_commit b/selfdrive/test/process_replay/ref_commit index 6053df0f66..426b01c99f 100644 --- a/selfdrive/test/process_replay/ref_commit +++ b/selfdrive/test/process_replay/ref_commit @@ -1 +1 @@ -d6364d7b9c574790642c273d87ee2e8e46664aab \ No newline at end of file +1ca0db37c9fab1328fe1f86cc884b9bc7123aaee \ No newline at end of file