camerad: improved frame sync and skip tests (#25904)

* camerad: cleanup frame sync and skip tests

* fix linter

Co-authored-by: Comma Device <device@comma.ai>
pull/25914/head
Adeeb Shihadeh 3 years ago committed by GitHub
parent 4e310b807f
commit 1c6dc12a04
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 87
      system/camerad/test/test_camerad.py

@ -1,25 +1,18 @@
#!/usr/bin/env python3
import time
import unittest
from collections import defaultdict
import cereal.messaging as messaging
from cereal.services import service_list
from selfdrive.manager.process_config import managed_processes
from system.hardware import TICI
from selfdrive.test.helpers import with_processes
TEST_TIMESPAN = 30 # random.randint(60, 180) # seconds
SKIP_FRAME_TOLERANCE = 0
LAG_FRAME_TOLERANCE = 2 # ms
TEST_TIMESPAN = 30
LAG_FRAME_TOLERANCE = 0.5 # ms
FPS_BASELINE = 20
CAMERAS = {
"roadCameraState": FPS_BASELINE,
"driverCameraState": FPS_BASELINE // 2,
}
CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')
if TICI:
CAMERAS["driverCameraState"] = FPS_BASELINE
CAMERAS["wideRoadCameraState"] = FPS_BASELINE
class TestCamerad(unittest.TestCase):
@classmethod
@ -27,37 +20,57 @@ class TestCamerad(unittest.TestCase):
if not TICI:
raise unittest.SkipTest
@with_processes(['camerad'])
def test_frame_packets(self):
print("checking frame pkts continuity")
print(TEST_TIMESPAN)
# run camerad and record logs
managed_processes['camerad'].start()
time.sleep(3)
socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS}
cls.logs = defaultdict(list)
start_time = time.monotonic()
while time.monotonic()- start_time < TEST_TIMESPAN:
for cam, s in socks.items():
cls.logs[cam] += messaging.drain_sock(s)
time.sleep(0.2)
managed_processes['camerad'].stop()
sm = messaging.SubMaster([socket_name for socket_name in CAMERAS])
cls.log_by_frame_id = defaultdict(list)
for cam, msgs in cls.logs.items():
expected_frames = service_list[cam].frequency * TEST_TIMESPAN
assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}"
last_frame_id = dict.fromkeys(CAMERAS, None)
last_ts = dict.fromkeys(CAMERAS, None)
start_time_sec = time.time()
while time.time()- start_time_sec < TEST_TIMESPAN:
sm.update()
for m in msgs:
cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m)
for camera in CAMERAS:
if sm.updated[camera]:
ct = (sm[camera].timestampEof if not TICI else sm[camera].timestampSof) / 1e6
if last_frame_id[camera] is None:
last_frame_id[camera] = sm[camera].frameId
last_ts[camera] = ct
continue
# strip beginning and end
for _ in range(3):
mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys())
del cls.log_by_frame_id[mn]
del cls.log_by_frame_id[mx]
@classmethod
def tearDownClass(cls):
managed_processes['camerad'].stop()
dfid = sm[camera].frameId - last_frame_id[camera]
self.assertTrue(abs(dfid - 1) <= SKIP_FRAME_TOLERANCE, "%s frame id diff is %d" % (camera, dfid))
def test_frame_skips(self):
skips = {}
frame_ids = self.log_by_frame_id.keys()
for frame_id in range(min(frame_ids), max(frame_ids)):
seen_cams = [msg.which() for msg in self.log_by_frame_id[frame_id]]
skip_cams = set(CAMERAS) - set(seen_cams)
if len(skip_cams):
skips[frame_id] = skip_cams
assert len(skips) == 0, f"Found frame skips, missing cameras for the following frames: {skips}"
dts = ct - last_ts[camera]
self.assertTrue(abs(dts - (1000/CAMERAS[camera])) < LAG_FRAME_TOLERANCE, f"{camera} frame t(ms) diff is {dts:f}")
def test_frame_sync(self):
frame_times = {frame_id: [getattr(m, m.which()).timestampSof for m in msgs] for frame_id, msgs in self.log_by_frame_id.items()}
diffs = {frame_id: (max(ts) - min(ts))/1e6 for frame_id, ts in frame_times.items()}
last_frame_id[camera] = sm[camera].frameId
last_ts[camera] = ct
time.sleep(0.01)
def get_desc(fid, diff):
cam_times = [(m.which(), getattr(m, m.which()).timestampSof/1e6) for m in self.log_by_frame_id[fid]]
return f"{diff=} {cam_times=}"
laggy_frames = {k: get_desc(k, v) for k, v in diffs.items() if v > LAG_FRAME_TOLERANCE}
assert len(laggy_frames) == 0, f"Frames not synced properly: {laggy_frames=}"
if __name__ == "__main__":
unittest.main()

Loading…
Cancel
Save