From 937e2f7eb2dd831d6dc5a6748f04a3095a8b8b7b Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Thu, 27 Feb 2025 13:41:33 -0800 Subject: [PATCH] camerad: spectra stress test (#34716) * cam stress * lint fixes --------- Co-authored-by: Comma Device --- system/camerad/cameras/spectra.cc | 11 +++ system/camerad/cameras/spectra.h | 11 +++ system/camerad/test/test_camerad.py | 117 +++++++++++++--------------- 3 files changed, 75 insertions(+), 64 deletions(-) diff --git a/system/camerad/cameras/spectra.cc b/system/camerad/cameras/spectra.cc index 2ab7bc568a..f5757dcded 100644 --- a/system/camerad/cameras/spectra.cc +++ b/system/camerad/cameras/spectra.cc @@ -917,6 +917,9 @@ bool SpectraCamera::enqueue_buffer(int i, uint64_t request_id) { // in IFE_PROCESSED mode, this is both frame readout and image processing (~1ms) sync_wait.sync_obj = sync_objs_ife[i]; sync_wait.timeout_ms = 100; + if (stress_test("IFE sync")) { + sync_wait.timeout_ms = 1; + } ret = do_sync_control(m->cam_sync_fd, CAM_SYNC_WAIT, &sync_wait, sizeof(sync_wait)); if (ret != 0) { LOGE("failed to wait for IFE sync: %d %d", ret, sync_wait.sync_obj); @@ -926,6 +929,9 @@ bool SpectraCamera::enqueue_buffer(int i, uint64_t request_id) { if (ret == 0 && sync_objs_bps[i]) { sync_wait.sync_obj = sync_objs_bps[i]; sync_wait.timeout_ms = 50; // typically 7ms + if (stress_test("BPS sync")) { + sync_wait.timeout_ms = 1; + } ret = do_sync_control(m->cam_sync_fd, CAM_SYNC_WAIT, &sync_wait, sizeof(sync_wait)); if (ret != 0) { LOGE("failed to wait for BPS sync: %d %d", ret, sync_wait.sync_obj); @@ -1365,6 +1371,11 @@ void SpectraCamera::camera_close() { // Processes camera events and returns true if the frame is ready for further processing bool SpectraCamera::handle_camera_event(const cam_req_mgr_message *event_data) { + if (stress_test("skipping handling camera event")) { + LOGW("skipping event"); + return false; + } + if (!enabled) return false; // ID from the qcom camera request manager diff --git a/system/camerad/cameras/spectra.h b/system/camerad/cameras/spectra.h index 9c2026058a..d9abf884c1 100644 --- a/system/camerad/cameras/spectra.h +++ b/system/camerad/cameras/spectra.h @@ -10,6 +10,7 @@ #include "media/cam_req_mgr.h" #include "common/util.h" +#include "common/swaglog.h" #include "system/camerad/cameras/hw.h" #include "system/camerad/cameras/camera_common.h" #include "system/camerad/sensors/sensor.h" @@ -205,4 +206,14 @@ private: }; inline static std::map camera_sync_data; inline static bool first_frame_synced = false; + + // a mode for stressing edge cases: realignment, sync failures, etc. + inline bool stress_test(const char* log, float prob=0.01) { + static bool enable = getenv("SPECTRA_STRESS_TEST") != nullptr; + bool triggered = enable && ((static_cast(rand()) / RAND_MAX) < prob); + if (triggered) { + LOGE("stress test: %s", log); + } + return triggered; + } }; diff --git a/system/camerad/test/test_camerad.py b/system/camerad/test/test_camerad.py index cf4013eb4a..e88a7bf4bf 100644 --- a/system/camerad/test/test_camerad.py +++ b/system/camerad/test/test_camerad.py @@ -1,81 +1,70 @@ -import pytest +import os import time +import pytest import numpy as np -from collections import defaultdict import cereal.messaging as messaging -from cereal import log from cereal.services import SERVICE_LIST -from openpilot.common.retry import retry from openpilot.system.manager.process_config import managed_processes +from openpilot.tools.lib.log_time_series import msgs_to_time_series TEST_TIMESPAN = 10 -LAG_FRAME_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 0.5, # ARs use synced pulses for frame starts - log.FrameData.ImageSensor.ox03c10: 1.1, # OXs react to out-of-sync at next frame - log.FrameData.ImageSensor.os04c10: 1.1} -FRAME_DELTA_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 1.0, - log.FrameData.ImageSensor.ox03c10: 1.0, - log.FrameData.ImageSensor.os04c10: 1.0} - CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState') -@retry(attempts=3, delay=5.0) -def setup_camerad(cls): - # run camerad and record logs - managed_processes['camerad'].start() - time.sleep(3) - socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS} - - cls.logs = defaultdict(list) - start_time = time.monotonic() - while time.monotonic()- start_time < TEST_TIMESPAN: - for cam, s in socks.items(): - cls.logs[cam] += messaging.drain_sock(s) - time.sleep(0.2) - managed_processes['camerad'].stop() - - cls.log_by_frame_id = defaultdict(list) - cls.sensor_type = None - for cam, msgs in cls.logs.items(): - if cls.sensor_type is None: - cls.sensor_type = getattr(msgs[0], msgs[0].which()).sensor.raw - expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN - assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}" - dts = np.abs(np.diff([getattr(m, m.which()).timestampSof/1e6 for m in msgs]) - 1000/SERVICE_LIST[cam].frequency) - assert (dts < FRAME_DELTA_TOLERANCE[cls.sensor_type]).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}" +def run_and_log(procs, services, duration): + logs = [] + + try: + for p in procs: + managed_processes[p].start() + socks = [messaging.sub_sock(s, conflate=False, timeout=100) for s in services] + + start_time = time.monotonic() + while time.monotonic() - start_time < duration: + for s in socks: + logs.extend(messaging.drain_sock(s)) + for p in procs: + assert managed_processes[p].proc.is_alive() + finally: + for p in procs: + managed_processes[p].stop() + + return logs - for m in msgs: - cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m) +@pytest.fixture(scope="module") +def logs(): + logs = run_and_log(["camerad", ], CAMERAS, TEST_TIMESPAN) + ts = msgs_to_time_series(logs) - # strip beginning and end - for _ in range(3): - mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys()) - del cls.log_by_frame_id[mn] - del cls.log_by_frame_id[mx] + for cam in CAMERAS: + expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN + cnt = len(ts[cam]['t']) + assert expected_frames*0.8 < cnt < expected_frames*1.2, f"unexpected frame count {cam}: {expected_frames=}, got {cnt}" + + dts = np.abs(np.diff([ts[cam]['timestampSof']/1e6]) - 1000/SERVICE_LIST[cam].frequency) + assert (dts < 1.0).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}" + return ts @pytest.mark.tici class TestCamerad: - @classmethod - def setup_class(cls): - setup_camerad(cls) - - def test_frame_skips(self): - skips = {} - frame_ids = self.log_by_frame_id.keys() - for frame_id in range(min(frame_ids), max(frame_ids)): - seen_cams = [msg.which() for msg in self.log_by_frame_id[frame_id]] - skip_cams = set(CAMERAS) - set(seen_cams) - if len(skip_cams): - skips[frame_id] = skip_cams - assert len(skips) == 0, f"Found frame skips, missing cameras for the following frames: {skips}" - - def test_frame_sync(self): - frame_times = {frame_id: [getattr(m, m.which()).timestampSof for m in msgs] for frame_id, msgs in self.log_by_frame_id.items()} - diffs = {frame_id: (max(ts) - min(ts))/1e6 for frame_id, ts in frame_times.items()} - - def get_desc(fid, diff): - cam_times = [(m.which(), getattr(m, m.which()).timestampSof/1e6) for m in self.log_by_frame_id[fid]] - return (diff, cam_times) - laggy_frames = {k: get_desc(k, v) for k, v in diffs.items() if v > LAG_FRAME_TOLERANCE[self.sensor_type]} + def test_frame_skips(self, logs): + for c in CAMERAS: + assert set(np.diff(logs[c]['frameId'])) == {1, }, f"{c} has frame skips" + + def test_frame_sync(self, logs): + n = range(len(logs['roadCameraState']['t'][:-10])) + + frame_ids = {i: [logs[cam]['frameId'][i] for cam in CAMERAS] for i in n} + assert all(len(set(v)) == 1 for v in frame_ids.values()), "frame IDs not aligned" + + frame_times = {i: [logs[cam]['timestampSof'][i] for cam in CAMERAS] for i in n} + diffs = {i: (max(ts) - min(ts))/1e6 for i, ts in frame_times.items()} + + laggy_frames = {k: v for k, v in diffs.items() if v > 1.1} assert len(laggy_frames) == 0, f"Frames not synced properly: {laggy_frames=}" + + @pytest.mark.skip("TODO: enable this") + def test_stress_test(self, logs): + os.environ['SPECTRA_STRESS_TEST'] = '1' + run_and_log(["camerad", ], CAMERAS, 5)