diff --git a/system/camerad/test/test_camerad.py b/system/camerad/test/test_camerad.py index 1a2e365a8f..378d4b7058 100755 --- a/system/camerad/test/test_camerad.py +++ b/system/camerad/test/test_camerad.py @@ -1,25 +1,18 @@ #!/usr/bin/env python3 - import time import unittest +from collections import defaultdict import cereal.messaging as messaging +from cereal.services import service_list +from selfdrive.manager.process_config import managed_processes from system.hardware import TICI -from selfdrive.test.helpers import with_processes -TEST_TIMESPAN = 30 # random.randint(60, 180) # seconds -SKIP_FRAME_TOLERANCE = 0 -LAG_FRAME_TOLERANCE = 2 # ms +TEST_TIMESPAN = 30 +LAG_FRAME_TOLERANCE = 0.5 # ms -FPS_BASELINE = 20 -CAMERAS = { - "roadCameraState": FPS_BASELINE, - "driverCameraState": FPS_BASELINE // 2, -} +CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState') -if TICI: - CAMERAS["driverCameraState"] = FPS_BASELINE - CAMERAS["wideRoadCameraState"] = FPS_BASELINE class TestCamerad(unittest.TestCase): @classmethod @@ -27,37 +20,57 @@ class TestCamerad(unittest.TestCase): if not TICI: raise unittest.SkipTest - @with_processes(['camerad']) - def test_frame_packets(self): - print("checking frame pkts continuity") - print(TEST_TIMESPAN) + # run camerad and record logs + managed_processes['camerad'].start() + time.sleep(3) + socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS} + + cls.logs = defaultdict(list) + start_time = time.monotonic() + while time.monotonic()- start_time < TEST_TIMESPAN: + for cam, s in socks.items(): + cls.logs[cam] += messaging.drain_sock(s) + time.sleep(0.2) + managed_processes['camerad'].stop() - sm = messaging.SubMaster([socket_name for socket_name in CAMERAS]) + cls.log_by_frame_id = defaultdict(list) + for cam, msgs in cls.logs.items(): + expected_frames = service_list[cam].frequency * TEST_TIMESPAN + assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}" - last_frame_id = dict.fromkeys(CAMERAS, None) - last_ts = dict.fromkeys(CAMERAS, None) - start_time_sec = time.time() - while time.time()- start_time_sec < TEST_TIMESPAN: - sm.update() + for m in msgs: + cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m) - for camera in CAMERAS: - if sm.updated[camera]: - ct = (sm[camera].timestampEof if not TICI else sm[camera].timestampSof) / 1e6 - if last_frame_id[camera] is None: - last_frame_id[camera] = sm[camera].frameId - last_ts[camera] = ct - continue + # strip beginning and end + for _ in range(3): + mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys()) + del cls.log_by_frame_id[mn] + del cls.log_by_frame_id[mx] + + @classmethod + def tearDownClass(cls): + managed_processes['camerad'].stop() - dfid = sm[camera].frameId - last_frame_id[camera] - self.assertTrue(abs(dfid - 1) <= SKIP_FRAME_TOLERANCE, "%s frame id diff is %d" % (camera, dfid)) + def test_frame_skips(self): + skips = {} + frame_ids = self.log_by_frame_id.keys() + for frame_id in range(min(frame_ids), max(frame_ids)): + seen_cams = [msg.which() for msg in self.log_by_frame_id[frame_id]] + skip_cams = set(CAMERAS) - set(seen_cams) + if len(skip_cams): + skips[frame_id] = skip_cams + assert len(skips) == 0, f"Found frame skips, missing cameras for the following frames: {skips}" - dts = ct - last_ts[camera] - self.assertTrue(abs(dts - (1000/CAMERAS[camera])) < LAG_FRAME_TOLERANCE, f"{camera} frame t(ms) diff is {dts:f}") + def test_frame_sync(self): + frame_times = {frame_id: [getattr(m, m.which()).timestampSof for m in msgs] for frame_id, msgs in self.log_by_frame_id.items()} + diffs = {frame_id: (max(ts) - min(ts))/1e6 for frame_id, ts in frame_times.items()} - last_frame_id[camera] = sm[camera].frameId - last_ts[camera] = ct - time.sleep(0.01) + def get_desc(fid, diff): + cam_times = [(m.which(), getattr(m, m.which()).timestampSof/1e6) for m in self.log_by_frame_id[fid]] + return f"{diff=} {cam_times=}" + laggy_frames = {k: get_desc(k, v) for k, v in diffs.items() if v > LAG_FRAME_TOLERANCE} + assert len(laggy_frames) == 0, f"Frames not synced properly: {laggy_frames=}" if __name__ == "__main__": unittest.main()