#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  pytest 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  collections  import  defaultdict 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  parameterized  import  parameterized 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  car 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . car_helpers  import  interfaces 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . fingerprints  import  FW_VERSIONS 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . fw_versions  import  FW_QUERY_CONFIGS ,  FUZZY_EXCLUDE_ECUS ,  VERSIONS ,  build_fw_dict ,  \
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                                match_fw_to_car ,  get_fw_versions ,  get_present_ecus 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . car . vin  import  get_vin 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CarFw  =  car . CarParams . CarFw 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								Ecu  =  car . CarParams . Ecu 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								ECU_NAME  =  { v :  k  for  k ,  v  in  Ecu . schema . enumerants . items ( ) } 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  FakeSocket : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  receive ( self ,  non_blocking = False ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  send ( self ,  msg ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestFwFingerprint ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  assertFingerprints ( self ,  candidates ,  expected ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    candidates  =  list ( candidates ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertEqual ( len ( candidates ) ,  1 ,  f " got more than one candidate:  { candidates } " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertEqual ( candidates [ 0 ] ,  expected ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @parameterized . expand ( [ ( b ,  c ,  e [ c ] )  for  b ,  e  in  VERSIONS . items ( )  for  c  in  e ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_exact_match ( self ,  brand ,  car_model ,  ecus ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    CP  =  car . CarParams . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  _  in  range ( 200 ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      fw  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  ecu ,  fw_versions  in  ecus . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ecu_name ,  addr ,  sub_addr  =  ecu 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        fw . append ( { " ecu " :  ecu_name ,  " fwVersion " :  random . choice ( fw_versions ) ,  ' brand ' :  brand , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                   " address " :  addr ,  " subAddress " :  0  if  sub_addr  is  None  else  sub_addr } ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      CP . carFw  =  fw 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      _ ,  matches  =  match_fw_to_car ( CP . carFw ,  allow_fuzzy = False ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . assertFingerprints ( matches ,  car_model ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
									
										
											 
										
											
												HKG: use platform codes to fuzzy fingerprint (#28531)
* get gas/ev/hev from FW (not all correct, poc)
* add test for essential ecus for fuzzy fingerprinting
* kinda works
* stash
* clean up
* add code
* simpler
* use the function
* test it with our cars
* no re
no re
no re
* debugging
* handle empty dict
* simpl
* this is promising
start on making existing fingerprinting functions use the config, instead of entirely replacing them
* needs to allow 1 match
* lay out how this should look
* changes
* executable
* some work
* use config
* fuzzy ecus
* config test
* comment and some clean up
* test platform codes
* use regex, simpler and fixes bug
* in func
* rm bad func
* typing for new func and remove old from dc
* todo done
* tested!
* remove fake platform codes
* thought we needed this, but actually...
* not needed
* not applicable any more
* use config for essential ecus
* first draft of test to make adding/removing fuzzy FP platform intentional
* compile
* clean up test
* even cleaner
* fix default ecus type
* temp fix
* this is mostly in tests now
* test every fuzzy ecu fw returns one platform code
* experiment with dates
* Revert "experiment with dates"
This reverts commit 3251b9cc5c3ca41ca92c8b75ad9b2234b720aa0b.
* clean that up
* comment
* test
* work on all cars
* fix fuzz_fw_fingerprint
* comment
* get first by search
* bit more clean up
* and more
* use compiled pattern for nicer syntax
* default
* flip dat around, much cleaner
* clean up hyundai test a bit
* flip order
same here
* rename test and flip subTest
* fix pylint
* revert fw changes
revert fw changes
* line
* add original functions to test
* needs to be a list
* cmt
* draft (need to count one ecu as a match)
* tiny clean up
* todo: date range
* only in notebook
* remove comment (still can be either list or set)
* same, only notebook
* more consistent signature
* copilot inspired
* copilot no good
* test for date parsing
* better name
* good, now we don't have to worry about the dates mismatching in another test/logic
* comment up+
* some stuff
* clean up
fix test
fix test
* test
* comment
* use utils
* clean up (utils are cleaner and less buggy)
* clean up (utils are cleaner and less buggy)
* fixup test
* use a dash (prettier) and remove some platforms that can fingerprint now!
* compile global pattern
* same as what we do in values
* remove comments
* fuzzy_get_platform_codes is one or none here
* more clean up
* sort imports
* woah woah woah
* add comment
* fix reassigning different types
* add types
* adapt fuzzy test recently added (nice it caught this!)
* update lock
* options
comments
* stash
* comments and fixes
* better comments
* better
* test: run on exact fuzzy matching logic, same results!
* use match_fw_to_car
* test all fw
* ex
* unused random
* this is a possibility
* this is more clear
* fix
* revert
* revert to needing both ECUs to match to reduce false positives, test
* fix excluded platform test :( but it's ok
* add comment
* we actually want to only test fuzzy ecus to make it explicit
* fix mypy
* comment for tomorrow
* just add matches with fuzzy FP
* add comment
* this was the cleanest I could think of, but still working on it. not very easy to understand
* think this is better, but also worse...
* comment: reframing how this works
* revert back to what we did before
* was swapped
* else set
* remove old comment
* fixes from merge
* remove fuzzy_min_match_count from this pr
* fix static analysis
* also unused
* different method first draft
* copy existing fuzzy func
* check all possible ecus exist, only platform codes, slightly refactor main loop
* fix
* Revert recent
Revert "fix"
This reverts commit 5cdb7bda835f1e48e750ab4195e891afe08e11ea.
Revert "check all possible ecus exist, only platform codes, slightly refactor main loop"
This reverts commit d3e918fa20fa4ce881445850f5f7428a3c11adf8.
Revert "copy existing fuzzy func"
This reverts commit 34c8c0545097c84f55f4b4f61907606c93760ddd.
Revert "different method first draft"
This reverts commit b91139055d7d1802c1eb726504798c156a183c9c.
* new func
* fixup test
* remove changes from v1 from fw_versions.py
* clean up a bit
* return part as part of code
* fix test
* add original fuzzy function
* add an ecu match if the date is within range (or date doesn't exist)
* add format for what we're going to do
* not working stash
* the exact matching function does more of what we want with less code and less custom logic
* we don't care about found versions, only codes and dates
* actually we do have an exception
* this works pretty nicely now
* up here
* this is better
* some minor clean up
* old function=now junk
* fix platform code test
* remove old platform code function
* now rename _new to
* use FW_QUERY_CONFIG
* clean up imports
* rename that too
* one line
* correct typing
correct typing
* draft tests
* so that works
* fixup excluded platform test now too
* this is tested by excluded platform test
* test parts and dates
* remove old comment
* old import
* take platform code stuff out of FwQueryConfig
* fix test
* revert debug script
* flip order
* make this a set by default
* revert this part
* correct typing
* clean up comments
* clean that test up too/pylint
* combine these three tests ina clean way
* not right
* more general
* be consistent with quotes
* comment
* comment
* comment in fw_versions
* flip order
* this is more readable
* could test all this, but it's tested in test_hyundai and doesn't do a lot here
* only assert brands which use this
* invalidate all CAN FD ICE and hybrid
* tuple
* can get away without filtering
* add comment reasons
* fix
* some review suggestions
* this works (first draft)
* this is better
* script to print platform codes and dates
* sanity check for dates are in correct ecus and platforms
* mypy
* better variable name and comment
* rename
* same
* slightly better name
* subset
* exclude platforms and live car without dates
* consistent
* self explan
* better name
* test to make sure the functions agree
* clean that up
* comment
* we get other responses from queries not in DB, only check any
* not used or typed
old-commit-hash: f788edb6a5c8d3f516076f886fe0d831ee43b580
											 
										 
										
											2 years ago 
										
									 
								 
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @parameterized . expand ( [ ( b ,  c ,  e [ c ] )  for  b ,  e  in  VERSIONS . items ( )  for  c  in  e ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_custom_fuzzy_match ( self ,  brand ,  car_model ,  ecus ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    config  =  FW_QUERY_CONFIGS [ brand ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  config . match_fw_to_car_fuzzy  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  unittest . SkipTest ( " Brand does not implement custom fuzzy fingerprinting function " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    CP  =  car . CarParams . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  _  in  range ( 5 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      fw  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  ecu ,  fw_versions  in  ecus . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ecu_name ,  addr ,  sub_addr  =  ecu 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        fw . append ( { " ecu " :  ecu_name ,  " fwVersion " :  random . choice ( fw_versions ) ,  ' brand ' :  brand , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                   " address " :  addr ,  " subAddress " :  0  if  sub_addr  is  None  else  sub_addr } ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      CP . carFw  =  fw 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      _ ,  matches  =  match_fw_to_car ( CP . carFw ,  allow_exact = False ,  log = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      brand_matches  =  config . match_fw_to_car_fuzzy ( build_fw_dict ( CP . carFw ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      # If both have matches, they must agree 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  len ( matches )  ==  1  and  len ( brand_matches )  ==  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertEqual ( matches ,  brand_matches ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @parameterized . expand ( [ ( b ,  c ,  e [ c ] )  for  b ,  e  in  VERSIONS . items ( )  for  c  in  e ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_fuzzy_match_ecu_count ( self ,  brand ,  car_model ,  ecus ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Asserts that fuzzy matching does not count matching FW, but ECU address keys 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    valid_ecus  =  [ e  for  e  in  ecus  if  e [ 0 ]  not  in  FUZZY_EXCLUDE_ECUS ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  not  len ( valid_ecus ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      raise  unittest . SkipTest ( " Car model has no compatible ECUs for fuzzy matching " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    fw  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  ecu  in  valid_ecus : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      ecu_name ,  addr ,  sub_addr  =  ecu 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  _  in  range ( 5 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # Add multiple FW versions to simulate ECU returning to multiple queries in a brand 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        fw . append ( { " ecu " :  ecu_name ,  " fwVersion " :  random . choice ( ecus [ ecu ] ) ,  ' brand ' :  brand , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                   " address " :  addr ,  " subAddress " :  0  if  sub_addr  is  None  else  sub_addr } ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      CP  =  car . CarParams . new_message ( carFw = fw ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      _ ,  matches  =  match_fw_to_car ( CP . carFw ,  allow_exact = False ,  log = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      # Assert no match if there are not enough unique ECUs 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      unique_ecus  =  { ( f [ ' address ' ] ,  f [ ' subAddress ' ] )  for  f  in  fw } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  len ( unique_ecus )  <  2 : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . assertEqual ( len ( matches ) ,  0 ,  car_model ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      # There won't always be a match due to shared FW, but if there is it should be correct 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      elif  len ( matches ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertFingerprints ( matches ,  car_model ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_fw_version_lists ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  car_model ,  ecus  in  FW_VERSIONS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  self . subTest ( car_model = car_model . value ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        for  ecu ,  ecu_fw  in  ecus . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          with  self . subTest ( ecu ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            duplicates  =  { fw  for  fw  in  ecu_fw  if  ecu_fw . count ( fw )  >  1 } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								            self . assertFalse ( len ( duplicates ) ,  f ' { car_model } : Duplicate FW versions: Ecu. { ECU_NAME [ ecu [ 0 ] ] } ,  { duplicates } ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            self . assertGreater ( len ( ecu_fw ) ,  0 ,  f ' { car_model } : No FW versions: Ecu. { ECU_NAME [ ecu [ 0 ] ] } ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_all_addrs_map_to_one_ecu ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  brand ,  cars  in  VERSIONS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      addr_to_ecu  =  defaultdict ( set ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  ecus  in  cars . values ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  ecu_type ,  addr ,  sub_addr  in  ecus . keys ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          addr_to_ecu [ ( addr ,  sub_addr ) ] . add ( ecu_type ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          ecus_for_addr  =  addr_to_ecu [ ( addr ,  sub_addr ) ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          ecu_strings  =  " ,  " . join ( [ f ' Ecu. { ECU_NAME [ ecu ] } '  for  ecu  in  ecus_for_addr ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertLessEqual ( len ( ecus_for_addr ) ,  1 ,  f " { brand }  has multiple ECUs that map to one address:  { ecu_strings }  -> ( { hex ( addr ) } ,  { sub_addr } ) " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_data_collection_ecus ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Asserts no extra ECUs are in the fingerprinting database 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  brand ,  config  in  FW_QUERY_CONFIGS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  car_model ,  ecus  in  VERSIONS [ brand ] . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        bad_ecus  =  set ( ecus ) . intersection ( config . extra_ecus ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        with  self . subTest ( car_model = car_model . value ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          self . assertFalse ( len ( bad_ecus ) ,  f ' { car_model } : Fingerprints contain ECUs added for data collection:  { bad_ecus } ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_blacklisted_ecus ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    blacklisted_addrs  =  ( 0x7c4 ,  0x7d0 )   # includes A/C ecu and an unknown ecu 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  car_model ,  ecus  in  FW_VERSIONS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  self . subTest ( car_model = car_model . value ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        CP  =  interfaces [ car_model ] [ 0 ] . get_non_essential_params ( car_model ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        if  CP . carName  ==  ' subaru ' : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          for  ecu  in  ecus . keys ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            self . assertNotIn ( ecu [ 1 ] ,  blacklisted_addrs ,  f ' { car_model } : Blacklisted ecu: (Ecu. { ECU_NAME [ ecu [ 0 ] ] } ,  { hex ( ecu [ 1 ] ) } ) ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        elif  CP . carName  ==  " chrysler " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          # Some HD trucks have a combined TCM and ECM 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  CP . carFingerprint . startswith ( " RAM HD " ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            for  ecu  in  ecus . keys ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              self . assertNotEqual ( ecu [ 0 ] ,  Ecu . transmission ,  f " { car_model } : Blacklisted ecu: (Ecu. { ECU_NAME [ ecu [ 0 ] ] } ,  { hex ( ecu [ 1 ] ) } ) " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_missing_versions_and_configs ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    brand_versions  =  set ( VERSIONS . keys ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    brand_configs  =  set ( FW_QUERY_CONFIGS . keys ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( brand_configs  -  brand_versions ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      with  self . subTest ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . fail ( f " Brands do not implement FW_VERSIONS:  { brand_configs  -  brand_versions } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( brand_versions  -  brand_configs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      with  self . subTest ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . fail ( f " Brands do not implement FW_QUERY_CONFIG:  { brand_versions  -  brand_configs } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_fw_request_ecu_whitelist ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  brand ,  config  in  FW_QUERY_CONFIGS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      with  self . subTest ( brand = brand ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        whitelisted_ecus  =  { ecu  for  r  in  config . requests  for  ecu  in  r . whitelist_ecus } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        brand_ecus  =  { fw [ 0 ]  for  car_fw  in  VERSIONS [ brand ] . values ( )  for  fw  in  car_fw } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        brand_ecus  | =  { ecu [ 0 ]  for  ecu  in  config . extra_ecus } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ecus_not_whitelisted  =  brand_ecus  -  whitelisted_ecus 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ecu_strings  =  " ,  " . join ( [ f ' Ecu. { ECU_NAME [ ecu ] } '  for  ecu  in  ecus_not_whitelisted ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . assertFalse ( len ( whitelisted_ecus )  and  len ( ecus_not_whitelisted ) , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                         f ' { brand . title ( ) } : ECUs not in any FW query whitelists:  { ecu_strings } ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_fw_requests ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Asserts equal length request and response lists 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  brand ,  config  in  FW_QUERY_CONFIGS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      with  self . subTest ( brand = brand ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        for  request_obj  in  config . requests : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertEqual ( len ( request_obj . request ) ,  len ( request_obj . response ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          # No request on the OBD port (bus 1, multiplexed) should be run on an aux panda 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          self . assertFalse ( request_obj . auxiliary  and  request_obj . bus  ==  1  and  request_obj . obd_multiplexing , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                           f " { brand . title ( ) } : OBD multiplexed request is marked auxiliary:  { request_obj } " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  TestFwFingerprintTiming ( unittest . TestCase ) : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  N :  int  =  5 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  TOL :  float  =  0.12 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @staticmethod 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _run_thread ( thread :  threading . Thread )  - >  float : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    params  =  Params ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    params . put_bool ( " ObdMultiplexingEnabled " ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    thread . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    t  =  time . perf_counter ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    while  thread . is_alive ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      time . sleep ( 0.02 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  params . get_bool ( " ObdMultiplexingChanged " ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        params . put_bool ( " ObdMultiplexingChanged " ,  True ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  time . perf_counter ( )  -  t 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _benchmark_brand ( self ,  brand ,  num_pandas ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    fake_socket  =  FakeSocket ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    brand_time  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  _  in  range ( self . N ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      thread  =  threading . Thread ( target = get_fw_versions ,  args = ( fake_socket ,  fake_socket ,  brand ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                kwargs = dict ( num_pandas = num_pandas ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      brand_time  + =  self . _run_thread ( thread ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  brand_time  /  self . N 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  _assert_timing ( self ,  avg_time ,  ref_time ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertLess ( avg_time ,  ref_time  +  self . TOL ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertGreater ( avg_time ,  ref_time  -  self . TOL ,  " Performance seems to have improved, update test refs. " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_startup_timing ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Tests worse-case VIN query time and typical present ECU query time 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    vin_ref_time  =  1.0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    present_ecu_ref_time  =  0.8 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    fake_socket  =  FakeSocket ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    present_ecu_time  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  _  in  range ( self . N ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      thread  =  threading . Thread ( target = get_present_ecus ,  args = ( fake_socket ,  fake_socket ) , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                kwargs = dict ( num_pandas = 2 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      present_ecu_time  + =  self . _run_thread ( thread ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _assert_timing ( present_ecu_time  /  self . N ,  present_ecu_ref_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( f ' get_present_ecus, query time= { present_ecu_time  /  self . N }  seconds ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    vin_time  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  _  in  range ( self . N ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      thread  =  threading . Thread ( target = get_vin ,  args = ( fake_socket ,  fake_socket ,  1 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      vin_time  + =  self . _run_thread ( thread ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . _assert_timing ( vin_time  /  self . N ,  vin_ref_time ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    print ( f ' get_vin, query time= { vin_time  /  self . N }  seconds ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @pytest . mark . timeout ( 60 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_fw_query_timing ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    total_ref_time  =  6.41 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    brand_ref_times  =  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      1 :  { 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' body ' :  0.11 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' chrysler ' :  0.3 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' ford ' :  0.2 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' honda ' :  0.52 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        ' hyundai ' :  0.72 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' mazda ' :  0.2 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' nissan ' :  0.4 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' subaru ' :  0.52 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' tesla ' :  0.2 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' toyota ' :  1.6 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' volkswagen ' :  0.2 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      } , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      2 :  { 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' ford ' :  0.3 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        ' hyundai ' :  1.12 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    total_time  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  num_pandas  in  ( 1 ,  2 ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  brand ,  config  in  FW_QUERY_CONFIGS . items ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        with  self . subTest ( brand = brand ,  num_pandas = num_pandas ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          multi_panda_requests  =  [ r  for  r  in  config . requests  if  r . bus  >  3 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          if  not  len ( multi_panda_requests )  and  num_pandas  >  1 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            raise  unittest . SkipTest ( " No multi-panda FW queries " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          avg_time  =  self . _benchmark_brand ( brand ,  num_pandas ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          total_time  + =  avg_time 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          avg_time  =  round ( avg_time ,  2 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          self . _assert_timing ( avg_time ,  brand_ref_times [ num_pandas ] [ brand ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								          print ( f ' { brand =} ,  { num_pandas =} ,  { len ( config . requests ) =} , avg FW query time= { avg_time }  seconds ' ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    with  self . subTest ( brand = ' all_brands ' ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      total_time  =  round ( total_time ,  2 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . _assert_timing ( total_time ,  total_ref_time ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      print ( f ' all brands, total FW query time= { total_time }  seconds ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  unittest . main ( )