@ -1,13 +1,16 @@ 
			
		
	
		
		
			
				
					
					#!/usr/bin/env python3 #!/usr/bin/env python3  
			
		
	
		
		
			
				
					
					import  random import  random  
			
		
	
		
		
			
				
					
					import  time  
			
		
	
		
		
			
				
					
					import  unittest import  unittest  
			
		
	
		
		
			
				
					
					from  collections  import  defaultdict from  collections  import  defaultdict  
			
		
	
		
		
			
				
					
					from  parameterized  import  parameterized from  parameterized  import  parameterized  
			
		
	
		
		
			
				
					
					import  threading  
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					from  cereal  import  car from  cereal  import  car  
			
		
	
		
		
			
				
					
					from  common . params  import  Params  
			
		
	
		
		
			
				
					
					from  selfdrive . car . car_helpers  import  interfaces from  selfdrive . car . car_helpers  import  interfaces  
			
		
	
		
		
			
				
					
					from  selfdrive . car . fingerprints  import  FW_VERSIONS from  selfdrive . car . fingerprints  import  FW_VERSIONS  
			
		
	
		
		
			
				
					
					from  selfdrive . car . fw_versions  import  FW_QUERY_CONFIGS ,  VERSIONS ,  match_fw_to_car from  selfdrive . car . fw_versions  import  FW_QUERY_CONFIGS ,  VERSIONS ,  match_fw_to_car ,  get_fw_versions  
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					CarFw  =  car . CarParams . CarFw CarFw  =  car . CarParams . CarFw  
			
		
	
		
		
			
				
					
					Ecu  =  car . CarParams . Ecu Ecu  =  car . CarParams . Ecu  
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -15,6 +18,14 @@ Ecu = car.CarParams.Ecu 
			
		
	
		
		
			
				
					
					ECU_NAME  =  { v :  k  for  k ,  v  in  Ecu . schema . enumerants . items ( ) } ECU_NAME  =  { v :  k  for  k ,  v  in  Ecu . schema . enumerants . items ( ) }  
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					class  FakeSocket :  
			
		
	
		
		
			
				
					
					  def  receive ( self ,  non_blocking = False ) :   
			
		
	
		
		
			
				
					
					    pass   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  def  send ( self ,  msg ) :   
			
		
	
		
		
			
				
					
					    pass   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					class  TestFwFingerprint ( unittest . TestCase ) : class  TestFwFingerprint ( unittest . TestCase ) :  
			
		
	
		
		
			
				
					
					  def  assertFingerprints ( self ,  candidates ,  expected ) :    def  assertFingerprints ( self ,  candidates ,  expected ) :   
			
		
	
		
		
			
				
					
					    candidates  =  list ( candidates )      candidates  =  list ( candidates )   
			
		
	
	
		
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
					@ -103,5 +114,58 @@ class TestFwFingerprint(unittest.TestCase): 
			
		
	
		
		
			
				
					
					                         f ' { brand . title ( ) } : FW query whitelist missing ecus:  { ecu_strings } ' )                           f ' { brand . title ( ) } : FW query whitelist missing ecus:  { ecu_strings } ' )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					class  TestFwFingerprintTiming ( unittest . TestCase ) :  
			
		
	
		
		
			
				
					
					  def  _benchmark ( self ,  brand ,  num_pandas ,  ref_time ,  tol ,  n ) :   
			
		
	
		
		
			
				
					
					    params  =  Params ( )   
			
		
	
		
		
			
				
					
					    fake_socket  =  FakeSocket ( )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    times  =  [ ]   
			
		
	
		
		
			
				
					
					    for  _  in  range ( n ) :   
			
		
	
		
		
			
				
					
					      params . put_bool ( " ObdMultiplexingEnabled " ,  True )   
			
		
	
		
		
			
				
					
					      thread  =  threading . Thread ( target = get_fw_versions ,  args = ( fake_socket ,  fake_socket ,  brand ) ,  kwargs = dict ( num_pandas = num_pandas ) )   
			
		
	
		
		
			
				
					
					      thread . start ( )   
			
		
	
		
		
			
				
					
					      t  =  time . perf_counter ( )   
			
		
	
		
		
			
				
					
					      while  thread . is_alive ( ) :   
			
		
	
		
		
			
				
					
					        time . sleep ( 0.02 )   
			
		
	
		
		
			
				
					
					        if  not  params . get_bool ( " ObdMultiplexingChanged " ) :   
			
		
	
		
		
			
				
					
					          params . put_bool ( " ObdMultiplexingChanged " ,  True )   
			
		
	
		
		
			
				
					
					      times . append ( time . perf_counter ( )  -  t )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    avg_time  =  round ( sum ( times )  /  len ( times ) ,  2 )   
			
		
	
		
		
			
				
					
					    self . assertLess ( avg_time ,  ref_time  +  tol )   
			
		
	
		
		
			
				
					
					    self . assertGreater ( avg_time ,  ref_time  -  tol ,  " Performance seems to have improved, update test refs. " )   
			
		
	
		
		
			
				
					
					    return  avg_time   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  @parameterized . expand ( [ ( 1 , ) ,  ( 2 , ) ,  ] )   
			
		
	
		
		
			
				
					
					  def  test_fw_query_timing ( self ,  num_pandas ) :   
			
		
	
		
		
			
				
					
					    brand_ref_times  =  {   
			
		
	
		
		
			
				
					
					      1 :  {   
			
		
	
		
		
			
				
					
					        ' body ' :  0.1 ,   
			
		
	
		
		
			
				
					
					        ' chrysler ' :  0.3 ,   
			
		
	
		
		
			
				
					
					        ' ford ' :  0.2 ,   
			
		
	
		
		
			
				
					
					        ' honda ' :  0.5 ,   
			
		
	
		
		
			
				
					
					        ' hyundai ' :  0.7 ,   
			
		
	
		
		
			
				
					
					        ' mazda ' :  0.1 ,   
			
		
	
		
		
			
				
					
					        ' nissan ' :  0.3 ,   
			
		
	
		
		
			
				
					
					        ' subaru ' :  0.1 ,   
			
		
	
		
		
			
				
					
					        ' tesla ' :  0.2 ,   
			
		
	
		
		
			
				
					
					        ' toyota ' :  0.7 ,   
			
		
	
		
		
			
				
					
					        ' volkswagen ' :  0.2 ,   
			
		
	
		
		
			
				
					
					      } ,   
			
		
	
		
		
			
				
					
					      2 :  {   
			
		
	
		
		
			
				
					
					        ' hyundai ' :  1.1 ,   
			
		
	
		
		
			
				
					
					      }   
			
		
	
		
		
			
				
					
					    }   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    for  brand ,  config  in  FW_QUERY_CONFIGS . items ( ) :   
			
		
	
		
		
			
				
					
					      with  self . subTest ( brand = brand ,  num_pandas = num_pandas ) :   
			
		
	
		
		
			
				
					
					        multi_panda_requests  =  [ r  for  r  in  config . requests  if  r . bus  >  3 ]   
			
		
	
		
		
			
				
					
					        if  not  len ( multi_panda_requests )  and  num_pandas  >  1 :   
			
		
	
		
		
			
				
					
					          raise  unittest . SkipTest ( " No multi-panda FW queries " )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        avg_time  =  self . _benchmark ( brand ,  num_pandas ,  brand_ref_times [ num_pandas ] [ brand ] ,  0.1 ,  10 )   
			
		
	
		
		
			
				
					
					        print ( f ' { brand =} ,  { num_pandas =} ,  { len ( config . requests ) =} , avg FW query time= { avg_time }  seconds ' )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					if  __name__  ==  " __main__ " : if  __name__  ==  " __main__ " :  
			
		
	
		
		
			
				
					
					  unittest . main ( )    unittest . main ( )