From 61840b467d9e2b2726b181f330c925db3666d46f Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 13:37:44 -0700 Subject: [PATCH 1/2] profile --- selfdrive/debug/run_process_on_route.py | 6 +- selfdrive/test/process_replay/migration.py | 6 +- .../test/process_replay/process_replay.py | 65 ++++++++++--------- 3 files changed, 44 insertions(+), 33 deletions(-) diff --git a/selfdrive/debug/run_process_on_route.py b/selfdrive/debug/run_process_on_route.py index 2ccb0fb3e7..2b2cba5343 100755 --- a/selfdrive/debug/run_process_on_route.py +++ b/selfdrive/debug/run_process_on_route.py @@ -25,6 +25,6 @@ if __name__ == "__main__": inputs = [i for i in inputs if i.which() not in produces] outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) - fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" - print(f"Saving log to {fn}") - save_log(fn, outputs) + # fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" + # print(f"Saving log to {fn}") + # save_log(fn, outputs) diff --git a/selfdrive/test/process_replay/migration.py b/selfdrive/test/process_replay/migration.py index 33b363cfd9..7cde546dbb 100644 --- a/selfdrive/test/process_replay/migration.py +++ b/selfdrive/test/process_replay/migration.py @@ -205,11 +205,15 @@ def migrate_controlsState(msgs): def migrate_carState(msgs): ops = [] last_cs = None + need_migration = False for index, msg in msgs: if msg.which() == 'controlsState': last_cs = msg elif msg.which() == 'carState' and last_cs is not None: - if last_cs.controlsState.vCruiseDEPRECATED - msg.carState.vCruise > 0.1: + if not need_migration and last_cs.controlsState.vCruiseDEPRECATED - msg.carState.vCruise > 0.1: + need_migration = True + + if need_migration: msg = msg.as_builder() msg.carState.vCruise = last_cs.controlsState.vCruiseDEPRECATED msg.carState.vCruiseCluster = last_cs.controlsState.vCruiseClusterDEPRECATED diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 29a268b452..d0d465474d 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -210,9 +210,9 @@ class ProcessContainer: self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs] def _start_process(self): - if self.capture is not None: - self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) - self.process.prepare() + # if self.capture is not None: + # self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) + # self.process.prepare() self.process.start() def start( @@ -221,6 +221,13 @@ class ProcessContainer: fingerprint: str | None, capture_output: bool ): with self.prefix as p: + if capture_output: + self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) + + if self.capture is not None: + self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) + self.process.prepare() + self.prefix.create_dirs() self._setup_env(params_config, environ_config) @@ -239,13 +246,13 @@ class ProcessContainer: self._setup_vision_ipc(all_msgs, frs) assert self.vipc_server is not None - if capture_output: - self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) + # if capture_output: + # self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) self._start_process() - if self.cfg.init_callback is not None: - self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) + # if self.cfg.init_callback is not None: + # self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) def stop(self): with self.prefix: @@ -658,7 +665,7 @@ def _replay_multi_process( required_vision_pubs = {m.camera_state for m in available_streams(lr)} & set(all_vision_pubs) assert all(st in frs for st in required_vision_pubs), f"frs for this process must contain following vision streams: {required_vision_pubs}" - all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) + all_msgs = lr # sorted(lr, key=lambda msg: msg.logMonoTime) log_msgs = [] containers = [] try: @@ -680,27 +687,27 @@ def _replay_multi_process( internal_pub_index_heap: list[tuple[int, int]] = [] pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) - while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): - if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): - msg = external_pub_queue.pop(0) - pbar.update(1) - else: - _, index = heapq.heappop(internal_pub_index_heap) - msg = internal_pub_queue[index] - - target_containers = pubs_to_containers[msg.which()] - for container in target_containers: - output_msgs = container.run_step(msg, frs) - for m in output_msgs: - if m.which() in all_pubs: - internal_pub_queue.append(m) - heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) - log_msgs.extend(output_msgs) - - # flush last set of messages from each process - for container in containers: - last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) - log_msgs.extend(container.get_output_msgs(last_time)) + # while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): + # if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): + # msg = external_pub_queue.pop(0) + # pbar.update(1) + # else: + # _, index = heapq.heappop(internal_pub_index_heap) + # msg = internal_pub_queue[index] + # + # target_containers = pubs_to_containers[msg.which()] + # for container in target_containers: + # output_msgs = container.run_step(msg, frs) + # for m in output_msgs: + # if m.which() in all_pubs: + # internal_pub_queue.append(m) + # heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + # log_msgs.extend(output_msgs) + # + # # flush last set of messages from each process + # for container in containers: + # last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) + # log_msgs.extend(container.get_output_msgs(last_time)) finally: for container in containers: container.stop() From fa90cf15486e3da57961838457480b0d129ee484 Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Sat, 2 Aug 2025 13:41:02 -0700 Subject: [PATCH 2/2] clean up --- selfdrive/debug/run_process_on_route.py | 6 +- .../test/process_replay/process_replay.py | 59 +++++++++---------- 2 files changed, 30 insertions(+), 35 deletions(-) diff --git a/selfdrive/debug/run_process_on_route.py b/selfdrive/debug/run_process_on_route.py index 2b2cba5343..2ccb0fb3e7 100755 --- a/selfdrive/debug/run_process_on_route.py +++ b/selfdrive/debug/run_process_on_route.py @@ -25,6 +25,6 @@ if __name__ == "__main__": inputs = [i for i in inputs if i.which() not in produces] outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) - # fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" - # print(f"Saving log to {fn}") - # save_log(fn, outputs) + fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" + print(f"Saving log to {fn}") + save_log(fn, outputs) diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index d0d465474d..d63963a080 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -210,8 +210,8 @@ class ProcessContainer: self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs] def _start_process(self): - # if self.capture is not None: - # self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) + if self.capture is not None: + self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) # self.process.prepare() self.process.start() @@ -221,11 +221,6 @@ class ProcessContainer: fingerprint: str | None, capture_output: bool ): with self.prefix as p: - if capture_output: - self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) - - if self.capture is not None: - self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) self.process.prepare() self.prefix.create_dirs() @@ -246,13 +241,13 @@ class ProcessContainer: self._setup_vision_ipc(all_msgs, frs) assert self.vipc_server is not None - # if capture_output: - # self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) + if capture_output: + self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) self._start_process() - # if self.cfg.init_callback is not None: - # self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) + if self.cfg.init_callback is not None: + self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) def stop(self): with self.prefix: @@ -687,27 +682,27 @@ def _replay_multi_process( internal_pub_index_heap: list[tuple[int, int]] = [] pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) - # while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): - # if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): - # msg = external_pub_queue.pop(0) - # pbar.update(1) - # else: - # _, index = heapq.heappop(internal_pub_index_heap) - # msg = internal_pub_queue[index] - # - # target_containers = pubs_to_containers[msg.which()] - # for container in target_containers: - # output_msgs = container.run_step(msg, frs) - # for m in output_msgs: - # if m.which() in all_pubs: - # internal_pub_queue.append(m) - # heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) - # log_msgs.extend(output_msgs) - # - # # flush last set of messages from each process - # for container in containers: - # last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) - # log_msgs.extend(container.get_output_msgs(last_time)) + while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): + if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): + msg = external_pub_queue.pop(0) + pbar.update(1) + else: + _, index = heapq.heappop(internal_pub_index_heap) + msg = internal_pub_queue[index] + + target_containers = pubs_to_containers[msg.which()] + for container in target_containers: + output_msgs = container.run_step(msg, frs) + for m in output_msgs: + if m.which() in all_pubs: + internal_pub_queue.append(m) + heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) + log_msgs.extend(output_msgs) + + # flush last set of messages from each process + for container in containers: + last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) + log_msgs.extend(container.get_output_msgs(last_time)) finally: for container in containers: container.stop()