FPv2: support FwQueryConfig with no FW versions (#31227)

* bump

* from https://github.com/commaai/openpilot/pull/27929

* get VIN on bolt!

* might as well try on other gms

* remove vin

* ugh gm is going to be slow

* fix

* should really fix this

* revert

* happy?1

* fix unit test

* bump

* functional addressing must be an OBD gateway feature, this does nothing

* fix vin response

* fix addr!

* finally fix fw_versions bugs since boardd IsOnroad refactor

* for

* only bus 0

* clean up

* Update selfdrive/car/gm/values.py

* ChatGPT re-write

* filter out did

* todo

* oof

* preview: what multiple DIDs per ECU would look like in the future

* Revert "preview: what multiple DIDs per ECU would look like in the future"

This reverts commit 88f0d8611e.

* function to get all ecus

* we can remove this!

* can also do this!

* and this one too :o

* consistency

* yay

* add tests

* revert GM stuff

* another PR

* reads better

* revert rest of gm

* use that

* interesting

* these are exactly the same (with ordering differences)

* Revert "these are exactly the same (with ordering differences)"

This reverts commit a9e918dc35.

* flip
pull/31221/head
Shane Smiskol 1 year ago committed by GitHub
parent c3fcf75737
commit 2c0f7b8727
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 13
      selfdrive/car/fw_query_definitions.py
  2. 68
      selfdrive/car/fw_versions.py
  3. 6
      selfdrive/car/tests/test_fw_fingerprint.py

@ -96,3 +96,16 @@ class FwQueryConfig:
new_request = copy.deepcopy(self.requests[i])
new_request.bus += 4
self.requests.append(new_request)
def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, include_ecu_type: bool = True,
include_extra_ecus: bool = True) -> set[EcuAddrSubAddr] | set[AddrType]:
# Add ecus in database + extra ecus
brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus}
if include_extra_ecus:
brand_ecus |= set(self.extra_ecus)
if not include_ecu_type:
return {(addr, subaddr) for _, addr, subaddr in brand_ecus}
return brand_ecus

@ -45,16 +45,6 @@ def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder],
return dict(fw_versions_dict)
def get_brand_addrs() -> Dict[str, Set[AddrType]]:
brand_addrs: DefaultDict[str, Set[AddrType]] = defaultdict(set)
for brand, cars in VERSIONS.items():
# Add ecus in database + extra ecus to match against
brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in FW_QUERY_CONFIGS[brand].extra_ecus}
for fw in cars.values():
brand_addrs[brand] |= {(addr, sub_addr) for _, addr, sub_addr in fw.keys()}
return dict(brand_addrs)
def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None):
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
@ -184,22 +174,21 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
if r.bus > num_pandas * 4 - 1:
continue
for brand_versions in VERSIONS[brand].values():
for ecu_type, addr, sub_addr in list(brand_versions) + config.extra_ecus:
# Only query ecus in whitelist if whitelist is not empty
if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus:
a = (addr, sub_addr, r.bus)
# Build set of queries
if sub_addr is None:
if a not in parallel_queries[r.obd_multiplexing]:
parallel_queries[r.obd_multiplexing].append(a)
else: # subaddresses must be queried one by one
if [a] not in queries[r.obd_multiplexing]:
queries[r.obd_multiplexing].append([a])
# Build set of expected responses to filter
response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset)
responses.add((response_addr, sub_addr, r.bus))
for ecu_type, addr, sub_addr in config.get_all_ecus(VERSIONS[brand]):
# Only query ecus in whitelist if whitelist is not empty
if len(r.whitelist_ecus) == 0 or ecu_type in r.whitelist_ecus:
a = (addr, sub_addr, r.bus)
# Build set of queries
if sub_addr is None:
if a not in parallel_queries[r.obd_multiplexing]:
parallel_queries[r.obd_multiplexing].append(a)
else: # subaddresses must be queried one by one
if [a] not in queries[r.obd_multiplexing]:
queries[r.obd_multiplexing].append([a])
# Build set of expected responses to filter
response_addr = uds.get_rx_addr_for_tx_addr(addr, r.rx_offset)
responses.add((response_addr, sub_addr, r.bus))
for obd_multiplexing in queries:
queries[obd_multiplexing].insert(0, parallel_queries[obd_multiplexing])
@ -215,7 +204,8 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
def get_brand_ecu_matches(ecu_rx_addrs):
"""Returns dictionary of brands and matches with ECUs in their FW versions"""
brand_addrs = get_brand_addrs()
brand_addrs = {brand: config.get_all_ecus(VERSIONS[brand], include_ecu_type=False) for
brand, config in FW_QUERY_CONFIGS.items()}
brand_matches = {brand: set() for brand, _, _ in REQUESTS}
brand_rx_offsets = {(brand, r.rx_offset) for brand, _, r in REQUESTS}
@ -280,19 +270,17 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
for brand, brand_versions in versions.items():
config = FW_QUERY_CONFIGS[brand]
for ecu in brand_versions.values():
# Each brand can define extra ECUs to query for data collection
for ecu_type, addr, sub_addr in list(ecu) + config.extra_ecus:
a = (brand, addr, sub_addr)
if a not in ecu_types:
ecu_types[a] = ecu_type
if sub_addr is None:
if a not in parallel_addrs:
parallel_addrs.append(a)
else:
if [a] not in addrs:
addrs.append([a])
for ecu_type, addr, sub_addr in config.get_all_ecus(brand_versions):
a = (brand, addr, sub_addr)
if a not in ecu_types:
ecu_types[a] = ecu_type
if sub_addr is None:
if a not in parallel_addrs:
parallel_addrs.append(a)
else:
if [a] not in addrs:
addrs.append([a])
addrs.insert(0, parallel_addrs)

@ -147,6 +147,12 @@ class TestFwFingerprint(unittest.TestCase):
with self.subTest():
self.fail(f"Brands do not implement FW_QUERY_CONFIG: {brand_versions - brand_configs}")
# Ensure each brand has at least 1 ECU to query, and extra ECU retrieval
for brand, config in FW_QUERY_CONFIGS.items():
self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0)
self.assertEqual(config.get_all_ecus({}, include_ecu_type=True), set(config.extra_ecus))
self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0)
def test_fw_request_ecu_whitelist(self):
for brand, config in FW_QUERY_CONFIGS.items():
with self.subTest(brand=brand):

Loading…
Cancel
Save