From c2fc5175fc8f45d806a256706978e4921a8a20c0 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Tue, 21 Mar 2023 16:43:40 -0700 Subject: [PATCH] process replay: prep for laikad subsock + QCOM GPS (#27632) * process replay: prep for laikad subsock * cleanup * fix for qcomGnss * detect ublox * more debug info in err * wip * cleanup old-commit-hash: 7319afbd516ea1b32736d67d8c5be7663dce9ae3 --- selfdrive/locationd/laikad.py | 4 +- .../test/process_replay/process_replay.py | 78 ++++++++++--------- .../test/process_replay/test_processes.py | 11 ++- 3 files changed, 49 insertions(+), 44 deletions(-) diff --git a/selfdrive/locationd/laikad.py b/selfdrive/locationd/laikad.py index 87b0cf9291..be272f1774 100755 --- a/selfdrive/locationd/laikad.py +++ b/selfdrive/locationd/laikad.py @@ -420,11 +420,11 @@ def clear_tmp_cache(): os.mkdir(DOWNLOADS_CACHE_FOLDER) -def main(sm=None, pm=None, qc=None): +def main(sm=None, pm=None): #clear_tmp_cache() use_qcom = not Params().get_bool("UbloxAvailable", block=True) - if use_qcom or (qc is not None and qc): + if use_qcom: raw_gnss_socket = "qcomGnss" else: raw_gnss_socket = "ubloxGnss" diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index f547fce0bc..9c0225796d 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -238,14 +238,14 @@ def torqued_rcv_callback(msg, CP, cfg, fsm): return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman' -def ublox_rcv_callback(msg): +def ublox_rcv_callback(msg, CP, cfg, fsm): msg_class, msg_id = msg.ubloxRaw[2:4] if (msg_class, msg_id) in {(1, 7 * 16)}: - return ["gpsLocationExternal"] + return ["gpsLocationExternal"], True elif (msg_class, msg_id) in {(2, 1 * 16 + 5), (10, 9)}: - return ["ubloxGnss"] + return ["ubloxGnss"], True else: - return [] + return [], False CONFIGS = [ @@ -364,7 +364,7 @@ CONFIGS = [ init_callback=get_car_params, should_recv_callback=None, tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, + fake_pubsubmaster=False, ), ProcessConfig( proc_name="torqued", @@ -386,7 +386,7 @@ def replay_process(cfg, lr, fingerprint=None): if cfg.fake_pubsubmaster: return python_replay_process(cfg, lr, fingerprint) else: - return cpp_replay_process(cfg, lr, fingerprint) + return replay_process_with_sockets(cfg, lr, fingerprint) def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): @@ -401,8 +401,12 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): os.environ["NO_RADAR_SLEEP"] = "1" os.environ["REPLAY"] = "1" - os.environ['SKIP_FW_QUERY'] = "" - os.environ['FINGERPRINT'] = "" + os.environ["SKIP_FW_QUERY"] = "" + os.environ["FINGERPRINT"] = "" + + if lr is not None: + services = {m.which() for m in lr} + params.put_bool("UbloxAvailable", "ubloxGnss" in services) if lr is not None: services = {m.which() for m in lr} @@ -458,12 +462,6 @@ def python_replay_process(cfg, lr, fingerprint=None): all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] - # laikad needs decision between submaster ubloxGnss and qcomGnss, prio given to ubloxGnss - if cfg.proc_name == "laikad": - args = (*args, not any(m.which() == "ubloxGnss" for m in pub_msgs)) - service = "qcomGnss" if args[2] else "ubloxGnss" - pub_msgs = [m for m in pub_msgs if m.which() == service or m.which() == 'clocks'] - controlsState = None initialized = False for msg in lr: @@ -534,49 +532,53 @@ def python_replay_process(cfg, lr, fingerprint=None): return log_msgs -def cpp_replay_process(cfg, lr, fingerprint=None): - sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] # We get responses here +def replay_process_with_sockets(cfg, lr, fingerprint=None): + sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] pm = messaging.PubMaster(cfg.pub_sub.keys()) all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] - log_msgs = [] # We need to fake SubMaster alive since we can't inject a fake clock setup_env(simulation=True, cfg=cfg, lr=lr) + if cfg.proc_name == "laikad": + ublox = Params().get_bool("UbloxAvailable") + keys = set(cfg.pub_sub.keys()) - ({"qcomGnss", } if ublox else {"ubloxGnss", }) + pub_msgs = [msg for msg in pub_msgs if msg.which() in keys] + managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() + log_msgs = [] try: - with Timeout(TIMEOUT, error_msg=f"timed out testing process {repr(cfg.proc_name)}"): + # Wait for process to startup + with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"): while not any(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): time.sleep(0) - # Make sure all subscribers are connected - sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} - for s in sub_sockets: - messaging.recv_one_or_none(sockets[s]) + # Make sure all subscribers are connected + sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} + for s in sub_sockets: + messaging.recv_one_or_none(sockets[s]) - for i, msg in enumerate(pub_msgs): + # Do the replay + cnt = 0 + for msg in pub_msgs: + with Timeout(TIMEOUT, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): pm.send(msg.which(), msg.as_builder()) + while not pm.all_readers_updated(msg.which()): + time.sleep(0) - resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) + resp_sockets = cfg.pub_sub[msg.which()] + if cfg.should_recv_callback is not None: + resp_sockets, _ = cfg.should_recv_callback(msg, None, None, None) for s in resp_sockets: - response = messaging.recv_one_retry(sockets[s]) - - if response is None: - print(f"Warning, no response received {i}") - else: - - response = response.as_builder() - response.logMonoTime = msg.logMonoTime - response = response.as_reader() - log_msgs.append(response) - - if not len(resp_sockets): # We only need to wait if we didn't already wait for a response - while not pm.all_readers_updated(msg.which()): - time.sleep(0) + m = messaging.recv_one_retry(sockets[s]) + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + log_msgs.append(m.as_reader()) + cnt += 1 finally: managed_processes[cfg.proc_name].signal(signal.SIGKILL) managed_processes[cfg.proc_name].stop() diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py index 07117c2e7f..a2dd938e2f 100755 --- a/selfdrive/test/process_replay/test_processes.py +++ b/selfdrive/test/process_replay/test_processes.py @@ -18,7 +18,7 @@ from tools.lib.logreader import LogReader source_segments = [ ("BODY", "937ccb7243511b65|2022-05-24--16-03-09--1"), # COMMA.BODY ("HYUNDAI", "02c45f73a2e5c6e9|2021-01-01--19-08-22--1"), # HYUNDAI.SONATA - ("HYUNDAI2", "d545129f3ca90f28|2022-11-07--20-43-08--3"), # HYUNDAI.KIA_EV6 + ("HYUNDAI2", "d545129f3ca90f28|2022-11-07--20-43-08--3"), # HYUNDAI.KIA_EV6 (+ QCOM GPS) ("TOYOTA", "0982d79ebb0de295|2021-01-04--17-13-21--13"), # TOYOTA.PRIUS (INDI) ("TOYOTA2", "0982d79ebb0de295|2021-01-03--20-03-36--6"), # TOYOTA.RAV4 (LQR) ("TOYOTA3", "f7d7e3538cda1a2a|2021-08-16--08-55-34--6"), # TOYOTA.COROLLA_TSS2 @@ -70,7 +70,7 @@ def run_test_process(data): res = None if not args.upload_only: lr = LogReader.from_bytes(lr_dat) - res, log_msgs = test_process(cfg, lr, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs) + res, log_msgs = test_process(cfg, lr, segment, ref_log_path, cur_log_fn, args.ignore_fields, args.ignore_msgs) # save logs so we can upload when updating refs save_log(cur_log_fn, log_msgs) @@ -88,7 +88,7 @@ def get_log_data(segment): return (segment, f.read()) -def test_process(cfg, lr, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None): +def test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None): if ignore_fields is None: ignore_fields = [] if ignore_msgs is None: @@ -96,7 +96,10 @@ def test_process(cfg, lr, ref_log_path, new_log_path, ignore_fields=None, ignore ref_log_msgs = list(LogReader(ref_log_path)) - log_msgs = replay_process(cfg, lr) + try: + log_msgs = replay_process(cfg, lr) + except Exception as e: + raise Exception("failed on segment: " + segment) from e # check to make sure openpilot is engaged in the route if cfg.proc_name == "controlsd":