diff --git a/selfdrive/car/fw_query_definitions.py b/selfdrive/car/fw_query_definitions.py index fcd0845e14..e069fad13b 100755 --- a/selfdrive/car/fw_query_definitions.py +++ b/selfdrive/car/fw_query_definitions.py @@ -96,3 +96,16 @@ class FwQueryConfig: new_request = copy.deepcopy(self.requests[i]) new_request.bus += 4 self.requests.append(new_request) + + def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, include_ecu_type: bool = True, + include_extra_ecus: bool = True) -> set[EcuAddrSubAddr] | set[AddrType]: + # Add ecus in database + extra ecus + brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus} + + if include_extra_ecus: + brand_ecus |= set(self.extra_ecus) + + if not include_ecu_type: + return {(addr, subaddr) for _, addr, subaddr in brand_ecus} + + return brand_ecus diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index ab4255a8d2..c9ca249ceb 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -45,16 +45,6 @@ def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder], return dict(fw_versions_dict) -def get_brand_addrs() -> Dict[str, Set[AddrType]]: - brand_addrs: DefaultDict[str, Set[AddrType]] = defaultdict(set) - for brand, cars in VERSIONS.items(): - # Add ecus in database + extra ecus to match against - brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in FW_QUERY_CONFIGS[brand].extra_ecus} - for fw in cars.values(): - brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()} - return dict(brand_addrs) - - def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None): """Do a fuzzy FW match. This function will return a match, and the number of firmware version that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars @@ -184,22 +174,21 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]: if r.bus > num_pandas * 4 - 1: continue - for brand_versions in VERSIONS[brand].values(): - for ecu_type, addr, sub_addr in list(brand_versions) + config.extra_ecus: - # Only query ecus in whitelist if whitelist is not empty - if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus: - a = (addr, sub_addr, r.bus) - # Build set of queries - if sub_addr is None: - if a not in parallel_queries[r.obd_multiplexing]: - parallel_queries[r.obd_multiplexing].append(a) - else: # subaddresses must be queried one by one - if [a] not in queries[r.obd_multiplexing]: - queries[r.obd_multiplexing].append([a]) - - # Build set of expected responses to filter - response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset) - responses.add((response_addr, sub_addr, r.bus)) + for ecu_type, addr, sub_addr in config.get_all_ecus(VERSIONS[brand]): + # Only query ecus in whitelist if whitelist is not empty + if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus: + a = (addr, sub_addr, r.bus) + # Build set of queries + if sub_addr is None: + if a not in parallel_queries[r.obd_multiplexing]: + parallel_queries[r.obd_multiplexing].append(a) + else: # subaddresses must be queried one by one + if [a] not in queries[r.obd_multiplexing]: + queries[r.obd_multiplexing].append([a]) + + # Build set of expected responses to filter + response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset) + responses.add((response_addr, sub_addr, r.bus)) for obd_multiplexing in queries: queries[obd_multiplexing].insert(0, parallel_queries[obd_multiplexing]) @@ -215,7 +204,8 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]: def get_brand_ecu_matches(ecu_rx_addrs): """Returns dictionary of brands and matches with ECUs in their FW versions""" - brand_addrs = get_brand_addrs() + brand_addrs = {brand: config.get_all_ecus(VERSIONS[brand], include_ecu_type=False) for + brand, config in FW_QUERY_CONFIGS.items()} brand_matches = {brand: set() for brand, _, _ in REQUESTS} brand_rx_offsets = {(brand, r.rx_offset) for brand, _, r in REQUESTS} @@ -280,19 +270,17 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, for brand, brand_versions in versions.items(): config = FW_QUERY_CONFIGS[brand] - for ecu in brand_versions.values(): - # Each brand can define extra ECUs to query for data collection - for ecu_type, addr, sub_addr in list(ecu) + config.extra_ecus: - a = (brand, addr, sub_addr) - if a not in ecu_types: - ecu_types[a] = ecu_type - - if sub_addr is None: - if a not in parallel_addrs: - parallel_addrs.append(a) - else: - if [a] not in addrs: - addrs.append([a]) + for ecu_type, addr, sub_addr in config.get_all_ecus(brand_versions): + a = (brand, addr, sub_addr) + if a not in ecu_types: + ecu_types[a] = ecu_type + + if sub_addr is None: + if a not in parallel_addrs: + parallel_addrs.append(a) + else: + if [a] not in addrs: + addrs.append([a]) addrs.insert(0, parallel_addrs) diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index 09b05d5166..7cd670cf7c 100755 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -147,6 +147,12 @@ class TestFwFingerprint(unittest.TestCase): with self.subTest(): self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}") + # Ensure each brand has at least 1 ECU to query, and extra ECU retrieval + for brand, config in FW_QUERY_CONFIGS.items(): + self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0) + self.assertEqual(config.get_all_ecus({}, include_ecu_type=True), set(config.extra_ecus)) + self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0) + def test_fw_request_ecu_whitelist(self): for brand, config in FW_QUERY_CONFIGS.items(): with self.subTest(brand=brand):