camerad: spectra stress test (#34716)

* cam stress

* lint fixes

---------

Co-authored-by: Comma Device <device@comma.ai>
pull/34665/head^2
Adeeb Shihadeh 8 months ago committed by GitHub
parent c8e598e647
commit 937e2f7eb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 11
      system/camerad/cameras/spectra.cc
  2. 11
      system/camerad/cameras/spectra.h
  3. 113
      system/camerad/test/test_camerad.py

@ -917,6 +917,9 @@ bool SpectraCamera::enqueue_buffer(int i, uint64_t request_id) {
// in IFE_PROCESSED mode, this is both frame readout and image processing (~1ms)
sync_wait.sync_obj = sync_objs_ife[i];
sync_wait.timeout_ms = 100;
if (stress_test("IFE sync")) {
sync_wait.timeout_ms = 1;
}
ret = do_sync_control(m->cam_sync_fd, CAM_SYNC_WAIT, &sync_wait, sizeof(sync_wait));
if (ret != 0) {
LOGE("failed to wait for IFE sync: %d %d", ret, sync_wait.sync_obj);
@ -926,6 +929,9 @@ bool SpectraCamera::enqueue_buffer(int i, uint64_t request_id) {
if (ret == 0 && sync_objs_bps[i]) {
sync_wait.sync_obj = sync_objs_bps[i];
sync_wait.timeout_ms = 50; // typically 7ms
if (stress_test("BPS sync")) {
sync_wait.timeout_ms = 1;
}
ret = do_sync_control(m->cam_sync_fd, CAM_SYNC_WAIT, &sync_wait, sizeof(sync_wait));
if (ret != 0) {
LOGE("failed to wait for BPS sync: %d %d", ret, sync_wait.sync_obj);
@ -1365,6 +1371,11 @@ void SpectraCamera::camera_close() {
// Processes camera events and returns true if the frame is ready for further processing
bool SpectraCamera::handle_camera_event(const cam_req_mgr_message *event_data) {
if (stress_test("skipping handling camera event")) {
LOGW("skipping event");
return false;
}
if (!enabled) return false;
// ID from the qcom camera request manager

@ -10,6 +10,7 @@
#include "media/cam_req_mgr.h"
#include "common/util.h"
#include "common/swaglog.h"
#include "system/camerad/cameras/hw.h"
#include "system/camerad/cameras/camera_common.h"
#include "system/camerad/sensors/sensor.h"
@ -205,4 +206,14 @@ private:
};
inline static std::map<int, SyncData> camera_sync_data;
inline static bool first_frame_synced = false;
// a mode for stressing edge cases: realignment, sync failures, etc.
inline bool stress_test(const char* log, float prob=0.01) {
static bool enable = getenv("SPECTRA_STRESS_TEST") != nullptr;
bool triggered = enable && ((static_cast<double>(rand()) / RAND_MAX) < prob);
if (triggered) {
LOGE("stress test: %s", log);
}
return triggered;
}
};

@ -1,81 +1,70 @@
import pytest
import os
import time
import pytest
import numpy as np
from collections import defaultdict
import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.retry import retry
from openpilot.system.manager.process_config import managed_processes
from openpilot.tools.lib.log_time_series import msgs_to_time_series
TEST_TIMESPAN = 10
LAG_FRAME_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 0.5, # ARs use synced pulses for frame starts
log.FrameData.ImageSensor.ox03c10: 1.1, # OXs react to out-of-sync at next frame
log.FrameData.ImageSensor.os04c10: 1.1}
FRAME_DELTA_TOLERANCE = {log.FrameData.ImageSensor.ar0231: 1.0,
log.FrameData.ImageSensor.ox03c10: 1.0,
log.FrameData.ImageSensor.os04c10: 1.0}
CAMERAS = ('roadCameraState', 'driverCameraState', 'wideRoadCameraState')
@retry(attempts=3, delay=5.0)
def setup_camerad(cls):
# run camerad and record logs
managed_processes['camerad'].start()
time.sleep(3)
socks = {c: messaging.sub_sock(c, conflate=False, timeout=100) for c in CAMERAS}
cls.logs = defaultdict(list)
def run_and_log(procs, services, duration):
logs = []
try:
for p in procs:
managed_processes[p].start()
socks = [messaging.sub_sock(s, conflate=False, timeout=100) for s in services]
start_time = time.monotonic()
while time.monotonic()- start_time < TEST_TIMESPAN:
for cam, s in socks.items():
cls.logs[cam] += messaging.drain_sock(s)
time.sleep(0.2)
managed_processes['camerad'].stop()
cls.log_by_frame_id = defaultdict(list)
cls.sensor_type = None
for cam, msgs in cls.logs.items():
if cls.sensor_type is None:
cls.sensor_type = getattr(msgs[0], msgs[0].which()).sensor.raw
expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN
assert expected_frames*0.95 < len(msgs) < expected_frames*1.05, f"unexpected frame count {cam}: {expected_frames=}, got {len(msgs)}"
while time.monotonic() - start_time < duration:
for s in socks:
logs.extend(messaging.drain_sock(s))
for p in procs:
assert managed_processes[p].proc.is_alive()
finally:
for p in procs:
managed_processes[p].stop()
dts = np.abs(np.diff([getattr(m, m.which()).timestampSof/1e6 for m in msgs]) - 1000/SERVICE_LIST[cam].frequency)
assert (dts < FRAME_DELTA_TOLERANCE[cls.sensor_type]).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}"
return logs
for m in msgs:
cls.log_by_frame_id[getattr(m, m.which()).frameId].append(m)
@pytest.fixture(scope="module")
def logs():
logs = run_and_log(["camerad", ], CAMERAS, TEST_TIMESPAN)
ts = msgs_to_time_series(logs)
# strip beginning and end
for _ in range(3):
mn, mx = min(cls.log_by_frame_id.keys()), max(cls.log_by_frame_id.keys())
del cls.log_by_frame_id[mn]
del cls.log_by_frame_id[mx]
for cam in CAMERAS:
expected_frames = SERVICE_LIST[cam].frequency * TEST_TIMESPAN
cnt = len(ts[cam]['t'])
assert expected_frames*0.8 < cnt < expected_frames*1.2, f"unexpected frame count {cam}: {expected_frames=}, got {cnt}"
dts = np.abs(np.diff([ts[cam]['timestampSof']/1e6]) - 1000/SERVICE_LIST[cam].frequency)
assert (dts < 1.0).all(), f"{cam} dts(ms) out of spec: max diff {dts.max()}, 99 percentile {np.percentile(dts, 99)}"
return ts
@pytest.mark.tici
class TestCamerad:
@classmethod
def setup_class(cls):
setup_camerad(cls)
def test_frame_skips(self):
skips = {}
frame_ids = self.log_by_frame_id.keys()
for frame_id in range(min(frame_ids), max(frame_ids)):
seen_cams = [msg.which() for msg in self.log_by_frame_id[frame_id]]
skip_cams = set(CAMERAS) - set(seen_cams)
if len(skip_cams):
skips[frame_id] = skip_cams
assert len(skips) == 0, f"Found frame skips, missing cameras for the following frames: {skips}"
def test_frame_sync(self):
frame_times = {frame_id: [getattr(m, m.which()).timestampSof for m in msgs] for frame_id, msgs in self.log_by_frame_id.items()}
diffs = {frame_id: (max(ts) - min(ts))/1e6 for frame_id, ts in frame_times.items()}
def get_desc(fid, diff):
cam_times = [(m.which(), getattr(m, m.which()).timestampSof/1e6) for m in self.log_by_frame_id[fid]]
return (diff, cam_times)
laggy_frames = {k: get_desc(k, v) for k, v in diffs.items() if v > LAG_FRAME_TOLERANCE[self.sensor_type]}
def test_frame_skips(self, logs):
for c in CAMERAS:
assert set(np.diff(logs[c]['frameId'])) == {1, }, f"{c} has frame skips"
def test_frame_sync(self, logs):
n = range(len(logs['roadCameraState']['t'][:-10]))
frame_ids = {i: [logs[cam]['frameId'][i] for cam in CAMERAS] for i in n}
assert all(len(set(v)) == 1 for v in frame_ids.values()), "frame IDs not aligned"
frame_times = {i: [logs[cam]['timestampSof'][i] for cam in CAMERAS] for i in n}
diffs = {i: (max(ts) - min(ts))/1e6 for i, ts in frame_times.items()}
laggy_frames = {k: v for k, v in diffs.items() if v > 1.1}
assert len(laggy_frames) == 0, f"Frames not synced properly: {laggy_frames=}"
@pytest.mark.skip("TODO: enable this")
def test_stress_test(self, logs):
os.environ['SPECTRA_STRESS_TEST'] = '1'
run_and_log(["camerad", ], CAMERAS, 5)

Loading…
Cancel
Save