diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index d4f1744d37..ac5e94864c 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -268,6 +268,19 @@ class ProcessContainer: self.prefix.clean_dirs() self._clean_env() + def get_output_msgs(self, start_time: int): + assert self.rc and self.sockets + + output_msgs = [] + self.rc.wait_for_recv_called() + for socket in self.sockets: + ms = messaging.drain_sock(socket) + for m in ms: + m = m.as_builder() + m.logMonoTime = start_time + int(self.cfg.processing_time * 1e9) + output_msgs.append(m.as_reader()) + return output_msgs + def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] | None) -> list[capnp._DynamicStructReader]: assert self.rc and self.pm and self.sockets and self.process.proc @@ -279,18 +292,19 @@ class ProcessContainer: self.msg_queue.append(msg) if end_of_cycle: - self.rc.wait_for_recv_called() - # call recv to let sub-sockets reconnect, after we know the process is ready if self.cnt == 0: for s in self.sockets: messaging.recv_one_or_none(s) - # empty recv on drained pub indicates the end of messages, only do that if there're any + # certain processes use drain_sock. need to cause empty recv to break from this loop trigger_empty_recv = False if self.cfg.main_pub and self.cfg.main_pub_drained: trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) + # get output msgs from previous inputs + output_msgs = self.get_output_msgs(msg.logMonoTime) + for m in self.msg_queue: self.pm.send(m.which(), m.as_builder()) # send frames if needed @@ -304,14 +318,8 @@ class ProcessContainer: self.msg_queue = [] self.rc.unlock_sockets() - self.rc.wait_for_next_recv(trigger_empty_recv) - - for socket in self.sockets: - ms = messaging.drain_sock(socket) - for m in ms: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime + int(self.cfg.processing_time * 1e9) - output_msgs.append(m.as_reader()) + if trigger_empty_recv: + self.rc.unlock_sockets() self.cnt += 1 assert self.process.proc.is_alive() @@ -740,6 +748,11 @@ def _replay_multi_process( internal_pub_queue.append(m) heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) log_msgs.extend(output_msgs) + + # flush last set of messages from each process + for container in containers: + last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) + log_msgs.extend(container.get_output_msgs(last_time)) finally: for container in containers: container.stop()