process_replay: universal replay mechanism using cereal fake sockets (#28115)

* replay_process_with_fake_sockets implementation

* add missing polled_pubs to configs

* drained_pubs field

* updated cereal

* Remove python/native variations of process replay. Replace with universal one using cereal fake sockets

* Replace old py FakeSocket with DummySocket

* Invalidate and deregister fake sockets after replay is done

* Remove unused import

* Set up new prefix for each replay

* Fixes for radard

* Refactor ReplayContext and ProcessConfig

* Minor fixes

* Reimplement controlsd fingerprinting callback

* time.sleep for sockets to safely reconnect

* Fix fingerprinting for controlsd

* Fixes for regen to work

* Fix replay loop to respect submaster frames

* Fix profiler to use new ProcessConfig fields

* Remove tqdm

* Refactor tests to use new ProcessConfig

* Add FrequencyBasedRcvCallback

* Make tolerance None by default

* Update cereal

* Add get_process_config utility func

* Update cereal. Simplify sync procedure

* Chain context managers

* New sub-socket reconnection procedure

* Fix linter issues

* Revert chaining of context managers

* Init controlsState only when replaying controlsd. Update cereal

* Update cereal

* Update process_replay to use new cereal API

* Update cereal

* Update cereal

* Update cereal

* Simplify radard recv callback

* Update release/files_common
old-commit-hash: e6ac6320ac
beeps
Kacper Rączy 2 years ago committed by GitHub
parent 52e1cf2e64
commit a48b67f720
  1. 2
      cereal
  2. 5
      release/files_common
  3. 5
      selfdrive/controls/tests/test_alerts.py
  4. 705
      selfdrive/test/process_replay/process_replay.py
  5. 3
      selfdrive/test/process_replay/regen.py
  6. 2
      selfdrive/test/process_replay/test_fuzzy.py
  7. 2
      selfdrive/test/profiling/profiler.py

@ -1 +1 @@
Subproject commit e0cf7e09ae2864034bc370fb6c3db71a83eaf3e6
Subproject commit 9b6b53396fff6ec7541a5415901d892e57756b91

@ -469,6 +469,7 @@ body/crypto/**
cereal/.gitignore
cereal/__init__.py
cereal/car.capnp
cereal/custom.capnp
cereal/legacy.capnp
cereal/log.capnp
cereal/services.py
@ -478,6 +479,10 @@ cereal/logger/logger.h
cereal/messaging/.gitignore
cereal/messaging/__init__.py
cereal/messaging/bridge.cc
cereal/messaging/event.cc
cereal/messaging/event.h
cereal/messaging/impl_fake.cc
cereal/messaging/impl_fake.h
cereal/messaging/impl_msgq.cc
cereal/messaging/impl_msgq.h
cereal/messaging/impl_zmq.cc

@ -6,11 +6,12 @@ import random
from PIL import Image, ImageDraw, ImageFont
from cereal import log, car
from cereal.messaging import SubMaster
from common.basedir import BASEDIR
from common.params import Params
from selfdrive.controls.lib.events import Alert, EVENTS, ET
from selfdrive.controls.lib.alertmanager import set_offroad_alert
from selfdrive.test.process_replay.process_replay import FakeSubMaster, CONFIGS
from selfdrive.test.process_replay.process_replay import CONFIGS
AlertSize = log.ControlsState.AlertSize
@ -34,7 +35,7 @@ class TestAlerts(unittest.TestCase):
cls.CS = car.CarState.new_message()
cls.CP = car.CarParams.new_message()
cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0]
cls.sm = FakeSubMaster(cfg.pub_sub.keys())
cls.sm = SubMaster(cfg.pubs)
def test_events_defined(self):
# Ensure all events in capnp schema are defined in events.py

@ -1,27 +1,21 @@
#!/usr/bin/env python3
import importlib
import os
import sys
import threading
import time
import signal
from collections import defaultdict
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable
import capnp
import cereal.messaging as messaging
from cereal import car, log
from cereal import car
from cereal.services import service_list
from common.params import Params
from common.timeout import Timeout
from common.realtime import DT_CTRL
from panda.python import ALTERNATIVE_EXPERIENCE
from selfdrive.car.car_helpers import get_car, interfaces
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
from selfdrive.manager.process import PythonProcess
from selfdrive.manager.process_config import managed_processes
from selfdrive.test.process_replay.helpers import OpenpilotPrefix
# Numpy gives different results based on CPU features after version 19
NUMPY_TOLERANCE = 1e-7
@ -30,373 +24,400 @@ TIMEOUT = 15
PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
class ReplayContext:
def __init__(self, cfg):
self.proc_name = cfg.proc_name
self.pubs = cfg.pubs
self.drained_pub = cfg.drained_pub
assert(len(self.pubs) != 0 or self.drained_pub is not None)
def __enter__(self):
messaging.toggle_fake_events(True)
messaging.set_fake_prefix(self.proc_name)
if self.drained_pub is None:
self.events = OrderedDict()
for pub in self.pubs:
self.events[pub] = messaging.fake_event_handle(pub, enable=True)
else:
self.events = {self.drained_pub: messaging.fake_event_handle(self.drained_pub, enable=True)}
return self
def __exit__(self, exc_type, exc_obj, exc_tb):
del self.events
messaging.toggle_fake_events(False)
messaging.delete_fake_prefix()
@property
def all_recv_called_events(self):
return [man.recv_called_event for man in self.events.values()]
@property
def all_recv_ready_events(self):
return [man.recv_ready_event for man in self.events.values()]
def send_sync(self, pm, endpoint, dat):
self.events[endpoint].recv_called_event.wait()
self.events[endpoint].recv_called_event.clear()
pm.send(endpoint, dat)
self.events[endpoint].recv_ready_event.set()
def unlock_sockets(self):
expected_sets = len(self.events)
while expected_sets > 0:
index = messaging.wait_for_one_event(self.all_recv_called_events)
self.all_recv_called_events[index].clear()
self.all_recv_ready_events[index].set()
expected_sets -= 1
def wait_for_recv_called(self):
messaging.wait_for_one_event(self.all_recv_called_events)
def wait_for_next_recv(self, end_of_cycle):
index = messaging.wait_for_one_event(self.all_recv_called_events)
if self.drained_pub is not None and end_of_cycle:
self.all_recv_called_events[index].clear()
self.all_recv_ready_events[index].set()
self.all_recv_called_events[index].wait()
@dataclass
class ProcessConfig:
proc_name: str
pub_sub: Dict[str, List[str]]
pubs: List[str]
subs: List[str]
ignore: List[str]
config_callback: Optional[Callable]
init_callback: Optional[Callable]
should_recv_callback: Optional[Callable]
tolerance: Optional[float]
fake_pubsubmaster: bool
submaster_config: Dict[str, List[str]] = field(default_factory=dict)
tolerance: Optional[float] = None
environ: Dict[str, str] = field(default_factory=dict)
subtest_name: str = ""
field_tolerances: Dict[str, float] = field(default_factory=dict)
timeout: int = 30
iter_wait_time: float = 0.0
drain_sockets: bool = False
def wait_for_event(evt):
if not evt.wait(TIMEOUT):
if threading.currentThread().getName() == "MainThread":
# tested process likely died. don't let test just hang
raise Exception(f"Timeout reached. Tested process {os.environ['PROC_NAME']} likely crashed.")
else:
# done testing this process, let it die
sys.exit(0)
simulation: bool = True
drained_pub: Optional[str] = None
class FakeSocket:
def __init__(self, wait=True):
class DummySocket:
def __init__(self):
self.data = []
self.wait = wait
self.recv_called = threading.Event()
self.recv_ready = threading.Event()
def receive(self, non_blocking=False):
if non_blocking:
return None
if self.wait:
self.recv_called.set()
wait_for_event(self.recv_ready)
self.recv_ready.clear()
return self.data.pop(0)
return self.data.pop()
def send(self, data):
if self.wait:
wait_for_event(self.recv_called)
self.recv_called.clear()
self.data.append(data)
if self.wait:
self.recv_ready.set()
def wait_for_recv(self):
wait_for_event(self.recv_called)
class DumbSocket:
def __init__(self, s=None):
if s is not None:
try:
dat = messaging.new_message(s)
except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member
# lists
dat = messaging.new_message(s, 0)
self.data = dat.to_bytes()
def receive(self, non_blocking=False):
return self.data
def send(self, dat):
pass
class FakeSubMaster(messaging.SubMaster):
def __init__(self, services, ignore_alive=None, ignore_avg_freq=None):
super().__init__(services, ignore_alive=ignore_alive, ignore_avg_freq=ignore_avg_freq, addr=None)
self.sock = {s: DumbSocket(s) for s in services}
self.update_called = threading.Event()
self.update_ready = threading.Event()
self.wait_on_getitem = False
def __getitem__(self, s):
# hack to know when fingerprinting is done
if self.wait_on_getitem:
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
return self.data[s]
def update(self, timeout=-1):
self.update_called.set()
wait_for_event(self.update_ready)
self.update_ready.clear()
def update_msgs(self, cur_time, msgs):
wait_for_event(self.update_called)
self.update_called.clear()
super().update_msgs(cur_time, msgs)
self.update_ready.set()
def wait_for_update(self):
wait_for_event(self.update_called)
class FakePubMaster(messaging.PubMaster):
def __init__(self, services): # pylint: disable=super-init-not-called
self.data = defaultdict(list)
self.sock = {}
self.last_updated = None
for s in services:
self.sock[s] = DumbSocket()
def send(self, s, dat):
self.last_updated = s
if isinstance(dat, bytes):
self.data[s].append(log.Event.from_bytes(dat))
else:
self.data[s].append(dat.as_reader())
def drain(self, s):
msgs = self.data[s]
self.data[s] = []
return msgs
def fingerprint(msgs, fsm, can_sock, fingerprint):
def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint):
print("start fingerprinting")
fsm.wait_on_getitem = True
# populate fake socket with data for fingerprinting
canmsgs = [msg for msg in msgs if msg.which() == "can"]
wait_for_event(can_sock.recv_called)
can_sock.recv_called.clear()
can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]]
can_sock.recv_ready.set()
can_sock.wait = False
params = Params()
canmsgs = [msg for msg in msgs if msg.which() == "can"][:300]
# we know fingerprinting is done when controlsd sets sm['lateralPlan'].sensorValid
wait_for_event(fsm.update_called)
fsm.update_called.clear()
# controlsd expects one arbitrary can and pandaState
rc.send_sync(pm, "can", messaging.new_message("can", 1))
pm.send("pandaStates", messaging.new_message("pandaStates", 1))
rc.send_sync(pm, "can", messaging.new_message("can", 1))
rc.wait_for_next_recv(True)
fsm.wait_on_getitem = False
can_sock.wait = True
can_sock.data = []
# fingerprinting is done, when CarParams is set
while params.get("CarParams") is None:
if len(canmsgs) == 0:
raise ValueError("Fingerprinting failed. Run out of can msgs")
fsm.update_ready.set()
m = canmsgs.pop(0)
rc.send_sync(pm, "can", m.as_builder().to_bytes())
rc.wait_for_next_recv(False)
def get_car_params(msgs, fsm, can_sock, fingerprint):
def get_car_params_callback(rc, pm, msgs, fingerprint):
if fingerprint:
CarInterface, _, _ = interfaces[fingerprint]
CP = CarInterface.get_non_essential_params(fingerprint)
else:
can = FakeSocket(wait=False)
sendcan = FakeSocket(wait=False)
can = DummySocket()
sendcan = DummySocket()
canmsgs = [msg for msg in msgs if msg.which() == 'can']
canmsgs = [msg for msg in msgs if msg.which() == "can"]
for m in canmsgs[:300]:
can.send(m.as_builder().to_bytes())
_, CP = get_car(can, sendcan, Params().get_bool("ExperimentalLongitudinalEnabled"))
Params().put("CarParams", CP.to_bytes())
def controlsd_rcv_callback(msg, CP, cfg, fsm):
def controlsd_rcv_callback(msg, CP, cfg, frame):
# no sendcan until controlsd is initialized
socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0]
if "sendcan" in socks and fsm.frame < 2000:
socks.remove("sendcan")
return socks, len(socks) > 0
def radar_rcv_callback(msg, CP, cfg, fsm):
if msg.which() != "can":
return [], False
elif CP.radarUnavailable:
return ["radarState", "liveTracks"], True
return False
radar_msgs = {"honda": [0x445], "toyota": [0x19f, 0x22f], "gm": [0x474],
"chrysler": [0x2d4]}.get(CP.carName, None)
socks = [
s for s in cfg.subs if
frame % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0
]
if "sendcan" in socks and (frame - 1) < 2000:
socks.remove("sendcan")
return len(socks) > 0
if radar_msgs is None:
raise NotImplementedError
for m in msg.can:
if m.src == 1 and m.address in radar_msgs:
return ["radarState", "liveTracks"], True
return [], False
def radar_rcv_callback(msg, CP, cfg, frame):
return msg.which() == "can"
def calibration_rcv_callback(msg, CP, cfg, fsm):
def calibration_rcv_callback(msg, CP, cfg, frame):
# calibrationd publishes 1 calibrationData every 5 cameraOdometry packets.
# should_recv always true to increment frame
recv_socks = []
frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster
if frame == 0 or (msg.which() == 'cameraOdometry' and (frame % 5) == 0):
recv_socks = ["liveCalibration"]
return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry'
return (frame - 1) == 0 or msg.which() == 'cameraOdometry'
def torqued_rcv_callback(msg, CP, cfg, fsm):
def torqued_rcv_callback(msg, CP, cfg, frame):
# should_recv always true to increment frame
recv_socks = []
frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster
if msg.which() == 'liveLocationKalman' and (frame % 5) == 0:
recv_socks = ["liveTorqueParameters"]
return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman'
return (frame - 1) == 0 or msg.which() == 'liveLocationKalman'
def locationd_rcv_callback(msg, CP, cfg, fsm):
trigger_msg = "accelerometer" if CP is not None and CP.notCar else "cameraOdometry"
if msg.which() == trigger_msg:
return ["liveLocationKalman"], True
class FrequencyBasedRcvCallback:
def __init__(self, trigger_msg_type):
self.trigger_msg_type = trigger_msg_type
return [], False
def __call__(self, msg, CP, cfg, frame):
if msg.which() != self.trigger_msg_type:
return False
resp_sockets = [
s for s in cfg.subs
if frame % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0
]
return bool(len(resp_sockets))
def laikad_config_pubsub_callback(params, cfg):
ublox = params.get_bool("UbloxAvailable")
drained_key = "ubloxGnss" if ublox else "qcomGnss"
sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", })
return set(cfg.pubs) - sub_keys, drained_key
def locationd_config_pubsub_callback(params, cfg):
ublox = params.get_bool("UbloxAvailable")
sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", })
return set(cfg.pubs) - sub_keys, None
CONFIGS = [
ProcessConfig(
proc_name="controlsd",
pub_sub={
"can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
"deviceState": [], "pandaStates": [], "peripheralState": [], "liveCalibration": [], "driverMonitoringState": [],
"longitudinalPlan": [], "lateralPlan": [], "liveLocationKalman": [], "liveParameters": [], "radarState": [],
"modelV2": [], "driverCameraState": [], "roadCameraState": [], "wideRoadCameraState": [], "managerState": [],
"testJoystick": [], "liveTorqueParameters": [],
},
pubs=[
"can", "deviceState", "pandaStates", "peripheralState", "liveCalibration", "driverMonitoringState",
"longitudinalPlan", "lateralPlan", "liveLocationKalman", "liveParameters", "radarState",
"modelV2", "driverCameraState", "roadCameraState", "wideRoadCameraState", "managerState",
"testJoystick", "liveTorqueParameters"
],
subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"],
ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"],
init_callback=fingerprint,
config_callback=None,
init_callback=controlsd_fingerprint_callback,
should_recv_callback=controlsd_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
submaster_config={
'ignore_avg_freq': ['radarState', 'longitudinalPlan', 'driverCameraState', 'driverMonitoringState'], # dcam is expected at 20 Hz
'ignore_alive': [],
}
simulation=False,
drained_pub="can",
),
ProcessConfig(
proc_name="radard",
pub_sub={
"can": ["radarState", "liveTracks"],
"carState": [], "modelV2": [],
},
pubs=["can", "carState", "modelV2"],
subs=["radarState", "liveTracks"],
ignore=["logMonoTime", "valid", "radarState.cumLagMs"],
init_callback=get_car_params,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=radar_rcv_callback,
tolerance=None,
fake_pubsubmaster=True,
drained_pub="can",
),
ProcessConfig(
proc_name="plannerd",
pub_sub={
"modelV2": ["lateralPlan", "longitudinalPlan", "uiPlan"],
"carControl": [], "carState": [], "controlsState": [], "radarState": [],
},
pubs=["modelV2", "carControl", "carState", "controlsState", "radarState"],
subs=["lateralPlan", "longitudinalPlan", "uiPlan"],
ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime", "lateralPlan.solverExecutionTime"],
init_callback=get_car_params,
should_recv_callback=None,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=FrequencyBasedRcvCallback("modelV2"),
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="calibrationd",
pub_sub={
"carState": [],
"cameraOdometry": ["liveCalibration"],
"carParams": [],
},
pubs=["carState", "cameraOdometry", "carParams"],
subs=["liveCalibration"],
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=calibration_rcv_callback,
tolerance=None,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="dmonitoringd",
pub_sub={
"driverStateV2": ["driverMonitoringState"],
"liveCalibration": [], "carState": [], "modelV2": [], "controlsState": [],
},
pubs=["driverStateV2", "liveCalibration", "carState", "modelV2", "controlsState"],
subs=["driverMonitoringState"],
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=FrequencyBasedRcvCallback("driverStateV2"),
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="locationd",
pub_sub={
"cameraOdometry": ["liveLocationKalman"],
"accelerometer": ["liveLocationKalman"], "gyroscope": [],
"gpsLocationExternal": [], "liveCalibration": [],
"carParams": [], "carState": [], "gpsLocation": [],
},
pubs=[
"cameraOdometry", "accelerometer", "gyroscope", "gpsLocationExternal",
"liveCalibration", "carState", "carParams", "gpsLocation"
],
subs=["liveLocationKalman"],
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=locationd_rcv_callback,
config_callback=locationd_config_pubsub_callback,
init_callback=get_car_params_callback,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=False,
),
ProcessConfig(
proc_name="paramsd",
pub_sub={
"liveLocationKalman": ["liveParameters"],
"carState": []
},
pubs=["liveLocationKalman", "carState"],
subs=["liveParameters"],
ignore=["logMonoTime", "valid"],
init_callback=get_car_params,
should_recv_callback=None,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"),
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
ProcessConfig(
proc_name="ubloxd",
pub_sub={
"ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],
},
pubs=["ubloxRaw"],
subs=["ubloxGnss", "gpsLocationExternal"],
ignore=["logMonoTime"],
config_callback=None,
init_callback=None,
should_recv_callback=None,
tolerance=None,
fake_pubsubmaster=False,
iter_wait_time=0.01,
drain_sockets=True
),
ProcessConfig(
proc_name="laikad",
pub_sub={
"ubloxGnss": ["gnssMeasurements"],
"qcomGnss": ["gnssMeasurements"],
},
pubs=["ubloxGnss", "qcomGnss"],
subs=["gnssMeasurements"],
ignore=["logMonoTime"],
init_callback=get_car_params,
config_callback=laikad_config_pubsub_callback,
init_callback=get_car_params_callback,
should_recv_callback=None,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=False,
timeout=60*10, # first messages are blocked on internet assistance
drained_pub="ubloxGnss", # config_callback will switch this to qcom if needed
),
ProcessConfig(
proc_name="torqued",
pub_sub={
"carControl": [], "carState": [],
"liveLocationKalman": ["liveTorqueParameters"],
},
pubs=["liveLocationKalman", "carState", "carControl"],
subs=["liveTorqueParameters"],
ignore=["logMonoTime"],
init_callback=get_car_params,
config_callback=None,
init_callback=get_car_params_callback,
should_recv_callback=torqued_rcv_callback,
tolerance=NUMPY_TOLERANCE,
fake_pubsubmaster=True,
),
]
def get_process_config(name):
try:
return next(c for c in CONFIGS if c.proc_name == name)
except StopIteration as ex:
raise Exception(f"Cannot find process config with name: {name}") from ex
def replay_process(cfg, lr, fingerprint=None):
with OpenpilotPrefix():
if cfg.fake_pubsubmaster:
return python_replay_process(cfg, lr, fingerprint)
controlsState = None
initialized = False
if cfg.proc_name == "controlsd":
for msg in lr:
if msg.which() == 'controlsState':
controlsState = msg.controlsState
if initialized:
break
elif msg.which() == 'carEvents':
initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents]
assert controlsState is not None and initialized, "controlsState never initialized"
CP = [m for m in lr if m.which() == 'carParams'][0].carParams
if fingerprint is not None:
setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint)
else:
return replay_process_with_sockets(cfg, lr, fingerprint)
setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr)
if cfg.config_callback is not None:
params = Params()
cfg.pubs, cfg.drained_pub = cfg.config_callback(params, cfg)
def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)]
with ReplayContext(cfg) as rc:
pm = messaging.PubMaster(cfg.pubs)
sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs}
managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start()
if cfg.init_callback is not None:
cfg.init_callback(rc, pm, all_msgs, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
log_msgs, msg_queue = [], []
try:
# Wait for process to startup
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"):
while not all(pm.all_readers_updated(s) for s in cfg.pubs):
time.sleep(0)
# Do the replay
cnt = 0
for msg in pub_msgs:
with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
resp_sockets, end_of_cycle = cfg.subs, True
if cfg.should_recv_callback is not None:
end_of_cycle = cfg.should_recv_callback(msg, CP, cfg, cnt)
msg_queue.append(msg)
if end_of_cycle:
rc.wait_for_recv_called()
# call recv to let sub-sockets reconnect, after we know the process is ready
if cnt == 0:
for s in sockets.values():
messaging.recv_one_or_none(s)
for m in msg_queue:
pm.send(m.which(), m.as_builder())
msg_queue = []
rc.unlock_sockets()
rc.wait_for_next_recv(True)
for s in resp_sockets:
ms = messaging.drain_sock(sockets[s])
for m in ms:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
cnt += 1
assert(managed_processes[cfg.proc_name].proc.is_alive())
finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL)
managed_processes[cfg.proc_name].stop()
return log_msgs
def setup_env(CP=None, cfg=None, controlsState=None, lr=None, fingerprint=None):
params = Params()
params.clear_all()
params.put_bool("OpenpilotEnabledToggle", True)
@ -407,6 +428,10 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
os.environ["NO_RADAR_SLEEP"] = "1"
os.environ["REPLAY"] = "1"
if fingerprint is not None:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = fingerprint
else:
os.environ["SKIP_FW_QUERY"] = ""
os.environ["FINGERPRINT"] = ""
@ -424,7 +449,7 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
os.environ.update(cfg.environ)
os.environ['PROC_NAME'] = cfg.proc_name
if simulation:
if cfg is not None and cfg.simulation:
os.environ["SIMULATION"] = "1"
elif "SIMULATION" in os.environ:
del os.environ["SIMULATION"]
@ -440,8 +465,11 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS:
params.put_bool("DisengageOnAccelerator", False)
if fingerprint is None:
if CP.fingerprintSource == "fw":
params.put("CarParamsCache", CP.as_builder().to_bytes())
os.environ['SKIP_FW_QUERY'] = ""
os.environ['FINGERPRINT'] = ""
else:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = CP.carFingerprint
@ -450,171 +478,6 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None):
params.put_bool("ExperimentalLongitudinalEnabled", True)
def python_replay_process(cfg, lr, fingerprint=None):
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can']
fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config)
fpm = FakePubMaster(sub_sockets)
can_sock = None
args = (fsm, fpm)
if 'can' in list(cfg.pub_sub.keys()):
can_sock = FakeSocket()
args = (fsm, fpm, can_sock)
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
controlsState = None
initialized = False
for msg in lr:
if msg.which() == 'controlsState':
controlsState = msg.controlsState
if initialized:
break
elif msg.which() == 'carEvents':
initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents]
assert controlsState is not None and initialized, "controlsState never initialized"
if fingerprint is not None:
os.environ['SKIP_FW_QUERY'] = "1"
os.environ['FINGERPRINT'] = fingerprint
setup_env(cfg=cfg, controlsState=controlsState, lr=lr)
else:
CP = [m for m in lr if m.which() == 'carParams'][0].carParams
setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr)
assert(type(managed_processes[cfg.proc_name]) is PythonProcess)
managed_processes[cfg.proc_name].prepare()
mod = importlib.import_module(managed_processes[cfg.proc_name].module)
thread = threading.Thread(target=mod.main, args=args)
thread.daemon = True
thread.start()
if cfg.init_callback is not None:
if 'can' not in list(cfg.pub_sub.keys()):
can_sock = None
cfg.init_callback(all_msgs, fsm, can_sock, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
# wait for started process to be ready
if 'can' in list(cfg.pub_sub.keys()):
can_sock.wait_for_recv()
else:
fsm.wait_for_update()
log_msgs, msg_queue = [], []
for msg in pub_msgs:
recv_socks = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None:
_, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm)
else:
socks = [s for s in cfg.pub_sub[msg.which()] if
(fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0]
should_recv = bool(len(socks))
if msg.which() == 'can':
can_sock.send(msg.as_builder().to_bytes())
else:
msg_queue.append(msg.as_builder())
if should_recv:
fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue)
msg_queue = []
if can_sock is not None:
can_sock.recv_called.wait()
else:
fsm.update_called.wait()
for s in recv_socks:
ms = fpm.drain(s)
for m in ms:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
return log_msgs
def replay_process_with_sockets(cfg, lr, fingerprint=None):
pm = messaging.PubMaster(cfg.pub_sub.keys())
sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub]
sockets = {s: messaging.sub_sock(s, timeout=100) for s in sub_sockets}
all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime)
pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())]
# We need to fake SubMaster alive since we can't inject a fake clock
CP = [m for m in lr if m.which() == 'carParams'][0].carParams
setup_env(simulation=True, CP=CP, cfg=cfg, lr=lr)
if cfg.proc_name == "laikad":
ublox = Params().get_bool("UbloxAvailable")
keys = set(cfg.pub_sub.keys()) - ({"qcomGnss", } if ublox else {"ubloxGnss", })
pub_msgs = [msg for msg in pub_msgs if msg.which() in keys]
managed_processes[cfg.proc_name].prepare()
managed_processes[cfg.proc_name].start()
if cfg.init_callback is not None:
cfg.init_callback(all_msgs, None, None, fingerprint)
CP = car.CarParams.from_bytes(Params().get("CarParams", block=True))
log_msgs = []
try:
# Wait for process to startup
with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"):
while not any(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()):
time.sleep(0)
for s in sockets.values():
messaging.recv_one_or_none(s)
# Do the replay
cnt = 0
curr_CP = None
for msg in pub_msgs:
with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"):
if msg.which() == 'carParams':
curr_CP = msg.carParams
resp_sockets = cfg.pub_sub[msg.which()]
if cfg.should_recv_callback is not None:
resp_sockets, _ = cfg.should_recv_callback(msg, curr_CP, cfg, None)
# Make sure all subscribers are connected
if len(log_msgs) == 0 and len(resp_sockets) > 0:
for s in sockets.values():
messaging.recv_one_or_none(s)
pm.send(msg.which(), msg.as_builder())
while not pm.all_readers_updated(msg.which()):
time.sleep(0)
time.sleep(cfg.iter_wait_time)
for s in resp_sockets:
msgs = []
if cfg.drain_sockets:
msgs.extend(messaging.drain_sock(sockets[s], wait_for_one=True))
else:
msgs = [messaging.recv_one_retry(sockets[s])]
for m in msgs:
m = m.as_builder()
m.logMonoTime = msg.logMonoTime
log_msgs.append(m.as_reader())
cnt += 1
finally:
managed_processes[cfg.proc_name].signal(signal.SIGKILL)
managed_processes[cfg.proc_name].stop()
return log_msgs
def check_enabled(msgs):
cur_enabled_count = 0
max_enabled_count = 0

@ -283,10 +283,9 @@ def regen_segment(lr, frs=None, daemons="all", outdir=FAKEDATA, disable_tqdm=Fal
# TODO add configs for modeld, dmonitoringmodeld
fakeable_daemons = {}
for config in CONFIGS:
replayable_messages = set([msg for sub in config.pub_sub.values() for msg in sub])
processes = [
multiprocessing.Process(target=replay_service, args=(msg, lr))
for msg in replayable_messages
for msg in config.subs
]
fakeable_daemons[config.proc_name] = processes

@ -99,7 +99,7 @@ def get_strategy_for_events(event_types, finite=False):
def get_strategy_for_process(process, finite=False):
return get_strategy_for_events(get_process_config(process).pub_sub.keys(), finite)
return get_strategy_for_events(get_process_config(process).pubs, finite)
def convert_to_lr(msgs):

@ -25,7 +25,7 @@ CARS = {
def get_inputs(msgs, process, fingerprint):
for config in CONFIGS:
if config.proc_name == process:
sub_socks = list(config.pub_sub.keys())
sub_socks = list(config.pubs)
trigger = sub_socks[0]
break

Loading…
Cancel
Save