| 
						
						
							
								
							
						
						
					 | 
				
				 | 
				 | 
				
					@ -4,18 +4,16 @@ import time | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from collections import defaultdict | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from parameterized import parameterized | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from cereal import car | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.can_definitions import CanData | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.car_helpers import interfaces | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.data_structures import CarParams | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.fingerprints import FW_VERSIONS | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                                                match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_fw_versions_ordered, get_present_ecus | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					from openpilot.selfdrive.car.vin import get_vin | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					CarFw = car.CarParams.CarFw | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					Ecu = car.CarParams.Ecu | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					CarFw = CarParams.CarFw | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					Ecu = CarParams.Ecu | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					class TestFwFingerprint: | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -27,7 +25,7 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  @parameterized.expand([(b, c, e[c], n) for b, e in VERSIONS.items() for c in e for n in (True, False)]) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  def test_exact_match(self, brand, car_model, ecus, test_non_essential): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    config = FW_QUERY_CONFIGS[brand] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    CP = car.CarParams.new_message() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    CP = CarParams() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    for _ in range(100): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      fw = [] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      for ecu, fw_versions in ecus.items(): | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -37,8 +35,8 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          continue | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        ecu_name, addr, sub_addr = ecu | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                   "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                        address=addr, subAddress=0 if sub_addr is None else sub_addr)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      CP.carFw = fw | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_fuzzy=False) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      if not test_non_essential: | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -55,13 +53,13 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    if config.match_fw_to_car_fuzzy is None: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      pytest.skip("Brand does not implement custom fuzzy fingerprinting function") | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    CP = car.CarParams.new_message() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    CP = CarParams() | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    for _ in range(5): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      fw = [] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      for ecu, fw_versions in ecus.items(): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        ecu_name, addr, sub_addr = ecu | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                   "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                        address=addr, subAddress=0 if sub_addr is None else sub_addr)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      CP.carFw = fw | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), CP.carVin, VERSIONS[brand]) | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -82,13 +80,13 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      ecu_name, addr, sub_addr = ecu | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      for _ in range(5): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        # Add multiple FW versions to simulate ECU returning to multiple queries in a brand | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append({"ecu": ecu_name, "fwVersion": random.choice(ecus[ecu]), 'brand': brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                   "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      CP = car.CarParams.new_message(carFw=fw) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(ecus[ecu]), brand=brand, | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                        address=addr, subAddress=0 if sub_addr is None else sub_addr)) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      CP = CarParams(carFw=fw) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      # Assert no match if there are not enough unique ECUs | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      unique_ecus = {(f['address'], f['subAddress']) for f in fw} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      unique_ecus = {(f.address, f.subAddress) for f in fw} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      if len(unique_ecus) < 2: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        assert len(matches) == 0, car_model | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					      # There won't always be a match due to shared FW, but if there is it should be correct | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -101,8 +99,8 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        for ecu, ecu_fw in ecus.items(): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          with subtests.test((ecu[0].value, ecu[1], ecu[2])): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1} | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert not len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ECU_NAME[ecu[0]]}, {duplicates}' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert len(ecu_fw) > 0, f'{car_model}: No FW versions: Ecu.{ECU_NAME[ecu[0]]}' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert not len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ecu[0]}, {duplicates}' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert len(ecu_fw) > 0, f'{car_model}: No FW versions: Ecu.{ecu[0]}' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  def test_all_addrs_map_to_one_ecu(self): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    for brand, cars in VERSIONS.items(): | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -111,7 +109,7 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        for ecu_type, addr, sub_addr in ecus.keys(): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          addr_to_ecu[(addr, sub_addr)].add(ecu_type) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          ecus_for_addr = addr_to_ecu[(addr, sub_addr)] | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_for_addr]) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          ecu_strings = ", ".join([f'Ecu.{ecu}' for ecu in ecus_for_addr]) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          assert len(ecus_for_addr) <= 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  def test_data_collection_ecus(self, subtests): | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -129,13 +127,13 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        CP = interfaces[car_model][0].get_non_essential_params(car_model) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        if CP.carName == 'subaru': | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          for ecu in ecus.keys(): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert ecu[1] not in blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            assert ecu[1] not in blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ecu[0]}, {hex(ecu[1])})' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        elif CP.carName == "chrysler": | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          # Some HD trucks have a combined TCM and ECM | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          if CP.carFingerprint.startswith("RAM HD"): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					          if CP.carFingerprint.startswith("RAM_HD"): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					            for ecu in ecus.keys(): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					              assert ecu[0] != Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					              assert ecu[0] != Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ecu[0]}, {hex(ecu[1])})" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  def test_non_essential_ecus(self, subtests): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    for brand, config in FW_QUERY_CONFIGS.items(): | 
				
			
			
		
	
	
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
				
				 | 
				 | 
				
					@ -143,7 +141,7 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        # These ECUs are already not in ESSENTIAL_ECUS which the fingerprint functions give a pass if missing | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        unnecessary_non_essential_ecus = set(config.non_essential_ecus) - set(ESSENTIAL_ECUS) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        assert unnecessary_non_essential_ecus == set(), "Declaring non-essential ECUs non-essential is not required: " + \ | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                                                                f"{', '.join([f'Ecu.{ECU_NAME[ecu]}' for ecu in unnecessary_non_essential_ecus])}" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                                                                f"{', '.join([f'Ecu.{ecu}' for ecu in unnecessary_non_essential_ecus])}" | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					  def test_missing_versions_and_configs(self, subtests): | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					    brand_versions = set(VERSIONS.keys()) | 
				
			
			
		
	
	
		
			
				
					| 
						
							
								
							
						
						
							
								
							
						
						
					 | 
				
				 | 
				 | 
				
					@ -172,7 +170,7 @@ class TestFwFingerprint: | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        # each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        ecus_not_whitelisted = brand_ecus - whitelisted_ecus | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted]) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        ecu_strings = ", ".join([f'Ecu.{ecu}' for ecu in ecus_not_whitelisted]) | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					        assert not (len(whitelisted_ecus) and len(ecus_not_whitelisted)), \ | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					                         f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}' | 
				
			
			
		
	
		
			
				
					 | 
					 | 
				
				 | 
				 | 
				
					
 | 
				
			
			
		
	
	
		
			
				
					| 
						
							
								
							
						
						
						
					 | 
				
				 | 
				 | 
				
					
  |