import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  pytest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  signal 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  car 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  openpilot . system . manager . manager  as  manager 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . manager . process  import  ensure_running 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . system . manager . process_config  import  managed_processes ,  procs 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . hardware  import  HARDWARE 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								os . environ [ ' FAKEUPLOAD ' ]  =  " 1 " 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								MAX_STARTUP_TIME  =  3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								BLACKLIST_PROCS  =  [ ' manage_athenad ' ,  ' pandad ' ,  ' pigeond ' ] 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestManager : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setup_method ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    HARDWARE . set_power_save ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # ensure clean CarParams 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    params . clear_all ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  teardown_method ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    manager . manager_cleanup ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_manager_prepare ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    os . environ [ ' PREPAREONLY ' ]  =  ' 1 ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    manager . main ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_duplicate_procs ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( procs )  ==  len ( managed_processes ) ,  " Duplicate process names " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_blacklisted_procs ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # TODO: ensure there are blacklisted procs until we have a dedicated test 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( BLACKLIST_PROCS ) ,  " No blacklisted procs to test not_run " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @pytest . mark . skip ( " this test is flaky the way it ' s currently written, should be moved to test_onroad " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_clean_exit ( self ,  subtests ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      Ensure  all  processes  exit  cleanly  when  stopped . 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    """ 
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    HARDWARE . set_power_save ( False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    manager . manager_init ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CP  =  car . CarParams . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    procs  =  ensure_running ( managed_processes . values ( ) ,  True ,  Params ( ) ,  CP ,  not_run = BLACKLIST_PROCS ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 10 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  p  in  procs : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      with  subtests . test ( proc = p . name ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        state  =  p . get_process_state_msg ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        assert  state . running ,  f " { p . name }  not running " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        exit_code  =  p . stop ( retry = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        assert  p . name  not  in  BLACKLIST_PROCS ,  f " { p . name }  was started " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        assert  exit_code  is  not  None ,  f " { p . name }  failed to exit " 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # TODO: interrupted blocking read exits with 1 in cereal. use a more unique return code 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        exit_codes  =  [ 0 ,  1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  p . sigkill : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          exit_codes  =  [ - signal . SIGKILL ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        assert  exit_code  in  exit_codes ,  f " { p . name }  died with  { exit_code } "