#!/usr/bin/env python3 import os import time import subprocess import unittest from pathlib import Path import cereal.messaging as messaging from cereal.services import service_list from common.basedir import BASEDIR from common.timeout import Timeout from selfdrive.loggerd.config import ROOT from selfdrive.test.helpers import set_params_enabled from tools.lib.logreader import LogReader PROCS = [ ("selfdrive.controls.controlsd", 47.0), ("./loggerd", 45.0), ("selfdrive.locationd.locationd", 35.0), ("selfdrive.controls.plannerd", 20.0), ("selfdrive.locationd.paramsd", 12.0), ("./camerad", 7.07), ("./_sensord", 6.17), ("./_ui", 5.82), ("selfdrive.controls.radard", 5.67), ("./_modeld", 4.48), ("./boardd", 3.63), ("./_dmonitoringmodeld", 2.67), ("selfdrive.logmessaged", 1.7), ("selfdrive.thermald.thermald", 2.41), ("selfdrive.locationd.calibrationd", 2.0), ("selfdrive.monitoring.dmonitoringd", 1.90), ("./proclogd", 1.54), ("./clocksd", 0.02), ("./ubloxd", 0.02), ("selfdrive.tombstoned", 0), ("./logcatd", 0), ] def cputime_total(ct): return ct.cpuUser + ct.cpuSystem + ct.cpuChildrenUser + ct.cpuChildrenSystem def check_cpu_usage(first_proc, last_proc): result = "------------------------------------------------\n" result += "------------------ CPU Usage -------------------\n" result += "------------------------------------------------\n" r = True dt = (last_proc.logMonoTime - first_proc.logMonoTime) / 1e9 for proc_name, normal_cpu_usage in PROCS: first, last = None, None try: first = [p for p in first_proc.procLog.procs if proc_name in p.cmdline][0] last = [p for p in last_proc.procLog.procs if proc_name in p.cmdline][0] cpu_time = cputime_total(last) - cputime_total(first) cpu_usage = cpu_time / dt * 100. if cpu_usage > max(normal_cpu_usage * 1.1, normal_cpu_usage + 5.0): result += f"Warning {proc_name} using more CPU than normal\n" r = False elif cpu_usage < min(normal_cpu_usage * 0.65, max(normal_cpu_usage - 1.0, 0.0)): result += f"Warning {proc_name} using less CPU than normal\n" r = False result += f"{proc_name.ljust(35)} {cpu_usage:.2f}%\n" except IndexError: result += f"{proc_name.ljust(35)} NO METRICS FOUND {first=} {last=}\n" r = False result += "------------------------------------------------\n" print(result) return r class TestOnroad(unittest.TestCase): @classmethod def setUpClass(cls): os.environ['SKIP_FW_QUERY'] = "1" os.environ['FINGERPRINT'] = "TOYOTA COROLLA TSS2 2019" set_params_enabled() initial_segments = set(Path(ROOT).iterdir()) # start manager and run openpilot for a minute try: manager_path = os.path.join(BASEDIR, "selfdrive/manager/manager.py") proc = subprocess.Popen(["python", manager_path]) sm = messaging.SubMaster(['carState']) with Timeout(150, "controls didn't start"): while sm.rcv_frame['carState'] < 0: sm.update(1000) # make sure we get at least two full segments cls.segments = [] with Timeout(300, "timed out waiting for logs"): while len(cls.segments) < 3: new_paths = set(Path(ROOT).iterdir()) - initial_segments segs = [p for p in new_paths if "--" in str(p)] cls.segments = sorted(segs, key=lambda s: int(str(s).rsplit('--')[-1])) time.sleep(5) finally: proc.terminate() if proc.wait(60) is None: proc.kill() cls.lr = list(LogReader(os.path.join(str(cls.segments[1]), "rlog.bz2"))) def test_cpu_usage(self): proclogs = [m for m in self.lr if m.which() == 'procLog'] self.assertGreater(len(proclogs), service_list['procLog'].frequency * 45, "insufficient samples") cpu_ok = check_cpu_usage(proclogs[0], proclogs[-1]) self.assertTrue(cpu_ok) if __name__ == "__main__": unittest.main()