diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 1f8c77d567..322594fb8d 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -445,31 +445,32 @@ def cpp_replay_process(cfg, lr, fingerprint=None): managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() - with Timeout(TIMEOUT): - while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): - time.sleep(0) - - # Make sure all subscribers are connected - sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} - for s in sub_sockets: - messaging.recv_one_or_none(sockets[s]) - - for i, msg in enumerate(tqdm(pub_msgs, disable=False)): - pm.send(msg.which(), msg.as_builder()) - - resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) - for s in resp_sockets: - response = messaging.recv_one(sockets[s]) - - if response is None: - print(f"Warning, no response received {i}") - else: - log_msgs.append(response) - - if not len(resp_sockets): # We only need to wait if we didn't already wait for a response - while not pm.all_readers_updated(msg.which()): - time.sleep(0) - + try: + with Timeout(TIMEOUT): + while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): + time.sleep(0) + + # Make sure all subscribers are connected + sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} + for s in sub_sockets: + messaging.recv_one_or_none(sockets[s]) + + for i, msg in enumerate(tqdm(pub_msgs, disable=False)): + pm.send(msg.which(), msg.as_builder()) + + resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) + for s in resp_sockets: + response = messaging.recv_one(sockets[s]) + + if response is None: + print(f"Warning, no response received {i}") + else: + log_msgs.append(response) + + if not len(resp_sockets): # We only need to wait if we didn't already wait for a response + while not pm.all_readers_updated(msg.which()): + time.sleep(0) + finally: managed_processes[cfg.proc_name].signal(signal.SIGKILL) managed_processes[cfg.proc_name].stop()