pr-setup-speedup
Shane Smiskol 1 month ago
parent 61840b467d
commit fa90cf1548
  1. 6
      selfdrive/debug/run_process_on_route.py
  2. 59
      selfdrive/test/process_replay/process_replay.py

@ -25,6 +25,6 @@ if __name__ == "__main__":
inputs = [i for i in inputs if i.which() not in produces] inputs = [i for i in inputs if i.which() not in produces]
outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime) outputs = sorted(inputs + outputs, key=lambda x: x.logMonoTime)
# fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst" fn = f"{args.route.replace('/', '_')}_{'_'.join(args.process)}.zst"
# print(f"Saving log to {fn}") print(f"Saving log to {fn}")
# save_log(fn, outputs) save_log(fn, outputs)

@ -210,8 +210,8 @@ class ProcessContainer:
self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs] self.cfg.vision_pubs = [meta.camera_state for meta in streams_metas if meta.camera_state in self.cfg.vision_pubs]
def _start_process(self): def _start_process(self):
# if self.capture is not None: if self.capture is not None:
# self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher)
# self.process.prepare() # self.process.prepare()
self.process.start() self.process.start()
@ -221,11 +221,6 @@ class ProcessContainer:
fingerprint: str | None, capture_output: bool fingerprint: str | None, capture_output: bool
): ):
with self.prefix as p: with self.prefix as p:
if capture_output:
self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix)
if self.capture is not None:
self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher)
self.process.prepare() self.process.prepare()
self.prefix.create_dirs() self.prefix.create_dirs()
@ -246,13 +241,13 @@ class ProcessContainer:
self._setup_vision_ipc(all_msgs, frs) self._setup_vision_ipc(all_msgs, frs)
assert self.vipc_server is not None assert self.vipc_server is not None
# if capture_output: if capture_output:
# self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix)
self._start_process() self._start_process()
# if self.cfg.init_callback is not None: if self.cfg.init_callback is not None:
# self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint)
def stop(self): def stop(self):
with self.prefix: with self.prefix:
@ -687,27 +682,27 @@ def _replay_multi_process(
internal_pub_index_heap: list[tuple[int, int]] = [] internal_pub_index_heap: list[tuple[int, int]] = []
pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) pbar = tqdm(total=len(external_pub_queue), disable=disable_progress)
# while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)):
# if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]):
# msg = external_pub_queue.pop(0) msg = external_pub_queue.pop(0)
# pbar.update(1) pbar.update(1)
# else: else:
# _, index = heapq.heappop(internal_pub_index_heap) _, index = heapq.heappop(internal_pub_index_heap)
# msg = internal_pub_queue[index] msg = internal_pub_queue[index]
#
# target_containers = pubs_to_containers[msg.which()] target_containers = pubs_to_containers[msg.which()]
# for container in target_containers: for container in target_containers:
# output_msgs = container.run_step(msg, frs) output_msgs = container.run_step(msg, frs)
# for m in output_msgs: for m in output_msgs:
# if m.which() in all_pubs: if m.which() in all_pubs:
# internal_pub_queue.append(m) internal_pub_queue.append(m)
# heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1))
# log_msgs.extend(output_msgs) log_msgs.extend(output_msgs)
#
# # flush last set of messages from each process # flush last set of messages from each process
# for container in containers: for container in containers:
# last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9) last_time = log_msgs[-1].logMonoTime if len(log_msgs) > 0 else int(time.monotonic() * 1e9)
# log_msgs.extend(container.get_output_msgs(last_time)) log_msgs.extend(container.get_output_msgs(last_time))
finally: finally:
for container in containers: for container in containers:
container.stop() container.stop()

Loading…
Cancel
Save