From 1f61e411f34d3a5126cacd95d13f4ed3a450b138 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Wed, 12 May 2021 16:58:28 +0200 Subject: [PATCH] process replay: add timeout to C++ test in case of process crash (#20886) old-commit-hash: 38acef1c6af274a3a187f51c5235e6158db704dd --- .../test/process_replay/process_replay.py | 45 ++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index f90666b52c..734edd9a9d 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -14,6 +14,7 @@ import cereal.messaging as messaging from cereal import car, log from cereal.services import service_list from common.params import Params +from common.timeout import Timeout from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.car_helpers import get_car, interfaces from selfdrive.manager.process import PythonProcess @@ -437,36 +438,38 @@ def cpp_replay_process(cfg, lr, fingerprint=None): all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] + log_msgs = [] os.environ["SIMULATION"] = "1" # Disable submaster alive checks managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() - while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): - time.sleep(0) + with Timeout(TIMEOUT): + while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): + time.sleep(0) - # Make sure all subscribers are connected - sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} - for s in sub_sockets: - messaging.recv_one_or_none(sockets[s]) + # Make sure all subscribers are connected + sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} + for s in sub_sockets: + messaging.recv_one_or_none(sockets[s]) - log_msgs = [] - for i, msg in enumerate(tqdm(pub_msgs, disable=CI)): - pm.send(msg.which(), msg.as_builder()) + for i, msg in enumerate(tqdm(pub_msgs, disable=CI)): + pm.send(msg.which(), msg.as_builder()) - resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) - for s in resp_sockets: - response = messaging.recv_one(sockets[s]) + resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) + for s in resp_sockets: + response = messaging.recv_one(sockets[s]) + + if response is None: + print(f"Warning, no response received {i}") + else: + log_msgs.append(response) - if response is None: - print(f"Warning, no response received {i}") - else: - log_msgs.append(response) + if not len(resp_sockets): # We only need to wait if we didn't already wait for a response + while not pm.all_readers_updated(msg.which()): + time.sleep(0) - if not len(resp_sockets): # We only need to wait if we didn't already wait for a response - while not pm.all_readers_updated(msg.which()): - time.sleep(0) + managed_processes[cfg.proc_name].signal(signal.SIGKILL) + managed_processes[cfg.proc_name].stop() - managed_processes[cfg.proc_name].signal(signal.SIGKILL) - managed_processes[cfg.proc_name].stop() return log_msgs