dragonpilot - 基於 openpilot 的開源駕駛輔助系統
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

103 lines
3.2 KiB

#!/usr/bin/env python3
import time
import threading
import _thread
import signal
import sys
import cereal.messaging as messaging
import selfdrive.manager as manager
def cputime_total(ct):
return ct.cpuUser + ct.cpuSystem + ct.cpuChildrenUser + ct.cpuChildrenSystem
def print_cpu_usage(first_proc, last_proc):
procs = [
("selfdrive.controls.controlsd", 59.46),
("selfdrive.locationd.locationd", 34.38),
("./loggerd", 33.90),
("selfdrive.controls.plannerd", 19.77),
("./_modeld", 12.74),
("selfdrive.locationd.paramsd", 11.53),
("selfdrive.controls.radard", 9.54),
("./_ui", 9.54),
("./camerad", 7.07),
("selfdrive.locationd.calibrationd", 6.81),
("./_sensord", 6.17),
("selfdrive.monitoring.dmonitoringd", 5.48),
("./boardd", 3.63),
("./_dmonitoringmodeld", 2.67),
("selfdrive.logmessaged", 2.71),
("selfdrive.thermald.thermald", 2.41),
("./proclogd", 1.54),
("./_gpsd", 0.09),
("./clocksd", 0.02),
("./ubloxd", 0.02),
("selfdrive.tombstoned", 0),
("./logcatd", 0),
]
r = 0
dt = (last_proc.logMonoTime - first_proc.logMonoTime) / 1e9
result = "------------------------------------------------\n"
for proc_name, normal_cpu_usage in procs:
try:
first = [p for p in first_proc.procLog.procs if proc_name in p.cmdline][0]
last = [p for p in last_proc.procLog.procs if proc_name in p.cmdline][0]
cpu_time = cputime_total(last) - cputime_total(first)
cpu_usage = cpu_time / dt * 100.
if cpu_usage > max(normal_cpu_usage * 1.1, normal_cpu_usage + 5.0):
result += f"Warning {proc_name} using more CPU than normal\n"
r = 1
elif cpu_usage < min(normal_cpu_usage * 0.3, max(normal_cpu_usage - 1.0, 0.0)):
result += f"Warning {proc_name} using less CPU than normal\n"
r = 1
result += f"{proc_name.ljust(35)} {cpu_usage:.2f}%\n"
except IndexError:
result += f"{proc_name.ljust(35)} NO METRICS FOUND\n"
r = 1
result += "------------------------------------------------\n"
print(result)
return r
def all_running():
running = manager.get_running()
return all(p in running and running[p].is_alive() for p in manager.car_started_processes)
return_code = 1
def test_thread():
try:
global return_code
proc_sock = messaging.sub_sock('procLog', conflate=True, timeout=1000)
# wait until everything's started and get first sample
time.sleep(30)
first_proc = messaging.recv_sock(proc_sock, wait=True)
if first_proc is None or not all_running():
print("\n\nTEST FAILED: all car started processes not running\n\n")
raise Exception
# run for a minute and get last sample
time.sleep(60)
last_proc = messaging.recv_sock(proc_sock, wait=True)
return_code = print_cpu_usage(first_proc, last_proc)
if not all_running():
return_code = 1
finally:
_thread.interrupt_main()
if __name__ == "__main__":
# setup signal handler to exit with test status
def handle_exit(sig, frame):
sys.exit(return_code)
signal.signal(signal.SIGINT, handle_exit)
# start manager and test thread
t = threading.Thread(target=test_thread)
t.daemon = True
t.start()
manager.main()