|
|
|
@ -718,7 +718,6 @@ def _replay_multi_process( |
|
|
|
|
pbar = tqdm(total=len(external_pub_queue), disable=disable_progress) |
|
|
|
|
times = defaultdict(list) |
|
|
|
|
while len(external_pub_queue) != 0 or (len(internal_pub_index_heap) != 0 and not all(c.has_empty_queue for c in containers)): |
|
|
|
|
t = time.monotonic() |
|
|
|
|
if len(internal_pub_index_heap) == 0 or (len(external_pub_queue) != 0 and external_pub_queue[0].logMonoTime < internal_pub_index_heap[0][0]): |
|
|
|
|
msg = external_pub_queue.pop(0) |
|
|
|
|
pbar.update(1) |
|
|
|
@ -726,9 +725,6 @@ def _replay_multi_process( |
|
|
|
|
_, index = heapq.heappop(internal_pub_index_heap) |
|
|
|
|
msg = internal_pub_queue[index] |
|
|
|
|
|
|
|
|
|
# print(f'get msg took {time.monotonic() - t}s') |
|
|
|
|
|
|
|
|
|
t = time.monotonic() |
|
|
|
|
target_containers = pubs_to_containers[msg.which()] |
|
|
|
|
for container in target_containers: |
|
|
|
|
t1 = time.monotonic() |
|
|
|
@ -739,10 +735,8 @@ def _replay_multi_process( |
|
|
|
|
internal_pub_queue.append(m) |
|
|
|
|
heapq.heappush(internal_pub_index_heap, (m.logMonoTime, len(internal_pub_queue) - 1)) |
|
|
|
|
log_msgs.extend(output_msgs) |
|
|
|
|
# print(f'run_step for {container.cfg.proc_name} took {time.monotonic() - t1}s') |
|
|
|
|
# print(f'all run_steps took {time.monotonic() - t}s') |
|
|
|
|
|
|
|
|
|
print("Average run_step times:") |
|
|
|
|
print("Total run_step times:") |
|
|
|
|
for container, time_list in times.items(): |
|
|
|
|
print(f" {container}: {sum(time_list)}s") |
|
|
|
|
print('Total run_step time: {:.2f}s'.format(sum(sum(time_list) for time_list in times.values()))) |
|
|
|
|