diff --git a/selfdrive/debug/run_process_on_route.py b/selfdrive/debug/run_process_on_route.py index 2b2cba5343..2ccb0fb3e7 100755 --- a/selfdrive/debug/run_process_on_route.py +++ b/selfdrive/debug/run_process_on_route.py @@ -25,6 +25,6 @@ if __name__ == "__main__": inputs = [i for i in inputs if i.which() not in produces] outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) - # fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" - # print(f"Saving log to {fn}") - # save_log(fn, outputs) + fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" + print(f"Saving log to {fn}") + save_log(fn, outputs) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index d0d465474d..d63963a080 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -210,8 +210,8 @@ class ProcessContainer: self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs] def _start_process(self): - # if self.capture is not None: - # self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) + if self.capture is not None: + self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) # self.process.prepare() self.process.start() @@ -221,11 +221,6 @@ class ProcessContainer: fingerprint: str | None, capture_output: bool ): with self.prefix as p: - if capture_output: - self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) - - if self.capture is not None: - self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) self.process.prepare() self.prefix.create_dirs() @@ -246,13 +241,13 @@ class ProcessContainer: self._setup_vision_ipc(all_msgs, frs) assert self.vipc_server is not None - # if capture_output: - # self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) + if capture_output: + self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) self._start_process() - # if self.cfg.init_callback is not None: - # self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) + if self.cfg.init_callback is not None: + self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) def stop(self): with self.prefix: @@ -687,27 +682,27 @@ def _replay_multi_process( internal_pub_index_heap: list[tuple[int, int]] = [] pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) - # while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): - # if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): - # msg = external_pub_queue.pop(0) - # pbar.update(1) - # else: - # _, index = heapq.heappop(internal_pub_index_heap) - # msg = internal_pub_queue[index] - # - # target_containers = pubs_to_containers[msg.which()] - # for container in target_containers: - # output_msgs = container.run_step(msg, frs) - # for m in output_msgs: - # if m.which() in all_pubs: - # internal_pub_queue.append(m) - # heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) - # log_msgs.extend(output_msgs) - # - # # flush last set of messages from each process - # for container in containers: - # last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) - # log_msgs.extend(container.get_output_msgs(last_time)) + while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): + if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): + msg = external_pub_queue.pop(0) + pbar.update(1) + else: + _, index = heapq.heappop(internal_pub_index_heap) + msg = internal_pub_queue[index] + + target_containers = pubs_to_containers[msg.which()] + for container in target_containers: + output_msgs = container.run_step(msg, frs) + for m in output_msgs: + if m.which() in all_pubs: + internal_pub_queue.append(m) + heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + log_msgs.extend(output_msgs) + + # flush last set of messages from each process + for container in containers: + last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) + log_msgs.extend(container.get_output_msgs(last_time)) finally: for container in containers: container.stop()