diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 16c0c0f6a3..5871488fe6 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -77,9 +77,9 @@ class ReplayContext: def wait_for_recv_called(self): messaging.wait_for_one_event(self.all_recv_called_events) - def wait_for_next_recv(self, end_of_cycle): + def wait_for_next_recv(self, trigger_empty_recv): index = messaging.wait_for_one_event(self.all_recv_called_events) - if self.drained_pub is not None and end_of_cycle: + if self.drained_pub is not None and trigger_empty_recv: self.all_recv_called_events[index].clear() self.all_recv_ready_events[index].set() self.all_recv_called_events[index].wait() @@ -419,12 +419,17 @@ def _replay_single_process(cfg, lr, fingerprint): for s in sockets.values(): messaging.recv_one_or_none(s) + # empty recv on drained pub indicates the end of messages, only do that if there're any + trigger_empty_recv = False + if cfg.drained_pub: + trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.drained_pub), False) + for m in msg_queue: pm.send(m.which(), m.as_builder()) msg_queue = [] rc.unlock_sockets() - rc.wait_for_next_recv(True) + rc.wait_for_next_recv(trigger_empty_recv) for s in resp_sockets: ms = messaging.drain_sock(sockets[s])