process replay: speed up startup (#35879)

* format

* containers might not be set

* opts

* halves startup time for 12 procs (1.6 to 0.8s)

* stash

* clean up

* who knew going through entire list of msgs each time is so slow

* rewrite this to be more readable

* speed up lr

* clean up

* more

* more
pull/35883/head
Shane Smiskol 3 days ago committed by GitHub
parent 4d01b7bec8
commit dd09c4f341
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 14
      selfdrive/test/process_replay/process_replay.py

@ -6,6 +6,7 @@ import heapq
import signal import signal
from collections import Counter from collections import Counter
from dataclasses import dataclass, field from dataclasses import dataclass, field
from itertools import islice
from typing import Any from typing import Any
from collections.abc import Callable, Iterable from collections.abc import Callable, Iterable
from tqdm import tqdm from tqdm import tqdm
@ -302,7 +303,7 @@ class ProcessContainer:
# certain processes use drain_sock. need to cause empty recv to break from this loop # certain processes use drain_sock. need to cause empty recv to break from this loop
trigger_empty_recv = False trigger_empty_recv = False
if self.cfg.main_pub and self.cfg.main_pub_drained: if self.cfg.main_pub and self.cfg.main_pub_drained:
trigger_empty_recv = next((True for m in self.msg_queue if m.which() == self.cfg.main_pub), False) trigger_empty_recv = any(m.which() == self.cfg.main_pub for m in self.msg_queue)
# get output msgs from previous inputs # get output msgs from previous inputs
output_msgs = self.get_output_msgs(msg.logMonoTime) output_msgs = self.get_output_msgs(msg.logMonoTime)
@ -331,7 +332,7 @@ class ProcessContainer:
def card_fingerprint_callback(rc, pm, msgs, fingerprint): def card_fingerprint_callback(rc, pm, msgs, fingerprint):
print("start fingerprinting") print("start fingerprinting")
params = Params() params = Params()
canmsgs = [msg for msg in msgs if msg.which() == "can"][:300] canmsgs = list(islice((m for m in msgs if m.which() == "can"), 300))
# card expects one arbitrary can and pandaState # card expects one arbitrary can and pandaState
rc.send_sync(pm, "can", messaging.new_message("can", 1)) rc.send_sync(pm, "can", messaging.new_message("can", 1))
@ -358,19 +359,18 @@ def get_car_params_callback(rc, pm, msgs, fingerprint):
can = DummySocket() can = DummySocket()
sendcan = DummySocket() sendcan = DummySocket()
canmsgs = [msg for msg in msgs if msg.which() == "can"] canmsgs = list(islice((m for m in msgs if m.which() == "can"), 300))
cached_params_raw = params.get("CarParamsCache") cached_params_raw = params.get("CarParamsCache")
has_cached_cp = cached_params_raw is not None
assert len(canmsgs) != 0, "CAN messages are required for fingerprinting" assert len(canmsgs) != 0, "CAN messages are required for fingerprinting"
assert os.environ.get("SKIP_FW_QUERY", False) or has_cached_cp, \ assert os.environ.get("SKIP_FW_QUERY", False) or cached_params_raw is not None, \
"CarParamsCache is required for fingerprinting. Make sure to keep carParams msgs in the logs." "CarParamsCache is required for fingerprinting. Make sure to keep carParams msgs in the logs."
for m in canmsgs[:300]: for m in canmsgs:
can.send(m.as_builder().to_bytes()) can.send(m.as_builder().to_bytes())
can_callbacks = can_comm_callbacks(can, sendcan) can_callbacks = can_comm_callbacks(can, sendcan)
cached_params = None cached_params = None
if has_cached_cp: if cached_params_raw is not None:
with car.CarParams.from_bytes(cached_params_raw) as _cached_params: with car.CarParams.from_bytes(cached_params_raw) as _cached_params:
cached_params = _cached_params cached_params = _cached_params

Loading…
Cancel
Save