diff --git a/cereal b/cereal index e0cf7e09ae..9b6b53396f 160000 --- a/cereal +++ b/cereal @@ -1 +1 @@ -Subproject commit e0cf7e09ae2864034bc370fb6c3db71a83eaf3e6 +Subproject commit 9b6b53396fff6ec7541a5415901d892e57756b91 diff --git a/release/files_common b/release/files_common index 6f671e1781..0280d14b63 100644 --- a/release/files_common +++ b/release/files_common @@ -469,6 +469,7 @@ body/crypto/** cereal/.gitignore cereal/__init__.py cereal/car.capnp +cereal/custom.capnp cereal/legacy.capnp cereal/log.capnp cereal/services.py @@ -478,6 +479,10 @@ cereal/logger/logger.h cereal/messaging/.gitignore cereal/messaging/__init__.py cereal/messaging/bridge.cc +cereal/messaging/event.cc +cereal/messaging/event.h +cereal/messaging/impl_fake.cc +cereal/messaging/impl_fake.h cereal/messaging/impl_msgq.cc cereal/messaging/impl_msgq.h cereal/messaging/impl_zmq.cc diff --git a/selfdrive/controls/tests/test_alerts.py b/selfdrive/controls/tests/test_alerts.py index 9ed7eee122..60c080163f 100755 --- a/selfdrive/controls/tests/test_alerts.py +++ b/selfdrive/controls/tests/test_alerts.py @@ -6,11 +6,12 @@ import random from PIL import Image, ImageDraw, ImageFont from cereal import log, car +from cereal.messaging import SubMaster from common.basedir import BASEDIR from common.params import Params from selfdrive.controls.lib.events import Alert, EVENTS, ET from selfdrive.controls.lib.alertmanager import set_offroad_alert -from selfdrive.test.process_replay.process_replay import FakeSubMaster, CONFIGS +from selfdrive.test.process_replay.process_replay import CONFIGS AlertSize = log.ControlsState.AlertSize @@ -34,7 +35,7 @@ class TestAlerts(unittest.TestCase): cls.CS = car.CarState.new_message() cls.CP = car.CarParams.new_message() cfg = [c for c in CONFIGS if c.proc_name == 'controlsd'][0] - cls.sm = FakeSubMaster(cfg.pub_sub.keys()) + cls.sm = SubMaster(cfg.pubs) def test_events_defined(self): # Ensure all events in capnp schema are defined in events.py diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 65415b4151..0a572a7502 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -1,27 +1,21 @@ #!/usr/bin/env python3 -import importlib import os -import sys -import threading import time import signal -from collections import defaultdict +from collections import OrderedDict from dataclasses import dataclass, field from typing import Dict, List, Optional, Callable -import capnp - import cereal.messaging as messaging -from cereal import car, log +from cereal import car from cereal.services import service_list from common.params import Params from common.timeout import Timeout from common.realtime import DT_CTRL from panda.python import ALTERNATIVE_EXPERIENCE from selfdrive.car.car_helpers import get_car, interfaces -from selfdrive.test.process_replay.helpers import OpenpilotPrefix -from selfdrive.manager.process import PythonProcess from selfdrive.manager.process_config import managed_processes +from selfdrive.test.process_replay.helpers import OpenpilotPrefix # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 @@ -30,373 +24,400 @@ TIMEOUT = 15 PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__)) FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") + +class ReplayContext: + def __init__(self, cfg): + self.proc_name = cfg.proc_name + self.pubs = cfg.pubs + self.drained_pub = cfg.drained_pub + assert(len(self.pubs) != 0 or self.drained_pub is not None) + + def __enter__(self): + messaging.toggle_fake_events(True) + messaging.set_fake_prefix(self.proc_name) + + if self.drained_pub is None: + self.events = OrderedDict() + for pub in self.pubs: + self.events[pub] = messaging.fake_event_handle(pub, enable=True) + else: + self.events = {self.drained_pub: messaging.fake_event_handle(self.drained_pub, enable=True)} + + return self + + def __exit__(self, exc_type, exc_obj, exc_tb): + del self.events + + messaging.toggle_fake_events(False) + messaging.delete_fake_prefix() + + @property + def all_recv_called_events(self): + return [man.recv_called_event for man in self.events.values()] + + @property + def all_recv_ready_events(self): + return [man.recv_ready_event for man in self.events.values()] + + def send_sync(self, pm, endpoint, dat): + self.events[endpoint].recv_called_event.wait() + self.events[endpoint].recv_called_event.clear() + pm.send(endpoint, dat) + self.events[endpoint].recv_ready_event.set() + + def unlock_sockets(self): + expected_sets = len(self.events) + while expected_sets > 0: + index = messaging.wait_for_one_event(self.all_recv_called_events) + self.all_recv_called_events[index].clear() + self.all_recv_ready_events[index].set() + expected_sets -= 1 + + def wait_for_recv_called(self): + messaging.wait_for_one_event(self.all_recv_called_events) + + def wait_for_next_recv(self, end_of_cycle): + index = messaging.wait_for_one_event(self.all_recv_called_events) + if self.drained_pub is not None and end_of_cycle: + self.all_recv_called_events[index].clear() + self.all_recv_ready_events[index].set() + self.all_recv_called_events[index].wait() + + @dataclass class ProcessConfig: proc_name: str - pub_sub: Dict[str, List[str]] + pubs: List[str] + subs: List[str] ignore: List[str] + config_callback: Optional[Callable] init_callback: Optional[Callable] should_recv_callback: Optional[Callable] - tolerance: Optional[float] - fake_pubsubmaster: bool - submaster_config: Dict[str, List[str]] = field(default_factory=dict) + tolerance: Optional[float] = None environ: Dict[str, str] = field(default_factory=dict) subtest_name: str = "" field_tolerances: Dict[str, float] = field(default_factory=dict) timeout: int = 30 - iter_wait_time: float = 0.0 - drain_sockets: bool = False + simulation: bool = True + drained_pub: Optional[str] = None -def wait_for_event(evt): - if not evt.wait(TIMEOUT): - if threading.currentThread().getName() == "MainThread": - # tested process likely died. don't let test just hang - raise Exception(f"Timeout reached. Tested process {os.environ['PROC_NAME']} likely crashed.") - else: - # done testing this process, let it die - sys.exit(0) - - -class FakeSocket: - def __init__(self, wait=True): +class DummySocket: + def __init__(self): self.data = [] - self.wait = wait - self.recv_called = threading.Event() - self.recv_ready = threading.Event() def receive(self, non_blocking=False): if non_blocking: return None - if self.wait: - self.recv_called.set() - wait_for_event(self.recv_ready) - self.recv_ready.clear() - return self.data.pop(0) + return self.data.pop() def send(self, data): - if self.wait: - wait_for_event(self.recv_called) - self.recv_called.clear() - self.data.append(data) - if self.wait: - self.recv_ready.set() - - def wait_for_recv(self): - wait_for_event(self.recv_called) - - -class DumbSocket: - def __init__(self, s=None): - if s is not None: - try: - dat = messaging.new_message(s) - except capnp.lib.capnp.KjException: # pylint: disable=c-extension-no-member - # lists - dat = messaging.new_message(s, 0) - - self.data = dat.to_bytes() - - def receive(self, non_blocking=False): - return self.data - - def send(self, dat): - pass - - -class FakeSubMaster(messaging.SubMaster): - def __init__(self, services, ignore_alive=None, ignore_avg_freq=None): - super().__init__(services, ignore_alive=ignore_alive, ignore_avg_freq=ignore_avg_freq, addr=None) - self.sock = {s: DumbSocket(s) for s in services} - self.update_called = threading.Event() - self.update_ready = threading.Event() - self.wait_on_getitem = False - - def __getitem__(self, s): - # hack to know when fingerprinting is done - if self.wait_on_getitem: - self.update_called.set() - wait_for_event(self.update_ready) - self.update_ready.clear() - return self.data[s] - - def update(self, timeout=-1): - self.update_called.set() - wait_for_event(self.update_ready) - self.update_ready.clear() - - def update_msgs(self, cur_time, msgs): - wait_for_event(self.update_called) - self.update_called.clear() - super().update_msgs(cur_time, msgs) - self.update_ready.set() - - def wait_for_update(self): - wait_for_event(self.update_called) - - -class FakePubMaster(messaging.PubMaster): - def __init__(self, services): # pylint: disable=super-init-not-called - self.data = defaultdict(list) - self.sock = {} - self.last_updated = None - for s in services: - self.sock[s] = DumbSocket() - - def send(self, s, dat): - self.last_updated = s - if isinstance(dat, bytes): - self.data[s].append(log.Event.from_bytes(dat)) - else: - self.data[s].append(dat.as_reader()) - def drain(self, s): - msgs = self.data[s] - self.data[s] = [] - - return msgs - - -def fingerprint(msgs, fsm, can_sock, fingerprint): +def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint): print("start fingerprinting") - fsm.wait_on_getitem = True - - # populate fake socket with data for fingerprinting - canmsgs = [msg for msg in msgs if msg.which() == "can"] - wait_for_event(can_sock.recv_called) - can_sock.recv_called.clear() - can_sock.data = [msg.as_builder().to_bytes() for msg in canmsgs[:300]] - can_sock.recv_ready.set() - can_sock.wait = False + params = Params() + canmsgs = [msg for msg in msgs if msg.which() == "can"][:300] - # we know fingerprinting is done when controlsd sets sm['lateralPlan'].sensorValid - wait_for_event(fsm.update_called) - fsm.update_called.clear() + # controlsd expects one arbitrary can and pandaState + rc.send_sync(pm, "can", messaging.new_message("can", 1)) + pm.send("pandaStates", messaging.new_message("pandaStates", 1)) + rc.send_sync(pm, "can", messaging.new_message("can", 1)) + rc.wait_for_next_recv(True) - fsm.wait_on_getitem = False - can_sock.wait = True - can_sock.data = [] + # fingerprinting is done, when CarParams is set + while params.get("CarParams") is None: + if len(canmsgs) == 0: + raise ValueError("Fingerprinting failed. Run out of can msgs") - fsm.update_ready.set() + m = canmsgs.pop(0) + rc.send_sync(pm, "can", m.as_builder().to_bytes()) + rc.wait_for_next_recv(False) -def get_car_params(msgs, fsm, can_sock, fingerprint): +def get_car_params_callback(rc, pm, msgs, fingerprint): if fingerprint: CarInterface, _, _ = interfaces[fingerprint] CP = CarInterface.get_non_essential_params(fingerprint) else: - can = FakeSocket(wait=False) - sendcan = FakeSocket(wait=False) + can = DummySocket() + sendcan = DummySocket() - canmsgs = [msg for msg in msgs if msg.which() == 'can'] + canmsgs = [msg for msg in msgs if msg.which() == "can"] for m in canmsgs[:300]: can.send(m.as_builder().to_bytes()) _, CP = get_car(can, sendcan, Params().get_bool("ExperimentalLongitudinalEnabled")) Params().put("CarParams", CP.to_bytes()) -def controlsd_rcv_callback(msg, CP, cfg, fsm): +def controlsd_rcv_callback(msg, CP, cfg, frame): # no sendcan until controlsd is initialized - socks = [s for s in cfg.pub_sub[msg.which()] if - (fsm.frame + 1) % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0] - if "sendcan" in socks and fsm.frame < 2000: - socks.remove("sendcan") - return socks, len(socks) > 0 - - -def radar_rcv_callback(msg, CP, cfg, fsm): if msg.which() != "can": - return [], False - elif CP.radarUnavailable: - return ["radarState", "liveTracks"], True + return False - radar_msgs = {"honda": [0x445], "toyota": [0x19f, 0x22f], "gm": [0x474], - "chrysler": [0x2d4]}.get(CP.carName, None) + socks = [ + s for s in cfg.subs if + frame % int(service_list[msg.which()].frequency / service_list[s].frequency) == 0 + ] + if "sendcan" in socks and (frame - 1) < 2000: + socks.remove("sendcan") + return len(socks) > 0 - if radar_msgs is None: - raise NotImplementedError - for m in msg.can: - if m.src == 1 and m.address in radar_msgs: - return ["radarState", "liveTracks"], True - return [], False +def radar_rcv_callback(msg, CP, cfg, frame): + return msg.which() == "can" -def calibration_rcv_callback(msg, CP, cfg, fsm): +def calibration_rcv_callback(msg, CP, cfg, frame): # calibrationd publishes 1 calibrationData every 5 cameraOdometry packets. # should_recv always true to increment frame - recv_socks = [] - frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster - if frame == 0 or (msg.which() == 'cameraOdometry' and (frame % 5) == 0): - recv_socks = ["liveCalibration"] - return recv_socks, fsm.frame == 0 or msg.which() == 'cameraOdometry' + return (frame - 1) == 0 or msg.which() == 'cameraOdometry' -def torqued_rcv_callback(msg, CP, cfg, fsm): +def torqued_rcv_callback(msg, CP, cfg, frame): # should_recv always true to increment frame - recv_socks = [] - frame = fsm.frame + 1 # incrementing hasn't happened yet in SubMaster - if msg.which() == 'liveLocationKalman' and (frame % 5) == 0: - recv_socks = ["liveTorqueParameters"] - return recv_socks, fsm.frame == 0 or msg.which() == 'liveLocationKalman' + return (frame - 1) == 0 or msg.which() == 'liveLocationKalman' + + +class FrequencyBasedRcvCallback: + def __init__(self, trigger_msg_type): + self.trigger_msg_type = trigger_msg_type + + def __call__(self, msg, CP, cfg, frame): + if msg.which() != self.trigger_msg_type: + return False + resp_sockets = [ + s for s in cfg.subs + if frame % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0 + ] + return bool(len(resp_sockets)) -def locationd_rcv_callback(msg, CP, cfg, fsm): - trigger_msg = "accelerometer" if CP is not None and CP.notCar else "cameraOdometry" - if msg.which() == trigger_msg: - return ["liveLocationKalman"], True + +def laikad_config_pubsub_callback(params, cfg): + ublox = params.get_bool("UbloxAvailable") + drained_key = "ubloxGnss" if ublox else "qcomGnss" + sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", }) + + return set(cfg.pubs) - sub_keys, drained_key + + +def locationd_config_pubsub_callback(params, cfg): + ublox = params.get_bool("UbloxAvailable") + sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", }) - return [], False + return set(cfg.pubs) - sub_keys, None CONFIGS = [ ProcessConfig( proc_name="controlsd", - pub_sub={ - "can": ["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], - "deviceState": [], "pandaStates": [], "peripheralState": [], "liveCalibration": [], "driverMonitoringState": [], - "longitudinalPlan": [], "lateralPlan": [], "liveLocationKalman": [], "liveParameters": [], "radarState": [], - "modelV2": [], "driverCameraState": [], "roadCameraState": [], "wideRoadCameraState": [], "managerState": [], - "testJoystick": [], "liveTorqueParameters": [], - }, + pubs=[ + "can", "deviceState", "pandaStates", "peripheralState", "liveCalibration", "driverMonitoringState", + "longitudinalPlan", "lateralPlan", "liveLocationKalman", "liveParameters", "radarState", + "modelV2", "driverCameraState", "roadCameraState", "wideRoadCameraState", "managerState", + "testJoystick", "liveTorqueParameters" + ], + subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"], - init_callback=fingerprint, + config_callback=None, + init_callback=controlsd_fingerprint_callback, should_recv_callback=controlsd_rcv_callback, tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, - submaster_config={ - 'ignore_avg_freq': ['radarState', 'longitudinalPlan', 'driverCameraState', 'driverMonitoringState'], # dcam is expected at 20 Hz - 'ignore_alive': [], - } + simulation=False, + drained_pub="can", ), ProcessConfig( proc_name="radard", - pub_sub={ - "can": ["radarState", "liveTracks"], - "carState": [], "modelV2": [], - }, + pubs=["can", "carState", "modelV2"], + subs=["radarState", "liveTracks"], ignore=["logMonoTime", "valid", "radarState.cumLagMs"], - init_callback=get_car_params, + config_callback=None, + init_callback=get_car_params_callback, should_recv_callback=radar_rcv_callback, - tolerance=None, - fake_pubsubmaster=True, + drained_pub="can", ), ProcessConfig( proc_name="plannerd", - pub_sub={ - "modelV2": ["lateralPlan", "longitudinalPlan", "uiPlan"], - "carControl": [], "carState": [], "controlsState": [], "radarState": [], - }, + pubs=["modelV2", "carControl", "carState", "controlsState", "radarState"], + subs=["lateralPlan", "longitudinalPlan", "uiPlan"], ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime", "lateralPlan.solverExecutionTime"], - init_callback=get_car_params, - should_recv_callback=None, + config_callback=None, + init_callback=get_car_params_callback, + should_recv_callback=FrequencyBasedRcvCallback("modelV2"), tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, ), ProcessConfig( proc_name="calibrationd", - pub_sub={ - "carState": [], - "cameraOdometry": ["liveCalibration"], - "carParams": [], - }, + pubs=["carState", "cameraOdometry", "carParams"], + subs=["liveCalibration"], ignore=["logMonoTime", "valid"], - init_callback=get_car_params, + config_callback=None, + init_callback=get_car_params_callback, should_recv_callback=calibration_rcv_callback, - tolerance=None, - fake_pubsubmaster=True, ), ProcessConfig( proc_name="dmonitoringd", - pub_sub={ - "driverStateV2": ["driverMonitoringState"], - "liveCalibration": [], "carState": [], "modelV2": [], "controlsState": [], - }, + pubs=["driverStateV2", "liveCalibration", "carState", "modelV2", "controlsState"], + subs=["driverMonitoringState"], ignore=["logMonoTime", "valid"], - init_callback=get_car_params, - should_recv_callback=None, + config_callback=None, + init_callback=get_car_params_callback, + should_recv_callback=FrequencyBasedRcvCallback("driverStateV2"), tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, ), ProcessConfig( proc_name="locationd", - pub_sub={ - "cameraOdometry": ["liveLocationKalman"], - "accelerometer": ["liveLocationKalman"], "gyroscope": [], - "gpsLocationExternal": [], "liveCalibration": [], - "carParams": [], "carState": [], "gpsLocation": [], - }, + pubs=[ + "cameraOdometry", "accelerometer", "gyroscope", "gpsLocationExternal", + "liveCalibration", "carState", "carParams", "gpsLocation" + ], + subs=["liveLocationKalman"], ignore=["logMonoTime", "valid"], - init_callback=get_car_params, - should_recv_callback=locationd_rcv_callback, + config_callback=locationd_config_pubsub_callback, + init_callback=get_car_params_callback, + should_recv_callback=None, tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=False, ), ProcessConfig( proc_name="paramsd", - pub_sub={ - "liveLocationKalman": ["liveParameters"], - "carState": [] - }, + pubs=["liveLocationKalman", "carState"], + subs=["liveParameters"], ignore=["logMonoTime", "valid"], - init_callback=get_car_params, - should_recv_callback=None, + config_callback=None, + init_callback=get_car_params_callback, + should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"), tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, ), ProcessConfig( proc_name="ubloxd", - pub_sub={ - "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"], - }, + pubs=["ubloxRaw"], + subs=["ubloxGnss", "gpsLocationExternal"], ignore=["logMonoTime"], + config_callback=None, init_callback=None, should_recv_callback=None, - tolerance=None, - fake_pubsubmaster=False, - iter_wait_time=0.01, - drain_sockets=True ), ProcessConfig( proc_name="laikad", - pub_sub={ - "ubloxGnss": ["gnssMeasurements"], - "qcomGnss": ["gnssMeasurements"], - }, + pubs=["ubloxGnss", "qcomGnss"], + subs=["gnssMeasurements"], ignore=["logMonoTime"], - init_callback=get_car_params, + config_callback=laikad_config_pubsub_callback, + init_callback=get_car_params_callback, should_recv_callback=None, tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=False, timeout=60*10, # first messages are blocked on internet assistance + drained_pub="ubloxGnss", # config_callback will switch this to qcom if needed ), ProcessConfig( proc_name="torqued", - pub_sub={ - "carControl": [], "carState": [], - "liveLocationKalman": ["liveTorqueParameters"], - }, + pubs=["liveLocationKalman", "carState", "carControl"], + subs=["liveTorqueParameters"], ignore=["logMonoTime"], - init_callback=get_car_params, + config_callback=None, + init_callback=get_car_params_callback, should_recv_callback=torqued_rcv_callback, tolerance=NUMPY_TOLERANCE, - fake_pubsubmaster=True, ), ] +def get_process_config(name): + try: + return next(c for c in CONFIGS if c.proc_name == name) + except StopIteration as ex: + raise Exception(f"Cannot find process config with name: {name}") from ex + + def replay_process(cfg, lr, fingerprint=None): with OpenpilotPrefix(): - if cfg.fake_pubsubmaster: - return python_replay_process(cfg, lr, fingerprint) + controlsState = None + initialized = False + if cfg.proc_name == "controlsd": + for msg in lr: + if msg.which() == 'controlsState': + controlsState = msg.controlsState + if initialized: + break + elif msg.which() == 'carEvents': + initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents] + + assert controlsState is not None and initialized, "controlsState never initialized" + + CP = [m for m in lr if m.which() == 'carParams'][0].carParams + if fingerprint is not None: + setup_env(cfg=cfg, controlsState=controlsState, lr=lr, fingerprint=fingerprint) else: - return replay_process_with_sockets(cfg, lr, fingerprint) + setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr) + + if cfg.config_callback is not None: + params = Params() + cfg.pubs, cfg.drained_pub = cfg.config_callback(params, cfg) + + all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) + pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)] + with ReplayContext(cfg) as rc: + pm = messaging.PubMaster(cfg.pubs) + sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs} -def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): + managed_processes[cfg.proc_name].prepare() + managed_processes[cfg.proc_name].start() + + if cfg.init_callback is not None: + cfg.init_callback(rc, pm, all_msgs, fingerprint) + CP = car.CarParams.from_bytes(Params().get("CarParams", block=True)) + + log_msgs, msg_queue = [], [] + try: + # Wait for process to startup + with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"): + while not all(pm.all_readers_updated(s) for s in cfg.pubs): + time.sleep(0) + + # Do the replay + cnt = 0 + for msg in pub_msgs: + with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): + resp_sockets, end_of_cycle = cfg.subs, True + if cfg.should_recv_callback is not None: + end_of_cycle = cfg.should_recv_callback(msg, CP, cfg, cnt) + + msg_queue.append(msg) + if end_of_cycle: + rc.wait_for_recv_called() + + # call recv to let sub-sockets reconnect, after we know the process is ready + if cnt == 0: + for s in sockets.values(): + messaging.recv_one_or_none(s) + + for m in msg_queue: + pm.send(m.which(), m.as_builder()) + msg_queue = [] + + rc.unlock_sockets() + rc.wait_for_next_recv(True) + + for s in resp_sockets: + ms = messaging.drain_sock(sockets[s]) + for m in ms: + m = m.as_builder() + m.logMonoTime = msg.logMonoTime + log_msgs.append(m.as_reader()) + cnt += 1 + assert(managed_processes[cfg.proc_name].proc.is_alive()) + finally: + managed_processes[cfg.proc_name].signal(signal.SIGKILL) + managed_processes[cfg.proc_name].stop() + + return log_msgs + + +def setup_env(CP=None, cfg=None, controlsState=None, lr=None, fingerprint=None): params = Params() params.clear_all() params.put_bool("OpenpilotEnabledToggle", True) @@ -407,8 +428,12 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): os.environ["NO_RADAR_SLEEP"] = "1" os.environ["REPLAY"] = "1" - os.environ["SKIP_FW_QUERY"] = "" - os.environ["FINGERPRINT"] = "" + if fingerprint is not None: + os.environ['SKIP_FW_QUERY'] = "1" + os.environ['FINGERPRINT'] = fingerprint + else: + os.environ["SKIP_FW_QUERY"] = "" + os.environ["FINGERPRINT"] = "" if lr is not None: services = {m.which() for m in lr} @@ -424,7 +449,7 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): os.environ.update(cfg.environ) os.environ['PROC_NAME'] = cfg.proc_name - if simulation: + if cfg is not None and cfg.simulation: os.environ["SIMULATION"] = "1" elif "SIMULATION" in os.environ: del os.environ["SIMULATION"] @@ -440,181 +465,19 @@ def setup_env(simulation=False, CP=None, cfg=None, controlsState=None, lr=None): if CP.alternativeExperience == ALTERNATIVE_EXPERIENCE.DISABLE_DISENGAGE_ON_GAS: params.put_bool("DisengageOnAccelerator", False) - if CP.fingerprintSource == "fw": - params.put("CarParamsCache", CP.as_builder().to_bytes()) - else: - os.environ['SKIP_FW_QUERY'] = "1" - os.environ['FINGERPRINT'] = CP.carFingerprint + if fingerprint is None: + if CP.fingerprintSource == "fw": + params.put("CarParamsCache", CP.as_builder().to_bytes()) + os.environ['SKIP_FW_QUERY'] = "" + os.environ['FINGERPRINT'] = "" + else: + os.environ['SKIP_FW_QUERY'] = "1" + os.environ['FINGERPRINT'] = CP.carFingerprint if CP.openpilotLongitudinalControl: params.put_bool("ExperimentalLongitudinalEnabled", True) -def python_replay_process(cfg, lr, fingerprint=None): - sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] - pub_sockets = [s for s in cfg.pub_sub.keys() if s != 'can'] - - fsm = FakeSubMaster(pub_sockets, **cfg.submaster_config) - fpm = FakePubMaster(sub_sockets) - can_sock = None - args = (fsm, fpm) - if 'can' in list(cfg.pub_sub.keys()): - can_sock = FakeSocket() - args = (fsm, fpm, can_sock) - - all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) - pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] - - controlsState = None - initialized = False - for msg in lr: - if msg.which() == 'controlsState': - controlsState = msg.controlsState - if initialized: - break - elif msg.which() == 'carEvents': - initialized = car.CarEvent.EventName.controlsInitializing not in [e.name for e in msg.carEvents] - - assert controlsState is not None and initialized, "controlsState never initialized" - - if fingerprint is not None: - os.environ['SKIP_FW_QUERY'] = "1" - os.environ['FINGERPRINT'] = fingerprint - setup_env(cfg=cfg, controlsState=controlsState, lr=lr) - else: - CP = [m for m in lr if m.which() == 'carParams'][0].carParams - setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr) - - assert(type(managed_processes[cfg.proc_name]) is PythonProcess) - managed_processes[cfg.proc_name].prepare() - mod = importlib.import_module(managed_processes[cfg.proc_name].module) - - thread = threading.Thread(target=mod.main, args=args) - thread.daemon = True - thread.start() - - if cfg.init_callback is not None: - if 'can' not in list(cfg.pub_sub.keys()): - can_sock = None - cfg.init_callback(all_msgs, fsm, can_sock, fingerprint) - - CP = car.CarParams.from_bytes(Params().get("CarParams", block=True)) - - # wait for started process to be ready - if 'can' in list(cfg.pub_sub.keys()): - can_sock.wait_for_recv() - else: - fsm.wait_for_update() - - log_msgs, msg_queue = [], [] - for msg in pub_msgs: - recv_socks = cfg.pub_sub[msg.which()] - if cfg.should_recv_callback is not None: - _, should_recv = cfg.should_recv_callback(msg, CP, cfg, fsm) - else: - socks = [s for s in cfg.pub_sub[msg.which()] if - (fsm.frame + 1) % max(1, int(service_list[msg.which()].frequency / service_list[s].frequency)) == 0] - should_recv = bool(len(socks)) - - if msg.which() == 'can': - can_sock.send(msg.as_builder().to_bytes()) - else: - msg_queue.append(msg.as_builder()) - - if should_recv: - fsm.update_msgs(msg.logMonoTime / 1e9, msg_queue) - msg_queue = [] - - if can_sock is not None: - can_sock.recv_called.wait() - else: - fsm.update_called.wait() - - for s in recv_socks: - ms = fpm.drain(s) - for m in ms: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime - log_msgs.append(m.as_reader()) - - return log_msgs - - -def replay_process_with_sockets(cfg, lr, fingerprint=None): - pm = messaging.PubMaster(cfg.pub_sub.keys()) - sub_sockets = [s for _, sub in cfg.pub_sub.items() for s in sub] - sockets = {s: messaging.sub_sock(s, timeout=100) for s in sub_sockets} - - all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) - pub_msgs = [msg for msg in all_msgs if msg.which() in list(cfg.pub_sub.keys())] - - # We need to fake SubMaster alive since we can't inject a fake clock - CP = [m for m in lr if m.which() == 'carParams'][0].carParams - setup_env(simulation=True, CP=CP, cfg=cfg, lr=lr) - - if cfg.proc_name == "laikad": - ublox = Params().get_bool("UbloxAvailable") - keys = set(cfg.pub_sub.keys()) - ({"qcomGnss", } if ublox else {"ubloxGnss", }) - pub_msgs = [msg for msg in pub_msgs if msg.which() in keys] - - managed_processes[cfg.proc_name].prepare() - managed_processes[cfg.proc_name].start() - - if cfg.init_callback is not None: - cfg.init_callback(all_msgs, None, None, fingerprint) - CP = car.CarParams.from_bytes(Params().get("CarParams", block=True)) - - log_msgs = [] - try: - # Wait for process to startup - with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"): - while not any(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): - time.sleep(0) - - for s in sockets.values(): - messaging.recv_one_or_none(s) - - # Do the replay - cnt = 0 - curr_CP = None - for msg in pub_msgs: - with Timeout(cfg.timeout, error_msg=f"timed out testing process {repr(cfg.proc_name)}, {cnt}/{len(pub_msgs)} msgs done"): - if msg.which() == 'carParams': - curr_CP = msg.carParams - - resp_sockets = cfg.pub_sub[msg.which()] - if cfg.should_recv_callback is not None: - resp_sockets, _ = cfg.should_recv_callback(msg, curr_CP, cfg, None) - - # Make sure all subscribers are connected - if len(log_msgs) == 0 and len(resp_sockets) > 0: - for s in sockets.values(): - messaging.recv_one_or_none(s) - - pm.send(msg.which(), msg.as_builder()) - while not pm.all_readers_updated(msg.which()): - time.sleep(0) - - time.sleep(cfg.iter_wait_time) - - for s in resp_sockets: - msgs = [] - if cfg.drain_sockets: - msgs.extend(messaging.drain_sock(sockets[s], wait_for_one=True)) - else: - msgs = [messaging.recv_one_retry(sockets[s])] - for m in msgs: - m = m.as_builder() - m.logMonoTime = msg.logMonoTime - log_msgs.append(m.as_reader()) - cnt += 1 - finally: - managed_processes[cfg.proc_name].signal(signal.SIGKILL) - managed_processes[cfg.proc_name].stop() - - return log_msgs - - def check_enabled(msgs): cur_enabled_count = 0 max_enabled_count = 0 diff --git a/selfdrive/test/process_replay/regen.py b/selfdrive/test/process_replay/regen.py index dea9a737e8..fbabc1bd29 100755 --- a/selfdrive/test/process_replay/regen.py +++ b/selfdrive/test/process_replay/regen.py @@ -283,10 +283,9 @@ def regen_segment(lr, frs=None, daemons="all", outdir=FAKEDATA, disable_tqdm=Fal # TODO add configs for modeld, dmonitoringmodeld fakeable_daemons = {} for config in CONFIGS: - replayable_messages = set([msg for sub in config.pub_sub.values() for msg in sub]) processes = [ multiprocessing.Process(target=replay_service, args=(msg, lr)) - for msg in replayable_messages + for msg in config.subs ] fakeable_daemons[config.proc_name] = processes diff --git a/selfdrive/test/process_replay/test_fuzzy.py b/selfdrive/test/process_replay/test_fuzzy.py index 29fdf641f1..12f5fca37d 100755 --- a/selfdrive/test/process_replay/test_fuzzy.py +++ b/selfdrive/test/process_replay/test_fuzzy.py @@ -99,7 +99,7 @@ def get_strategy_for_events(event_types, finite=False): def get_strategy_for_process(process, finite=False): - return get_strategy_for_events(get_process_config(process).pub_sub.keys(), finite) + return get_strategy_for_events(get_process_config(process).pubs, finite) def convert_to_lr(msgs): diff --git a/selfdrive/test/profiling/profiler.py b/selfdrive/test/profiling/profiler.py index 732a69eebd..a0940b327b 100755 --- a/selfdrive/test/profiling/profiler.py +++ b/selfdrive/test/profiling/profiler.py @@ -25,7 +25,7 @@ CARS = { def get_inputs(msgs, process, fingerprint): for config in CONFIGS: if config.proc_name == process: - sub_socks = list(config.pub_sub.keys()) + sub_socks = list(config.pubs) trigger = sub_socks[0] break