diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index 9d90766239..09330ec0e9 100644 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -4,18 +4,16 @@ import time from collections import defaultdict from parameterized import parameterized -from cereal import car from openpilot.selfdrive.car.can_definitions import CanData from openpilot.selfdrive.car.car_helpers import interfaces +from openpilot.selfdrive.car.data_structures import CarParams from openpilot.selfdrive.car.fingerprints import FW_VERSIONS from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_fw_versions_ordered, get_present_ecus from openpilot.selfdrive.car.vin import get_vin -CarFw = car.CarParams.CarFw -Ecu = car.CarParams.Ecu - -ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} +CarFw = CarParams.CarFw +Ecu = CarParams.Ecu class TestFwFingerprint: @@ -27,7 +25,7 @@ class TestFwFingerprint: @parameterized.expand([(b, c, e[c], n) for b, e in VERSIONS.items() for c in e for n in (True, False)]) def test_exact_match(self, brand, car_model, ecus, test_non_essential): config = FW_QUERY_CONFIGS[brand] - CP = car.CarParams.new_message() + CP = CarParams() for _ in range(100): fw = [] for ecu, fw_versions in ecus.items(): @@ -37,8 +35,8 @@ class TestFwFingerprint: continue ecu_name, addr, sub_addr = ecu - fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, - "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) CP.carFw = fw _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_fuzzy=False) if not test_non_essential: @@ -55,13 +53,13 @@ class TestFwFingerprint: if config.match_fw_to_car_fuzzy is None: pytest.skip("Brand does not implement custom fuzzy fingerprinting function") - CP = car.CarParams.new_message() + CP = CarParams() for _ in range(5): fw = [] for ecu, fw_versions in ecus.items(): ecu_name, addr, sub_addr = ecu - fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand, - "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(fw_versions), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) CP.carFw = fw _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw), CP.carVin, VERSIONS[brand]) @@ -82,13 +80,13 @@ class TestFwFingerprint: ecu_name, addr, sub_addr = ecu for _ in range(5): # Add multiple FW versions to simulate ECU returning to multiple queries in a brand - fw.append({"ecu": ecu_name, "fwVersion": random.choice(ecus[ecu]), 'brand': brand, - "address": addr, "subAddress": 0 if sub_addr is None else sub_addr}) - CP = car.CarParams.new_message(carFw=fw) + fw.append(CarFw(ecu=ecu_name, fwVersion=random.choice(ecus[ecu]), brand=brand, + address=addr, subAddress=0 if sub_addr is None else sub_addr)) + CP = CarParams(carFw=fw) _, matches = match_fw_to_car(CP.carFw, CP.carVin, allow_exact=False, log=False) # Assert no match if there are not enough unique ECUs - unique_ecus = {(f['address'], f['subAddress']) for f in fw} + unique_ecus = {(f.address, f.subAddress) for f in fw} if len(unique_ecus) < 2: assert len(matches) == 0, car_model # There won't always be a match due to shared FW, but if there is it should be correct @@ -101,8 +99,8 @@ class TestFwFingerprint: for ecu, ecu_fw in ecus.items(): with subtests.test((ecu[0].value, ecu[1], ecu[2])): duplicates = {fw for fw in ecu_fw if ecu_fw.count(fw) > 1} - assert not len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ECU_NAME[ecu[0]]}, {duplicates}' - assert len(ecu_fw) > 0, f'{car_model}: No FW versions: Ecu.{ECU_NAME[ecu[0]]}' + assert not len(duplicates), f'{car_model}: Duplicate FW versions: Ecu.{ecu[0]}, {duplicates}' + assert len(ecu_fw) > 0, f'{car_model}: No FW versions: Ecu.{ecu[0]}' def test_all_addrs_map_to_one_ecu(self): for brand, cars in VERSIONS.items(): @@ -111,7 +109,7 @@ class TestFwFingerprint: for ecu_type, addr, sub_addr in ecus.keys(): addr_to_ecu[(addr, sub_addr)].add(ecu_type) ecus_for_addr = addr_to_ecu[(addr, sub_addr)] - ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_for_addr]) + ecu_strings = ", ".join([f'Ecu.{ecu}' for ecu in ecus_for_addr]) assert len(ecus_for_addr) <= 1, f"{brand} has multiple ECUs that map to one address: {ecu_strings} -> ({hex(addr)}, {sub_addr})" def test_data_collection_ecus(self, subtests): @@ -129,13 +127,13 @@ class TestFwFingerprint: CP = interfaces[car_model][0].get_non_essential_params(car_model) if CP.carName == 'subaru': for ecu in ecus.keys(): - assert ecu[1] not in blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})' + assert ecu[1] not in blacklisted_addrs, f'{car_model}: Blacklisted ecu: (Ecu.{ecu[0]}, {hex(ecu[1])})' elif CP.carName == "chrysler": # Some HD trucks have a combined TCM and ECM - if CP.carFingerprint.startswith("RAM HD"): + if CP.carFingerprint.startswith("RAM_HD"): for ecu in ecus.keys(): - assert ecu[0] != Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])})" + assert ecu[0] != Ecu.transmission, f"{car_model}: Blacklisted ecu: (Ecu.{ecu[0]}, {hex(ecu[1])})" def test_non_essential_ecus(self, subtests): for brand, config in FW_QUERY_CONFIGS.items(): @@ -143,7 +141,7 @@ class TestFwFingerprint: # These ECUs are already not in ESSENTIAL_ECUS which the fingerprint functions give a pass if missing unnecessary_non_essential_ecus = set(config.non_essential_ecus) - set(ESSENTIAL_ECUS) assert unnecessary_non_essential_ecus == set(), "Declaring non-essential ECUs non-essential is not required: " + \ - f"{', '.join([f'Ecu.{ECU_NAME[ecu]}' for ecu in unnecessary_non_essential_ecus])}" + f"{', '.join([f'Ecu.{ecu}' for ecu in unnecessary_non_essential_ecus])}" def test_missing_versions_and_configs(self, subtests): brand_versions = set(VERSIONS.keys()) @@ -172,7 +170,7 @@ class TestFwFingerprint: # each ecu in brand's fw versions + extra ecus needs to be whitelisted at least once ecus_not_whitelisted = brand_ecus - whitelisted_ecus - ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted]) + ecu_strings = ", ".join([f'Ecu.{ecu}' for ecu in ecus_not_whitelisted]) assert not (len(whitelisted_ecus) and len(ecus_not_whitelisted)), \ f'{brand.title()}: ECUs not in any FW query whitelists: {ecu_strings}'