import math import json import os import pytest import shutil import subprocess import time import numpy as np from collections import Counter, defaultdict from pathlib import Path from tabulate import tabulate from cereal import log import cereal.messaging as messaging from cereal.services import SERVICE_LIST from openpilot.common.basedir import BASEDIR from openpilot.common.timeout import Timeout from openpilot.common.params import Params from openpilot.selfdrive.selfdrived.events import EVENTS, ET from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.system.hardware import HARDWARE from openpilot.system.hardware.hw import Paths from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.log_time_series import msgs_to_time_series """ CPU usage budget * each process is entitled to at least 8% * total CPU usage of openpilot (sum(PROCS.values()) should not exceed MAX_TOTAL_CPU """ TEST_DURATION = 25 LOG_OFFSET = 8 MAX_TOTAL_CPU = 280. # total for all 8 cores PROCS = { # Baseline CPU usage by process "selfdrive.controls.controlsd": 16.0, "selfdrive.selfdrived.selfdrived": 16.0, "selfdrive.car.card": 26.0, "./loggerd": 14.0, "./encoderd": 17.0, "./camerad": 10.0, "selfdrive.controls.plannerd": 9.0, "./ui": 18.0, "selfdrive.locationd.paramsd": 9.0, "./sensord": 7.0, "selfdrive.controls.radard": 2.0, "selfdrive.modeld.modeld": 22.0, "selfdrive.modeld.dmonitoringmodeld": 21.0, "system.hardware.hardwared": 4.0, "selfdrive.locationd.calibrationd": 2.0, "selfdrive.locationd.torqued": 5.0, "selfdrive.locationd.locationd": 25.0, "selfdrive.ui.soundd": 3.0, "selfdrive.monitoring.dmonitoringd": 4.0, "./proclogd": 2.0, "system.logmessaged": 1.0, "system.tombstoned": 0, "./logcatd": 1.0, "system.micd": 5.0, "system.timed": 0, "selfdrive.pandad.pandad": 0, "system.statsd": 1.0, "system.loggerd.uploader": 15.0, "system.loggerd.deleter": 1.0, } PROCS.update({ "tici": { "./pandad": 5.0, "./ubloxd": 1.0, "system.ubloxd.pigeond": 6.0, }, "tizi": { "./pandad": 19.0, "system.qcomgpsd.qcomgpsd": 1.0, } }.get(HARDWARE.get_device_type(), {})) TIMINGS = { # rtols: max/min, rsd "can": [2.5, 0.35], "pandaStates": [2.5, 0.35], "peripheralState": [2.5, 0.35], "sendcan": [2.5, 0.35], "carState": [2.5, 0.35], "carControl": [2.5, 0.35], "controlsState": [2.5, 0.35], "longitudinalPlan": [2.5, 0.5], "driverAssistance": [2.5, 0.5], "roadCameraState": [2.5, 0.35], "driverCameraState": [2.5, 0.35], "modelV2": [2.5, 0.35], "driverStateV2": [2.5, 0.40], "livePose": [2.5, 0.35], "wideRoadCameraState": [1.5, 0.35], } LOGS_SIZE = { # MB per segment "qlog.zst": 0.5, "rlog.zst": 8.1, "qcamera.ts": 2.3, } LOGS_SIZE.update(dict.fromkeys(['ecamera.hevc', 'fcamera.hevc', 'dcamera.hevc'], 76.5)) def cputime_total(ct): return ct.cpuUser + ct.cpuSystem + ct.cpuChildrenUser + ct.cpuChildrenSystem @pytest.mark.tici @pytest.mark.skip_tici_setup class TestOnroad: @classmethod def setup_class(cls): if "DEBUG" in os.environ: segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.zst")), Path(Paths.log_root()).iterdir()) segs = sorted(segs, key=lambda x: x.stat().st_mtime) cls.lr = list(LogReader(os.path.join(segs[-1], "rlog.zst"))) cls.ts = msgs_to_time_series(cls.lr) return # setup env params = Params() params.remove("CurrentRoute") params.put_bool("RecordFront", True) set_params_enabled() os.environ['REPLAY'] = '1' os.environ['TESTING_CLOSET'] = '1' if os.path.exists(Paths.log_root()): shutil.rmtree(Paths.log_root()) # start manager and run openpilot for TEST_DURATION proc = None try: manager_path = os.path.join(BASEDIR, "system/manager/manager.py") cls.manager_st = time.monotonic() proc = subprocess.Popen(["python", manager_path]) sm = messaging.SubMaster(['carState']) with Timeout(30, "controls didn't start"): while not sm.seen['carState']: sm.update(1000) route = params.get("CurrentRoute", encoding="utf-8") assert route is not None segs = list(Path(Paths.log_root()).glob(f"{route}--*")) assert len(segs) == 1 time.sleep(TEST_DURATION) finally: if proc is not None: proc.terminate() if proc.wait(60) is None: proc.kill() cls.lr = list(LogReader(os.path.join(str(segs[0]), "rlog.zst"))) st = time.monotonic() cls.ts = msgs_to_time_series(cls.lr) print("msgs to time series", time.monotonic() - st) log_path = segs[0] cls.log_sizes = {} for f in log_path.iterdir(): assert f.is_file() cls.log_sizes[f] = f.stat().st_size / 1e6 cls.msgs = defaultdict(list) for m in cls.lr: cls.msgs[m.which()].append(m) def test_service_frequencies(self, subtests): for s, msgs in self.msgs.items(): if s in ('initData', 'sentinel'): continue # skip gps services for now if s in ('ubloxGnss', 'ubloxRaw', 'gnssMeasurements', 'gpsLocation', 'gpsLocationExternal', 'qcomGnss'): continue with subtests.test(service=s): assert len(msgs) >= math.floor(SERVICE_LIST[s].frequency*int(TEST_DURATION*0.8)) def test_manager_starting_time(self): st = self.ts['managerState']['t'][0] assert (st - self.manager_st) < 12.5, f"manager.py took {st - self.manager_st}s to publish the first 'managerState' msg" def test_cloudlog_size(self): msgs = self.msgs['logMessage'] total_size = sum(len(m.as_builder().to_bytes()) for m in msgs) assert total_size < 3.5e5 cnt = Counter(json.loads(m.logMessage)['filename'] for m in msgs) big_logs = [f for f, n in cnt.most_common(3) if n / sum(cnt.values()) > 30.] assert len(big_logs) == 0, f"Log spam: {big_logs}" def test_log_sizes(self, subtests): # TODO: this isn't super stable between different devices for f, sz in self.log_sizes.items(): rate = LOGS_SIZE[f.name]/60. minn = rate * TEST_DURATION * 0.5 maxx = rate * TEST_DURATION * 1.5 with subtests.test(file=f.name): assert minn < sz < maxx def test_ui_timings(self): result = "\n" result += "------------------------------------------------\n" result += "-------------- UI Draw Timing ------------------\n" result += "------------------------------------------------\n" ts = self.ts['uiDebug']['drawTimeMillis'] result += f"min {min(ts):.2f}ms\n" result += f"max {max(ts):.2f}ms\n" result += f"std {np.std(ts):.2f}ms\n" result += f"mean {np.mean(ts):.2f}ms\n" result += "------------------------------------------------\n" print(result) assert max(ts) < 250. assert np.mean(ts) < 10. #self.assertLess(np.std(ts), 5.) # some slow frames are expected since camerad/modeld can preempt ui veryslow = [x for x in ts if x > 40.] assert len(veryslow) < 5, f"Too many slow frame draw times: {veryslow}" def test_cpu_usage(self, subtests): print("\n------------------------------------------------") print("------------------ CPU Usage -------------------") print("------------------------------------------------") plogs_by_proc = defaultdict(list) for pl in self.msgs['procLog']: for x in pl.procLog.procs: if len(x.cmdline) > 0: n = list(x.cmdline)[0] plogs_by_proc[n].append(x) cpu_ok = True dt = (self.msgs['procLog'][-1].logMonoTime - self.msgs['procLog'][0].logMonoTime) / 1e9 header = ['process', 'usage', 'expected', 'max allowed', 'test result'] rows = [] for proc_name, expected in PROCS.items(): error = "" usage = 0. x = plogs_by_proc[proc_name] if len(x) > 2: cpu_time = cputime_total(x[-1]) - cputime_total(x[0]) usage = cpu_time / dt * 100. max_allowed = max(expected * 1.8, expected + 5.0) if usage > max_allowed: error = "❌ USING MORE CPU THAN EXPECTED ❌" cpu_ok = False else: error = "❌ NO METRICS FOUND ❌" cpu_ok = False rows.append([proc_name, usage, expected, max_allowed, error or "✅"]) print(tabulate(rows, header, tablefmt="simple_grid", stralign="center", numalign="center", floatfmt=".2f")) # Ensure there's no missing procs all_procs = {p.name for p in self.msgs['managerState'][0].managerState.processes if p.shouldBeRunning} for p in all_procs: with subtests.test(proc=p): assert any(p in pp for pp in PROCS.keys()), f"Expected CPU usage missing for {p}" # total CPU check procs_tot = sum([(max(x) if isinstance(x, tuple) else x) for x in PROCS.values()]) with subtests.test(name="total CPU"): assert procs_tot < MAX_TOTAL_CPU, "Total CPU budget exceeded" print("------------------------------------------------") print(f"Total allocated CPU usage is {procs_tot}%, budget is {MAX_TOTAL_CPU}%, {MAX_TOTAL_CPU-procs_tot:.1f}% left") print("------------------------------------------------") assert cpu_ok def test_memory_usage(self): print("\n------------------------------------------------") print("--------------- Memory Usage -------------------") print("------------------------------------------------") offset = int(SERVICE_LIST['deviceState'].frequency * LOG_OFFSET) mems = [m.deviceState.memoryUsagePercent for m in self.msgs['deviceState'][offset:]] print("Memory usage: ", mems) # check for big leaks. note that memory usage is # expected to go up while the MSGQ buffers fill up assert np.average(mems) <= 65, "Average memory usage above 65%" assert np.max(np.diff(mems)) <= 4, "Max memory increase too high" assert np.average(np.diff(mems)) <= 1, "Average memory increase too high" def test_camera_frame_timings(self, subtests): # test timing within a single camera result = "\n" result += "------------------------------------------------\n" result += "----------------- SOF Timing ------------------\n" result += "------------------------------------------------\n" for name in ['roadCameraState', 'wideRoadCameraState', 'driverCameraState']: ts = self.ts[name]['timestampSof'] d_ms = np.diff(ts) / 1e6 d50 = np.abs(d_ms-50) result += f"{name} sof delta vs 50ms: min {min(d50):.2f}ms\n" result += f"{name} sof delta vs 50ms: max {max(d50):.2f}ms\n" result += f"{name} sof delta vs 50ms: mean {d50.mean():.2f}ms\n" with subtests.test(camera=name): assert max(d50) < 5.0, f"high SOF delta vs 50ms: {max(d50)}" result += "------------------------------------------------\n" print(result) def test_camera_sync(self, subtests): cam_states = ['roadCameraState', 'wideRoadCameraState', 'driverCameraState'] encode_cams = ['roadEncodeIdx', 'wideRoadEncodeIdx', 'driverEncodeIdx'] for cams in (cam_states, encode_cams): with subtests.test(cams=cams): # sanity checks within a single cam for cam in cams: with subtests.test(test="frame_skips", camera=cam): assert set(np.diff(self.ts[cam]['frameId'])) == {1, }, "Frame ID skips" # EOF > SOF eof_sof_diff = self.ts[cam]['timestampEof'] - self.ts[cam]['timestampSof'] assert np.all(eof_sof_diff > 0) assert np.all(eof_sof_diff < 50*1e6) first_fid = {c: min(self.ts[c]['frameId']) for c in cams} if cam.endswith('CameraState'): # camerad guarantees that all cams start on frame ID 0 # (note loggerd also needs to start up fast enough to catch it) assert set(first_fid.values()) == {0, }, "Cameras don't start on frame ID 0" else: # encoder guarantees all cams start on the same frame ID assert len(set(first_fid.values())) == 1, "Cameras don't start on same frame ID" # we don't do a full segment rotation, so these might not match exactly last_fid = {c: max(self.ts[c]['frameId']) for c in cams} assert max(last_fid.values()) - min(last_fid.values()) < 10 start, end = min(first_fid.values()), min(last_fid.values()) for i in range(end-start): ts = {c: round(self.ts[c]['timestampSof'][i]/1e6, 1) for c in cams} diff = (max(ts.values()) - min(ts.values())) assert diff < 2, f"Cameras not synced properly: frame_id={start+i}, {diff=:.1f}ms, {ts=}" def test_camera_encoder_matches(self, subtests): # sanity check that the frame metadata is consistent with the encoded frames pairs = [('roadCameraState', 'roadEncodeIdx'), ('wideRoadCameraState', 'wideRoadEncodeIdx'), ('driverCameraState', 'driverEncodeIdx')] for cam, enc in pairs: with subtests.test(camera=cam, encoder=enc): cam_frames = {fid: (sof, eof) for fid, sof, eof in zip( self.ts[cam]['frameId'], self.ts[cam]['timestampSof'], self.ts[cam]['timestampEof'], strict=True, )} for i, fid in enumerate(self.ts[enc]['frameId']): cam_sof, cam_eof = cam_frames[fid] enc_sof, enc_eof = self.ts[enc]['timestampSof'][i], self.ts[enc]['timestampEof'][i] assert enc_sof == cam_sof, f"SOF mismatch: frameId={fid}, enc_sof={enc_sof}, cam_sof={cam_sof}" assert enc_eof == cam_eof, f"EOF mismatch: frameId={fid}, enc_eof={enc_eof}, cam_eof={cam_eof}" def test_mpc_execution_timings(self): result = "\n" result += "------------------------------------------------\n" result += "----------------- MPC Timing ------------------\n" result += "------------------------------------------------\n" cfgs = [("longitudinalPlan", 0.05, 0.05),] for (s, instant_max, avg_max) in cfgs: ts = [getattr(m, s).solverExecutionTime for m in self.msgs[s]] assert max(ts) < instant_max, f"high '{s}' execution time: {max(ts)}" assert np.mean(ts) < avg_max, f"high avg '{s}' execution time: {np.mean(ts)}" result += f"'{s}' execution time: min {min(ts):.5f}s\n" result += f"'{s}' execution time: max {max(ts):.5f}s\n" result += f"'{s}' execution time: mean {np.mean(ts):.5f}s\n" result += "------------------------------------------------\n" print(result) def test_model_execution_timings(self, subtests): result = "\n" result += "------------------------------------------------\n" result += "----------------- Model Timing -----------------\n" result += "------------------------------------------------\n" cfgs = [ ("modelV2", 0.045, 0.040), # TODO: this should be stricter but it's hard to measure exactly ("driverStateV2", 0.045, 0.035), ] for (s, instant_max, avg_max) in cfgs: ts = [getattr(m, s).modelExecutionTime for m in self.msgs[s]] # TODO some init can happen in first iteration ts = ts[1:] result += f"'{s}' execution time: min {min(ts):.5f}s\n" result += f"'{s}' execution time: max {max(ts):.5f}s\n" result += f"'{s}' execution time: mean {np.mean(ts):.5f}s\n" with subtests.test(s): assert max(ts) < instant_max, f"high '{s}' execution time: {max(ts)}" assert np.mean(ts) < avg_max, f"high avg '{s}' execution time: {np.mean(ts)}" result += "------------------------------------------------\n" print(result) def test_timings(self): passed = True print("\n------------------------------------------------") print("----------------- Service Timings --------------") print("------------------------------------------------") header = ['service', 'max', 'min', 'mean', 'expected mean', 'rsd', 'max allowed rsd', 'test result'] rows = [] for s, (maxmin, rsd) in TIMINGS.items(): offset = int(SERVICE_LIST[s].frequency * LOG_OFFSET) msgs = [m.logMonoTime for m in self.msgs[s][offset:]] if not len(msgs): raise Exception(f"missing {s}") ts = np.diff(msgs) / 1e9 dt = 1 / SERVICE_LIST[s].frequency errors = [] if not np.allclose(np.mean(ts), dt, rtol=0.03, atol=0): errors.append("❌ FAILED MEAN TIMING CHECK ❌") if not np.allclose([np.max(ts), np.min(ts)], dt, rtol=maxmin, atol=0): errors.append("❌ FAILED MAX/MIN TIMING CHECK ❌") if (np.std(ts)/dt) > rsd: errors.append("❌ FAILED RSD TIMING CHECK ❌") passed = not errors and passed rows.append([s, *(np.array([np.max(ts), np.min(ts), np.mean(ts), dt])*1e3), np.std(ts)/dt, rsd, "\n".join(errors) or "✅"]) print(tabulate(rows, header, tablefmt="simple_grid", stralign="center", numalign="center", floatfmt=".2f")) assert passed @release_only def test_startup(self): startup_alert = self.ts['selfdriveState']['alertText1'][0] expected = EVENTS[log.OnroadEvent.EventName.startup][ET.PERMANENT].alert_text_1 assert startup_alert == expected, "wrong startup alert" def test_engagable(self): no_entries = Counter() for m in self.msgs['onroadEvents']: for evt in m.onroadEvents: if evt.noEntry: no_entries[evt.name] += 1 offset = int(SERVICE_LIST['selfdriveState'].frequency * LOG_OFFSET) eng = [m.selfdriveState.engageable for m in self.msgs['selfdriveState'][offset:]] assert all(eng), \ f"Not engageable for whole segment:\n- selfdriveState.engageable: {Counter(eng)}\n- No entry events: {no_entries}"