@ -1,13 +1,12 @@
#!/usr/bin/env python3
import os
import time
import threading
import _thread
import signal
import sys
import subprocess
import cereal . messaging as messaging
from common . basedir import BASEDIR
from common . params import Params
import selfdrive . manager as manager
from selfdrive . test . helpers import set_params_enabled
def cputime_total ( ct ) :
@ -40,7 +39,7 @@ def print_cpu_usage(first_proc, last_proc):
( " ./logcatd " , 0 ) ,
]
r = 0
r = True
dt = ( last_proc . logMonoTime - first_proc . logMonoTime ) / 1e9
result = " ------------------------------------------------ \n "
for proc_name , normal_cpu_usage in procs :
@ -51,26 +50,25 @@ def print_cpu_usage(first_proc, last_proc):
cpu_usage = cpu_time / dt * 100.
if cpu_usage > max ( normal_cpu_usage * 1.1 , normal_cpu_usage + 5.0 ) :
result + = f " Warning { proc_name } using more CPU than normal \n "
r = 1
r = False
elif cpu_usage < min ( normal_cpu_usage * 0.3 , max ( normal_cpu_usage - 1.0 , 0.0 ) ) :
result + = f " Warning { proc_name } using less CPU than normal \n "
r = 1
r = False
result + = f " { proc_name . ljust ( 35 ) } { cpu_usage : .2f } % \n "
except IndexError :
result + = f " { proc_name . ljust ( 35 ) } NO METRICS FOUND \n "
r = 1
r = False
result + = " ------------------------------------------------ \n "
print ( result )
return r
def all_running ( ) :
running = manager . get_running ( )
return all ( p in running and running [ p ] . is_alive ( ) for p in manager . car_started_processes )
def test_cpu_usage ( ) :
cpu_ok = False
return_code = 1
def test_thread ( ) :
# start manager
manager_path = os . path . join ( BASEDIR , " selfdrive/manager.py " )
manager_proc = subprocess . Popen ( [ " python " , manager_path ] )
try :
global return_code
proc_sock = messaging . sub_sock ( ' procLog ' , conflate = True , timeout = 2000 )
# wait until everything's started and get first sample
@ -80,33 +78,26 @@ def test_thread():
break
time . sleep ( 2 )
first_proc = messaging . recv_sock ( proc_sock , wait = True )
if first_proc is None or not all_running ( ) :
err_msg = " procLog recv timed out " if first_proc is None else " all car started process not running "
print ( f " \n \n TEST FAILED: { err_msg } \n \n " )
raise Exception
if first_proc is None :
raise Exception ( " \n \n TEST FAILED: progLog recv timed out \n \n " )
# run for a minute and get last sample
time . sleep ( 60 )
last_proc = messaging . recv_sock ( proc_sock , wait = True )
return_code = print_cpu_usage ( first_proc , last_proc )
if not all_running ( ) :
return_code = 1
cpu_ok = print_cpu_usage ( first_proc , last_proc )
finally :
_thread . interrupt_main ( )
manager_proc . terminate ( )
ret = manager_proc . wait ( 20 )
if ret is None :
manager_proc . kill ( )
return cpu_ok
if __name__ == " __main__ " :
# setup signal handler to exit with test status
def handle_exit ( sig , frame ) :
sys . exit ( return_code )
signal . signal ( signal . SIGINT , handle_exit )
# start manager and test thread
set_params_enabled ( )
Params ( ) . delete ( " CarParams " )
t = threading . Thread ( target = test_thread )
t . daemon = True
t . start ( )
manager . main ( )
passed = False
try :
passed = test_cpu_usage ( )
finally :
sys . exit ( int ( not passed ) )