import  os 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								os . environ [ ' FAKEUPLOAD ' ]  =  " 1 " 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  common . testing  import  phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  selfdrive . manager  import  manager_init ,  manager_prepare 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  selfdrive . manager  import  start_managed_process ,  kill_managed_process ,  get_running 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  selfdrive . config  import  CruiseButtons 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								from  functools  import  wraps 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								import  time 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								DID_INIT  =  False 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# must run first 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_manager_prepare ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  global  DID_INIT 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  manager_init ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  manager_prepare ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  DID_INIT  =  True 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  with_processes ( processes ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  def  wrapper ( func ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    @wraps ( func ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    def  wrap ( ) : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      if  not  DID_INIT : 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								        test_manager_prepare ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # start and assert started 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      [ start_managed_process ( p )  for  p  in  processes ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      assert  all ( get_running ( ) [ name ] . exitcode  is  None  for  name  in  processes ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # call the function 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      func ( ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # assert processes are still started 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      assert  all ( get_running ( ) [ name ] . exitcode  is  None  for  name  in  processes ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      # kill and assert all stopped 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      [ kill_managed_process ( p )  for  p  in  processes ] 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								      assert  len ( get_running ( ) )  ==  0 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								    return  wrap 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  return  wrapper 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
									 
								
							 
							
								 
							 
							
							
								#@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#@with_processes(['controlsd', 'radard']) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#def test_controls(): 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  from selfdrive.test.plant.plant import Plant 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  # start the fake car for 2 seconds 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  plant = Plant(100) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  for i in range(200): 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#    if plant.rk.frame >= 20 and plant.rk.frame <= 25: 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#      cruise_buttons = CruiseButtons.RES_ACCEL 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#      # rolling forward 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#      assert plant.speed > 0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#    else: 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#      cruise_buttons = 0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#    plant.step(cruise_buttons = cruise_buttons) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  plant.close() 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  # assert that we stopped 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								#  assert plant.speed == 0.0 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@with_processes ( [ ' loggerd ' ,  ' logmessaged ' ,  ' tombstoned ' ,  ' proclogd ' ,  ' logcatd ' ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_logging ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print  " LOGGING IS SET UP " 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  time . sleep ( 1.0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@with_processes ( [ ' visiond ' ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_visiond ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print  " VISIOND IS SET UP " 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  time . sleep ( 5.0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@with_processes ( [ ' sensord ' ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_sensord ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print  " SENSORS ARE SET UP " 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  time . sleep ( 1.0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@with_processes ( [ ' ui ' ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_ui ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print  " RUNNING UI " 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  time . sleep ( 1.0 ) 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# will have one thing to upload if loggerd ran 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								# TODO: assert it actually uploaded 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@phone_only 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								@with_processes ( [ ' uploader ' ] ) 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								def  test_uploader ( ) : 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  print  " UPLOADER " 
 
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							 
							
							
								  time . sleep ( 10.0 )