#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  subprocess 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  pathlib  import  Path 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal . services  import  service_list 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . basedir  import  BASEDIR 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . timeout  import  Timeout 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . loggerd . config  import  ROOT 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  selfdrive . manager  as  manager 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . test . helpers  import  set_params_enabled 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  tools . lib . logreader  import  LogReader 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								PROCS  =  [ 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.controls.controlsd " ,  47.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./loggerd " ,  45.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.locationd.locationd " ,  35.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.controls.plannerd " ,  20.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.locationd.paramsd " ,  12.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./camerad " ,  7.07 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./_sensord " ,  6.17 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./_ui " ,  5.82 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.controls.radard " ,  5.67 ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  ( " ./_modeld " ,  4.48 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./boardd " ,  3.63 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./_dmonitoringmodeld " ,  2.67 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.logmessaged " ,  1.7 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.thermald.thermald " ,  2.41 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.locationd.calibrationd " ,  2.0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.monitoring.dmonitoringd " ,  1.90 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./proclogd " ,  1.54 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./clocksd " ,  0.02 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./ubloxd " ,  0.02 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " selfdrive.tombstoned " ,  0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ( " ./logcatd " ,  0 ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# ***** test helpers ***** 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  cputime_total ( ct ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  ct . cpuUser  +  ct . cpuSystem  +  ct . cpuChildrenUser  +  ct . cpuChildrenSystem 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								def  check_cpu_usage ( first_proc ,  last_proc ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  result  =   " ------------------------------------------------ \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  result  + =  " ------------------ CPU Usage ------------------- \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  result  + =  " ------------------------------------------------ \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  r  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  dt  =  ( last_proc . logMonoTime  -  first_proc . logMonoTime )  /  1e9 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  proc_name ,  normal_cpu_usage  in  PROCS : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    first ,  last  =  None ,  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      first  =  [ p  for  p  in  first_proc . procLog . procs  if  proc_name  in  p . cmdline ] [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      last  =  [ p  for  p  in  last_proc . procLog . procs  if  proc_name  in  p . cmdline ] [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cpu_time  =  cputime_total ( last )  -  cputime_total ( first ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cpu_usage  =  cpu_time  /  dt  *  100. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  cpu_usage  >  max ( normal_cpu_usage  *  1.1 ,  normal_cpu_usage  +  5.0 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        result  + =  f " Warning  { proc_name }  using more CPU than normal \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        r  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  cpu_usage  <  min ( normal_cpu_usage  *  0.65 ,  max ( normal_cpu_usage  -  1.0 ,  0.0 ) ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        result  + =  f " Warning  { proc_name }  using less CPU than normal \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        r  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      result  + =  f " { proc_name . ljust ( 35 ) }    { cpu_usage : .2f } % \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    except  IndexError : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      result  + =  f " { proc_name . ljust ( 35 ) }   NO METRICS FOUND  { first =}   { last =} \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      r  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  result  + =  " ------------------------------------------------ \n " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  print ( result ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  r 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestOnroad ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @classmethod 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUpClass ( cls ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    os . environ [ ' SKIP_FW_QUERY ' ]  =  " 1 " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    os . environ [ ' FINGERPRINT ' ]  =  " TOYOTA COROLLA TSS2 2019 " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    set_params_enabled ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    initial_segments  =  set ( Path ( ROOT ) . iterdir ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # start manager and run openpilot for a minute 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      manager . build ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      manager . manager_prepare ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      manager_path  =  os . path . join ( BASEDIR ,  " selfdrive/manager.py " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      proc  =  subprocess . Popen ( [ " python " ,  manager_path ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      sm  =  messaging . SubMaster ( [ ' carState ' ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  Timeout ( 150 ,  " controls didn ' t start " ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        while  sm . rcv_frame [ ' carState ' ]  <  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          sm . update ( 1000 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      # make sure we get at least two full segments 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      cls . segments  =  [ ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  Timeout ( 300 ,  " timed out waiting for logs " ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        while  len ( cls . segments )  <  3 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          new_paths  =  set ( Path ( ROOT ) . iterdir ( ) )  -  initial_segments 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          segs  =  [ p  for  p  in  new_paths  if  " -- "  in  str ( p ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          cls . segments  =  sorted ( segs ,  key = lambda  s :  int ( str ( s ) . rsplit ( ' -- ' ) [ - 1 ] ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    finally : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      proc . terminate ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  proc . wait ( 60 )  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        proc . kill ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cls . lr  =  list ( LogReader ( os . path . join ( str ( cls . segments [ 1 ] ) ,  " rlog.bz2 " ) ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_cpu_usage ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    proclogs  =  [ m  for  m  in  self . lr  if  m . which ( )  ==  ' procLog ' ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertGreater ( len ( proclogs ) ,  service_list [ ' procLog ' ] . frequency  *  45 ,  " insufficient samples " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cpu_ok  =  check_cpu_usage ( proclogs [ 0 ] ,  proclogs [ - 1 ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( cpu_ok ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )