diff --git a/selfdrive/modeld/tests/test_modeld.py b/selfdrive/modeld/tests/test_modeld.py index 2fcff785a9..758948811e 100755 --- a/selfdrive/modeld/tests/test_modeld.py +++ b/selfdrive/modeld/tests/test_modeld.py @@ -9,10 +9,7 @@ from cereal.visionipc import VisionIpcServer, VisionStreamType from common.transformations.camera import tici_f_frame_size from common.realtime import DT_MDL from selfdrive.manager.process_config import managed_processes - - -VIPC_STREAM = {"roadCameraState": VisionStreamType.VISION_STREAM_ROAD, "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER, - "wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD} +from selfdrive.test.process_replay.vision_meta import meta_from_camera_state IMG = np.zeros(int(tici_f_frame_size[0]*tici_f_frame_size[1]*(3/2)), dtype=np.uint8) IMG_BYTES = IMG.flatten().tobytes() @@ -48,9 +45,10 @@ class TestModeld(unittest.TestCase): cs.frameId = frame_id cs.timestampSof = int((frame_id * DT_MDL) * 1e9) cs.timestampEof = int(cs.timestampSof + (DT_MDL * 1e9)) + cam_meta = meta_from_camera_state(cam) self.pm.send(msg.which(), msg) - self.vipc_server.send(VIPC_STREAM[msg.which()], IMG_BYTES, cs.frameId, + self.vipc_server.send(cam_meta.stream, IMG_BYTES, cs.frameId, cs.timestampSof, cs.timestampEof) return cs diff --git a/selfdrive/test/process_replay/migration.py b/selfdrive/test/process_replay/migration.py index 31b3acde56..6c8055e3e4 100644 --- a/selfdrive/test/process_replay/migration.py +++ b/selfdrive/test/process_replay/migration.py @@ -1,13 +1,57 @@ +from collections import defaultdict + from cereal import messaging +from selfdrive.test.process_replay.vision_meta import meta_from_encode_index -def migrate_all(lr, old_logtime=False): +def migrate_all(lr, old_logtime=False, camera_states=False): msgs = migrate_sensorEvents(lr, old_logtime) msgs = migrate_carParams(msgs, old_logtime) + if camera_states: + msgs = migrate_cameraStates(msgs) return msgs +def migrate_cameraStates(lr): + all_msgs = [] + frame_to_encode_id = defaultdict(dict) + + for msg in lr: + if msg.which() not in ["roadEncodeIdx", "wideRoadEncodeIdx", "driverEncodeIdx"]: + continue + + encode_index = getattr(msg, msg.which()) + meta = meta_from_encode_index(msg.which()) + + assert encode_index.segmentId < 1200, f"Encoder index segmentId greater that 1200: {msg.which()} {encode_index.segmentId}" + frame_to_encode_id[meta.camera_state][encode_index.frameId] = encode_index.segmentId + + for msg in lr: + if msg.which() not in ["roadCameraState", "wideRoadCameraState", "driverCameraState"]: + all_msgs.append(msg) + continue + + camera_state = getattr(msg, msg.which()) + encode_id = frame_to_encode_id[msg.which()].get(camera_state.frameId) + if encode_id is None: + print(f"Missing encoded frame for camera feed {msg.which()} with frameId: {camera_state.frameId}") + continue + + new_msg = messaging.new_message(msg.which()) + new_camera_state = getattr(new_msg, new_msg.which()) + new_camera_state.frameId = encode_id + new_camera_state.encodeId = encode_id + new_camera_state.timestampSof = camera_state.timestampSof + new_camera_state.timestampEof = camera_state.timestampEof + new_msg.logMonoTime = msg.logMonoTime + new_msg.valid = msg.valid + + all_msgs.append(new_msg.as_reader()) + + return all_msgs + + def migrate_carParams(lr, old_logtime=False): all_msgs = [] for msg in lr: diff --git a/selfdrive/test/process_replay/model_replay.py b/selfdrive/test/process_replay/model_replay.py index 55d95a6a6a..e9789489e7 100755 --- a/selfdrive/test/process_replay/model_replay.py +++ b/selfdrive/test/process_replay/model_replay.py @@ -4,18 +4,15 @@ import sys import time from collections import defaultdict from typing import Any -from itertools import zip_longest import cereal.messaging as messaging -from cereal.visionipc import VisionIpcServer, VisionStreamType from common.spinner import Spinner -from common.timeout import Timeout -from common.transformations.camera import tici_f_frame_size, tici_d_frame_size from system.hardware import PC from selfdrive.manager.process_config import managed_processes from selfdrive.test.openpilotci import BASE_URL, get_url from selfdrive.test.process_replay.compare_logs import compare_logs from selfdrive.test.process_replay.test_processes import format_diff +from selfdrive.test.process_replay.process_replay import get_process_config, replay_process from system.version import get_commit from tools.lib.framereader import FrameReader from tools.lib.logreader import LogReader @@ -27,20 +24,30 @@ MAX_FRAMES = 100 if PC else 600 NAV_FRAMES = 50 NO_NAV = "NO_NAV" in os.environ -SEND_EXTRA_INPUTS = bool(os.getenv("SEND_EXTRA_INPUTS", "0")) +SEND_EXTRA_INPUTS = bool(int(os.getenv("SEND_EXTRA_INPUTS", "0"))) -VIPC_STREAM = {"roadCameraState": VisionStreamType.VISION_STREAM_ROAD, "driverCameraState": VisionStreamType.VISION_STREAM_DRIVER, - "wideRoadCameraState": VisionStreamType.VISION_STREAM_WIDE_ROAD} def get_log_fn(ref_commit, test_route): return f"{test_route}_model_tici_{ref_commit}.bz2" -def replace_calib(msg, calib): - msg = msg.as_builder() - if calib is not None: - msg.liveCalibration.rpyCalib = calib.tolist() - return msg +def trim_logs_to_max_frames(logs, max_frames, frs_types, include_all_types): + all_msgs = [] + cam_state_counts = defaultdict(int) + # keep adding messages until cam states are equal to MAX_FRAMES + for msg in sorted(logs, key=lambda m: m.logMonoTime): + all_msgs.append(msg) + if msg.which() in frs_types: + cam_state_counts[msg.which()] += 1 + + if all(cam_state_counts[state] == max_frames for state in frs_types): + break + + if len(include_all_types) != 0: + other_msgs = [m for m in logs if m.which() in include_all_types] + all_msgs.extend(other_msgs) + + return all_msgs def nav_model_replay(lr): @@ -102,102 +109,38 @@ def model_replay(lr, frs): else: spinner = None - vipc_server = VisionIpcServer("camerad") - vipc_server.create_buffers(VisionStreamType.VISION_STREAM_ROAD, 40, False, *(tici_f_frame_size)) - vipc_server.create_buffers(VisionStreamType.VISION_STREAM_DRIVER, 40, False, *(tici_d_frame_size)) - vipc_server.create_buffers(VisionStreamType.VISION_STREAM_WIDE_ROAD, 40, False, *(tici_f_frame_size)) - vipc_server.start_listener() + log_msgs = [] - sm = messaging.SubMaster(['modelV2', 'driverStateV2']) - pm = messaging.PubMaster(['roadCameraState', 'wideRoadCameraState', 'driverCameraState', 'liveCalibration', 'lateralPlan']) + # modeld is using frame pairs + modeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"roadCameraState", "wideRoadCameraState"}, {"roadEncodeIdx", "wideRoadEncodeIdx"}) + dmodeld_logs = trim_logs_to_max_frames(lr, MAX_FRAMES, {"driverCameraState"}, {"driverEncodeIdx"}) + if not SEND_EXTRA_INPUTS: + modeld_logs = [msg for msg in modeld_logs if msg.which() not in ["liveCalibration", "lateralPlan"]] + dmodeld_logs = [msg for msg in dmodeld_logs if msg.which() not in ["liveCalibration", "lateralPlan"]] + # initial calibration + cal_msg = next(msg for msg in lr if msg.which() == "liveCalibration").as_builder() + cal_msg.logMonoTime = lr[0].logMonoTime + modeld_logs.insert(0, cal_msg.as_reader()) + dmodeld_logs.insert(0, cal_msg.as_reader()) + + modeld = get_process_config("modeld") + dmonitoringmodeld = get_process_config("dmonitoringmodeld") try: - managed_processes['modeld'].start() - managed_processes['dmonitoringmodeld'].start() - time.sleep(5) - sm.update(1000) - - log_msgs = [] - last_desire = None - recv_cnt = defaultdict(int) - frame_idxs = defaultdict(int) - - # init modeld with valid calibration - cal_msgs = [msg for msg in lr if msg.which() == "liveCalibration"] - for _ in range(5): - pm.send(cal_msgs[0].which(), cal_msgs[0].as_builder()) - time.sleep(0.1) - - msgs = defaultdict(list) - for msg in lr: - msgs[msg.which()].append(msg) - - for cam_msgs in zip_longest(msgs['roadCameraState'], msgs['wideRoadCameraState'], msgs['driverCameraState']): - # need a pair of road/wide msgs - if None in (cam_msgs[0], cam_msgs[1]): - break - - for msg in cam_msgs: - if msg is None: - continue - - if SEND_EXTRA_INPUTS: - if msg.which() == "liveCalibration": - last_calib = list(msg.liveCalibration.rpyCalib) - pm.send(msg.which(), replace_calib(msg, last_calib)) - elif msg.which() == "lateralPlan": - last_desire = msg.lateralPlan.desire - dat = messaging.new_message('lateralPlan') - dat.lateralPlan.desire = last_desire - pm.send('lateralPlan', dat) - - if msg.which() in VIPC_STREAM: - msg = msg.as_builder() - camera_state = getattr(msg, msg.which()) - img = frs[msg.which()].get(frame_idxs[msg.which()], pix_fmt="nv12")[0] - frame_idxs[msg.which()] += 1 - - # send camera state and frame - camera_state.frameId = frame_idxs[msg.which()] - pm.send(msg.which(), msg) - vipc_server.send(VIPC_STREAM[msg.which()], img.flatten().tobytes(), camera_state.frameId, - camera_state.timestampSof, camera_state.timestampEof) - - recv = None - if msg.which() in ('roadCameraState', 'wideRoadCameraState'): - if min(frame_idxs['roadCameraState'], frame_idxs['wideRoadCameraState']) > recv_cnt['modelV2']: - recv = "modelV2" - elif msg.which() == 'driverCameraState': - recv = "driverStateV2" - - # wait for a response - with Timeout(15, f"timed out waiting for {recv}"): - if recv: - recv_cnt[recv] += 1 - log_msgs.append(messaging.recv_one(sm.sock[recv])) - - if spinner: - spinner.update("replaying models: road %d/%d, driver %d/%d" % (frame_idxs['roadCameraState'], - frs['roadCameraState'].frame_count, frame_idxs['driverCameraState'], frs['driverCameraState'].frame_count)) - - - if any(frame_idxs[c] >= frs[c].frame_count for c in frame_idxs.keys()) or frame_idxs['roadCameraState'] == MAX_FRAMES: - break - else: - print(f'Received {frame_idxs["roadCameraState"]} frames') - + if spinner: + spinner.update("running model replay") + modeld_msgs = replay_process(modeld, modeld_logs, frs) + dmonitoringmodeld_msgs = replay_process(dmonitoringmodeld, dmodeld_logs, frs) + log_msgs.extend([m for m in modeld_msgs if m.which() == "modelV2"]) + log_msgs.extend([m for m in dmonitoringmodeld_msgs if m.which() == "driverStateV2"]) finally: if spinner: spinner.close() - managed_processes['modeld'].stop() - managed_processes['dmonitoringmodeld'].stop() - return log_msgs if __name__ == "__main__": - update = "--update" in sys.argv replay_dir = os.path.dirname(os.path.abspath(__file__)) ref_commit_fn = os.path.join(replay_dir, "model_replay_ref_commit") @@ -247,10 +190,18 @@ if __name__ == "__main__": ref_commit = f.read().strip() log_fn = get_log_fn(ref_commit, TEST_ROUTE) try: - expected_msgs = 2*MAX_FRAMES + all_logs = list(LogReader(BASE_URL + log_fn)) + cmp_log = [] + + # logs are ordered based on type: modelV2, driverStateV2, nav messages (navThumbnail, mapRenderState, navModel) + model_start_index = next(i for i, m in enumerate(all_logs) if m.which() == "modelV2") + cmp_log += all_logs[model_start_index:model_start_index + MAX_FRAMES] + dmon_start_index = next(i for i, m in enumerate(all_logs) if m.which() == "driverStateV2") + cmp_log += all_logs[dmon_start_index:dmon_start_index + MAX_FRAMES] if not NO_NAV: - expected_msgs += NAV_FRAMES*3 - cmp_log = list(LogReader(BASE_URL + log_fn))[:expected_msgs] + nav_start_index = next(i for i, m in enumerate(all_logs) if m.which() in ["navThumbnail", "mapRenderState", "navModel"]) + nav_logs = all_logs[nav_start_index:nav_start_index + NAV_FRAMES*3] + cmp_log += nav_logs ignore = [ 'logMonoTime', diff --git a/selfdrive/test/process_replay/model_replay_ref_commit b/selfdrive/test/process_replay/model_replay_ref_commit index d3a49b71c1..f4d0b4fc98 100644 --- a/selfdrive/test/process_replay/model_replay_ref_commit +++ b/selfdrive/test/process_replay/model_replay_ref_commit @@ -1 +1 @@ -f3595e5f0d3e04c68612e1fd639607146e23c8ad +965fa8cc8c131a8978c142813658b724a519ac9e diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index c3facd6186..78907ecc18 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -12,6 +12,7 @@ import capnp import cereal.messaging as messaging from cereal import car from cereal.services import service_list +from cereal.visionipc import VisionIpcServer, get_endpoint_name as vipc_get_endpoint_name from common.params import Params from common.timeout import Timeout from common.realtime import DT_CTRL @@ -19,6 +20,7 @@ from panda.python import ALTERNATIVE_EXPERIENCE from selfdrive.car.car_helpers import get_car, interfaces from selfdrive.manager.process_config import managed_processes from selfdrive.test.process_replay.helpers import OpenpilotPrefix +from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams from selfdrive.test.process_replay.migration import migrate_all from tools.lib.logreader import LogReader @@ -29,24 +31,24 @@ TIMEOUT = 15 PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__)) FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") - class ReplayContext: def __init__(self, cfg): self.proc_name = cfg.proc_name self.pubs = cfg.pubs - self.drained_pub = cfg.drained_pub - assert(len(self.pubs) != 0 or self.drained_pub is not None) + self.main_pub = cfg.main_pub + self.main_pub_drained = cfg.main_pub_drained + assert(len(self.pubs) != 0 or self.main_pub is not None) def __enter__(self): messaging.toggle_fake_events(True) messaging.set_fake_prefix(self.proc_name) - if self.drained_pub is None: + if self.main_pub is None: self.events = OrderedDict() for pub in self.pubs: self.events[pub] = messaging.fake_event_handle(pub, enable=True) else: - self.events = {self.drained_pub: messaging.fake_event_handle(self.drained_pub, enable=True)} + self.events = {self.main_pub: messaging.fake_event_handle(self.main_pub, enable=True)} return self @@ -83,7 +85,7 @@ class ReplayContext: def wait_for_next_recv(self, trigger_empty_recv): index = messaging.wait_for_one_event(self.all_recv_called_events) - if self.drained_pub is not None and trigger_empty_recv: + if self.main_pub is not None and self.main_pub_drained and trigger_empty_recv: self.all_recv_called_events[index].clear() self.all_recv_ready_events[index].set() self.all_recv_called_events[index].wait() @@ -95,16 +97,19 @@ class ProcessConfig: pubs: List[str] subs: List[str] ignore: List[str] - config_callback: Optional[Callable] - init_callback: Optional[Callable] - should_recv_callback: Optional[Callable] + config_callback: Optional[Callable] = None + init_callback: Optional[Callable] = None + should_recv_callback: Optional[Callable] = None tolerance: Optional[float] = None environ: Dict[str, str] = field(default_factory=dict) subtest_name: str = "" field_tolerances: Dict[str, float] = field(default_factory=dict) timeout: int = 30 simulation: bool = True - drained_pub: Optional[str] = None + main_pub: Optional[str] = None + main_pub_drained: bool = True + vision_pubs: List[str] = field(default_factory=list) + ignore_alive_pubs: List[str] = field(default_factory=list) class DummySocket: @@ -143,6 +148,7 @@ def controlsd_fingerprint_callback(rc, pm, msgs, fingerprint): def get_car_params_callback(rc, pm, msgs, fingerprint): + params = Params() if fingerprint: CarInterface, _, _ = interfaces[fingerprint] CP = CarInterface.get_non_essential_params(fingerprint) @@ -151,12 +157,15 @@ def get_car_params_callback(rc, pm, msgs, fingerprint): sendcan = DummySocket() canmsgs = [msg for msg in msgs if msg.which() == "can"] - assert len(canmsgs) != 0, "CAN messages are required for carParams initialization" + has_cached_cp = params.get("CarParamsCache") is not None + assert len(canmsgs) != 0, "CAN messages are required for fingerprinting" + assert os.environ.get("SKIP_FW_QUERY", False) or has_cached_cp, "CarParamsCache is required for fingerprinting. Make sure to keep carParams msgs in the logs." for m in canmsgs[:300]: can.send(m.as_builder().to_bytes()) _, CP = get_car(can, sendcan, Params().get_bool("ExperimentalLongitudinalEnabled")) - Params().put("CarParams", CP.to_bytes()) + params.put("CarParams", CP.to_bytes()) + return CP def controlsd_rcv_callback(msg, cfg, frame): @@ -173,10 +182,6 @@ def controlsd_rcv_callback(msg, cfg, frame): return len(socks) > 0 -def radar_rcv_callback(msg, cfg, frame): - return msg.which() == "can" - - def calibration_rcv_callback(msg, cfg, frame): # calibrationd publishes 1 calibrationData every 5 cameraOdometry packets. # should_recv always true to increment frame @@ -188,6 +193,42 @@ def torqued_rcv_callback(msg, cfg, frame): return (frame - 1) == 0 or msg.which() == 'liveLocationKalman' +def dmonitoringmodeld_rcv_callback(msg, cfg, frame): + return msg.which() == "driverCameraState" + + +class ModeldCameraSyncRcvCallback: + def __init__(self): + self.road_present = False + self.wide_road_present = False + self.is_dual_camera = True + + def __call__(self, msg, cfg, frame): + if msg.which() == "initData": + self.is_dual_camera = msg.initData.deviceType in ["tici", "tizi"] + elif msg.which() == "roadCameraState": + self.road_present = True + elif msg.which() == "wideRoadCameraState": + self.wide_road_present = True + + if self.road_present and self.wide_road_present: + self.road_present, self.wide_road_present = False, False + return True + elif self.road_present and not self.is_dual_camera: + self.road_present = False + return True + else: + return False + + +class MessageBasedRcvCallback: + def __init__(self, trigger_msg_type): + self.trigger_msg_type = trigger_msg_type + + def __call__(self, msg, cfg, frame): + return msg.which() == self.trigger_msg_type + + class FrequencyBasedRcvCallback: def __init__(self, trigger_msg_type): self.trigger_msg_type = trigger_msg_type @@ -205,17 +246,17 @@ class FrequencyBasedRcvCallback: def laikad_config_pubsub_callback(params, cfg): ublox = params.get_bool("UbloxAvailable") - drained_key = "ubloxGnss" if ublox else "qcomGnss" + main_key = "ubloxGnss" if ublox else "qcomGnss" sub_keys = ({"qcomGnss", } if ublox else {"ubloxGnss", }) - return set(cfg.pubs) - sub_keys, drained_key + return set(cfg.pubs) - sub_keys, main_key, True def locationd_config_pubsub_callback(params, cfg): ublox = params.get_bool("UbloxAvailable") sub_keys = ({"gpsLocation", } if ublox else {"gpsLocationExternal", }) - return set(cfg.pubs) - sub_keys, None + return set(cfg.pubs) - sub_keys, None, False CONFIGS = [ @@ -229,29 +270,26 @@ CONFIGS = [ ], subs=["controlsState", "carState", "carControl", "sendcan", "carEvents", "carParams"], ignore=["logMonoTime", "valid", "controlsState.startMonoTime", "controlsState.cumLagMs"], - config_callback=None, init_callback=controlsd_fingerprint_callback, should_recv_callback=controlsd_rcv_callback, tolerance=NUMPY_TOLERANCE, simulation=False, - drained_pub="can", + main_pub="can", ), ProcessConfig( proc_name="radard", pubs=["can", "carState", "modelV2"], subs=["radarState", "liveTracks"], ignore=["logMonoTime", "valid", "radarState.cumLagMs"], - config_callback=None, init_callback=get_car_params_callback, - should_recv_callback=radar_rcv_callback, - drained_pub="can", + should_recv_callback=MessageBasedRcvCallback("can"), + main_pub="can", ), ProcessConfig( proc_name="plannerd", pubs=["modelV2", "carControl", "carState", "controlsState", "radarState"], subs=["lateralPlan", "longitudinalPlan", "uiPlan"], ignore=["logMonoTime", "valid", "longitudinalPlan.processingDelay", "longitudinalPlan.solverExecutionTime", "lateralPlan.solverExecutionTime"], - config_callback=None, init_callback=get_car_params_callback, should_recv_callback=FrequencyBasedRcvCallback("modelV2"), tolerance=NUMPY_TOLERANCE, @@ -261,8 +299,6 @@ CONFIGS = [ pubs=["carState", "cameraOdometry", "carParams"], subs=["liveCalibration"], ignore=["logMonoTime", "valid"], - config_callback=None, - init_callback=None, should_recv_callback=calibration_rcv_callback, ), ProcessConfig( @@ -270,8 +306,6 @@ CONFIGS = [ pubs=["driverStateV2", "liveCalibration", "carState", "modelV2", "controlsState"], subs=["driverMonitoringState"], ignore=["logMonoTime", "valid"], - config_callback=None, - init_callback=None, should_recv_callback=FrequencyBasedRcvCallback("driverStateV2"), tolerance=NUMPY_TOLERANCE, ), @@ -284,8 +318,6 @@ CONFIGS = [ subs=["liveLocationKalman"], ignore=["logMonoTime", "valid"], config_callback=locationd_config_pubsub_callback, - init_callback=None, - should_recv_callback=None, tolerance=NUMPY_TOLERANCE, ), ProcessConfig( @@ -293,7 +325,6 @@ CONFIGS = [ pubs=["liveLocationKalman", "carState"], subs=["liveParameters"], ignore=["logMonoTime", "valid"], - config_callback=None, init_callback=get_car_params_callback, should_recv_callback=FrequencyBasedRcvCallback("liveLocationKalman"), tolerance=NUMPY_TOLERANCE, @@ -303,9 +334,6 @@ CONFIGS = [ pubs=["ubloxRaw"], subs=["ubloxGnss", "gpsLocationExternal"], ignore=["logMonoTime"], - config_callback=None, - init_callback=None, - should_recv_callback=None, ), ProcessConfig( proc_name="laikad", @@ -313,22 +341,43 @@ CONFIGS = [ subs=["gnssMeasurements"], ignore=["logMonoTime"], config_callback=laikad_config_pubsub_callback, - init_callback=None, - should_recv_callback=None, tolerance=NUMPY_TOLERANCE, timeout=60*10, # first messages are blocked on internet assistance - drained_pub="ubloxGnss", # config_callback will switch this to qcom if needed + main_pub="ubloxGnss", # config_callback will switch this to qcom if needed ), ProcessConfig( proc_name="torqued", pubs=["liveLocationKalman", "carState", "carControl"], subs=["liveTorqueParameters"], ignore=["logMonoTime"], - config_callback=None, init_callback=get_car_params_callback, should_recv_callback=torqued_rcv_callback, tolerance=NUMPY_TOLERANCE, ), + ProcessConfig( + proc_name="modeld", + pubs=["lateralPlan", "roadCameraState", "wideRoadCameraState", "liveCalibration", "driverMonitoringState"], + subs=["modelV2", "cameraOdometry"], + ignore=["logMonoTime", "modelV2.frameDropPerc", "modelV2.modelExecutionTime"], + should_recv_callback=ModeldCameraSyncRcvCallback(), + tolerance=NUMPY_TOLERANCE, + main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("roadCameraState").stream), + main_pub_drained=False, + vision_pubs=["roadCameraState", "wideRoadCameraState"], + ignore_alive_pubs=["wideRoadCameraState"], + ), + ProcessConfig( + proc_name="dmonitoringmodeld", + pubs=["liveCalibration", "driverCameraState"], + subs=["driverStateV2"], + ignore=["logMonoTime", "driverStateV2.modelExecutionTime", "driverStateV2.dspExecutionTime"], + should_recv_callback=dmonitoringmodeld_rcv_callback, + tolerance=NUMPY_TOLERANCE, + main_pub=vipc_get_endpoint_name("camerad", meta_from_camera_state("driverCameraState").stream), + main_pub_drained=False, + vision_pubs=["driverCameraState"], + ignore_alive_pubs=["driverCameraState"], + ), ] @@ -344,9 +393,9 @@ def replay_process_with_name(name, lr, *args, **kwargs): return replay_process(cfg, lr, *args, **kwargs) -def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False): - all_msgs = migrate_all(lr, old_logtime=True) - process_logs = _replay_single_process(cfg, all_msgs, fingerprint, custom_params or {}, disable_progress) +def replay_process(cfg, lr, frs=None, fingerprint=None, return_all_logs=False, custom_params=None, disable_progress=False): + all_msgs = migrate_all(lr, old_logtime=True, camera_states=len(cfg.vision_pubs) != 0) + process_logs = _replay_single_process(cfg, all_msgs, frs, fingerprint, custom_params, disable_progress) if return_all_logs: keys = set(cfg.subs) @@ -361,7 +410,7 @@ def replay_process(cfg, lr, fingerprint=None, return_all_logs=False, custom_para def _replay_single_process( - cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]], + cfg: ProcessConfig, lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool ): with OpenpilotPrefix(): @@ -383,11 +432,11 @@ def _replay_single_process( else: CP = next((m.carParams for m in lr if m.which() == "carParams"), None) assert CP is not None or "carParams" not in cfg.pubs, "carParams are missing and process needs it" - setup_env(CP=CP, cfg=cfg, controlsState=controlsState, lr=lr, custom_params=custom_params) + setup_env(cfg=cfg, CP=CP, controlsState=controlsState, lr=lr, custom_params=custom_params) if cfg.config_callback is not None: params = Params() - cfg.pubs, cfg.drained_pub = cfg.config_callback(params, cfg) + cfg.pubs, cfg.main_pub, cfg.main_pub_drained = cfg.config_callback(params, cfg) all_msgs = sorted(lr, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in set(cfg.pubs)] @@ -396,6 +445,13 @@ def _replay_single_process( pm = messaging.PubMaster(cfg.pubs) sockets = {s: messaging.sub_sock(s, timeout=100) for s in cfg.subs} + vipc_server = None + if len(cfg.vision_pubs) != 0: + assert frs is not None, "frs must be provided when replaying process using vision streams" + assert all(meta_from_camera_state(st) is not None for st in cfg.vision_pubs),f"undefined vision stream spotted, probably misconfigured process: {cfg.vision_pubs}" + assert all(st in frs for st in cfg.vision_pubs), f"frs for this process must contain following vision streams: {cfg.vision_pubs}" + vipc_server = setup_vision_ipc(cfg, lr) + managed_processes[cfg.proc_name].prepare() managed_processes[cfg.proc_name].start() @@ -406,7 +462,7 @@ def _replay_single_process( try: # Wait for process to startup with Timeout(10, error_msg=f"timed out waiting for process to start: {repr(cfg.proc_name)}"): - while not all(pm.all_readers_updated(s) for s in cfg.pubs): + while not all(pm.all_readers_updated(s) for s in cfg.pubs if s not in cfg.ignore_alive_pubs): time.sleep(0) # Do the replay @@ -428,11 +484,19 @@ def _replay_single_process( # empty recv on drained pub indicates the end of messages, only do that if there're any trigger_empty_recv = False - if cfg.drained_pub: - trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.drained_pub), False) + if cfg.main_pub and cfg.main_pub_drained: + trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.main_pub), False) for m in msg_queue: pm.send(m.which(), m.as_builder()) + # send frames if needed + if vipc_server is not None and m.which() in cfg.vision_pubs: + camera_state = getattr(m, m.which()) + camera_meta = meta_from_camera_state(m.which()) + assert frs is not None + img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0] + vipc_server.send(camera_meta.stream, img.flatten().tobytes(), + camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) msg_queue = [] rc.unlock_sockets() @@ -454,6 +518,21 @@ def _replay_single_process( return log_msgs +def setup_vision_ipc(cfg, lr): + assert len(cfg.vision_pubs) != 0 + + device_type = next(msg.initData.deviceType for msg in lr if msg.which() == "initData") + + vipc_server = VisionIpcServer("camerad") + streams_metas = available_streams(lr) + for meta in streams_metas: + if meta.camera_state in cfg.vision_pubs: + vipc_server.create_buffers(meta.stream, 2, False, *meta.frame_sizes[device_type]) + vipc_server.start_listener() + + return vipc_server + + def setup_env(cfg=None, CP=None, controlsState=None, lr=None, fingerprint=None, custom_params=None, log_dir=None): if platform.system() != "Darwin": os.environ["PARAMS_ROOT"] = "/dev/shm/params" diff --git a/selfdrive/test/process_replay/test_processes.py b/selfdrive/test/process_replay/test_processes.py index 6962e51cb5..1b07081cd8 100755 --- a/selfdrive/test/process_replay/test_processes.py +++ b/selfdrive/test/process_replay/test_processes.py @@ -65,6 +65,7 @@ excluded_interfaces = ["mock", "mazda", "tesla"] BASE_URL = "https://commadataci.blob.core.windows.net/openpilotci/" REF_COMMIT_FN = os.path.join(PROC_REPLAY_DIR, "ref_commit") +EXCLUDED_PROCS = {"modeld", "dmonitoringmodeld"} def run_test_process(data): @@ -155,7 +156,7 @@ def format_diff(results, log_paths, ref_commit): if __name__ == "__main__": all_cars = {car for car, _ in segments} - all_procs = {cfg.proc_name for cfg in CONFIGS} + all_procs = {cfg.proc_name for cfg in CONFIGS if cfg.proc_name not in EXCLUDED_PROCS} parser = argparse.ArgumentParser(description="Regression test to identify changes in a process's output") parser.add_argument("--whitelist-procs", type=str, nargs="*", default=all_procs, diff --git a/selfdrive/test/process_replay/vision_meta.py b/selfdrive/test/process_replay/vision_meta.py new file mode 100644 index 0000000000..77c6b0345d --- /dev/null +++ b/selfdrive/test/process_replay/vision_meta.py @@ -0,0 +1,43 @@ +from collections import namedtuple +from cereal.visionipc import VisionStreamType +from common.realtime import DT_MDL, DT_DMON +from common.transformations.camera import tici_f_frame_size, tici_d_frame_size, tici_e_frame_size, eon_f_frame_size, eon_d_frame_size + +VideoStreamMeta = namedtuple("VideoStreamMeta", ["camera_state", "encode_index", "stream", "dt", "frame_sizes"]) +ROAD_CAMERA_FRAME_SIZES = {"tici": tici_f_frame_size, "tizi": tici_f_frame_size, "eon": eon_f_frame_size} +WIDE_ROAD_CAMERA_FRAME_SIZES = {"tici": tici_e_frame_size, "tizi": tici_e_frame_size} +DRIVER_FRAME_SIZES = {"tici": tici_d_frame_size, "tizi": tici_d_frame_size, "eon": eon_d_frame_size} +VIPC_STREAM_METADATA = [ + # metadata: (state_msg_type, encode_msg_type, stream_type, dt, frame_sizes) + ("roadCameraState", "roadEncodeIdx", VisionStreamType.VISION_STREAM_ROAD, DT_MDL, ROAD_CAMERA_FRAME_SIZES), + ("wideRoadCameraState", "wideRoadEncodeIdx", VisionStreamType.VISION_STREAM_WIDE_ROAD, DT_MDL, WIDE_ROAD_CAMERA_FRAME_SIZES), + ("driverCameraState", "driverEncodeIdx", VisionStreamType.VISION_STREAM_DRIVER, DT_DMON, DRIVER_FRAME_SIZES), +] + + +def meta_from_camera_state(state): + meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[0] == state), None) + return meta + + +def meta_from_encode_index(encode_index): + meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[1] == encode_index), None) + return meta + + +def meta_from_stream_type(stream_type): + meta = next((VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA if meta[2] == stream_type), None) + return meta + + +def available_streams(lr=None): + if lr is None: + return [VideoStreamMeta(*meta) for meta in VIPC_STREAM_METADATA] + + result = [] + for meta in VIPC_STREAM_METADATA: + has_cam_state = next((True for m in lr if m.which() == meta[0]), False) + if has_cam_state: + result.append(VideoStreamMeta(*meta)) + + return result