FW_QUERY_CONFIGS: type annotate (#31265)

* annotate

* fix

* clean up

* test

* clean up

* space

* fix
old-commit-hash: c9bd4e4c0d
chrysler-long2
Shane Smiskol 1 year ago committed by GitHub
parent 53659bbb43
commit fee938a524
  1. 7
      selfdrive/car/fw_query_definitions.py
  2. 10
      selfdrive/car/fw_versions.py
  3. 2
      selfdrive/car/tests/test_fw_fingerprint.py

@ -97,15 +97,12 @@ class FwQueryConfig:
new_request.bus += 4 new_request.bus += 4
self.requests.append(new_request) self.requests.append(new_request)
def get_all_ecus(self, offline_fw_versions: OfflineFwVersions, include_ecu_type: bool = True, def get_all_ecus(self, offline_fw_versions: OfflineFwVersions,
include_extra_ecus: bool = True) -> set[EcuAddrSubAddr] | set[AddrType]: include_extra_ecus: bool = True) -> set[EcuAddrSubAddr]:
# Add ecus in database + extra ecus # Add ecus in database + extra ecus
brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus} brand_ecus = {ecu for ecus in offline_fw_versions.values() for ecu in ecus}
if include_extra_ecus: if include_extra_ecus:
brand_ecus |= set(self.extra_ecus) brand_ecus |= set(self.extra_ecus)
if not include_ecu_type:
return {(addr, subaddr) for _, addr, subaddr in brand_ecus}
return brand_ecus return brand_ecus

@ -8,7 +8,7 @@ import panda.python.uds as uds
from cereal import car from cereal import car
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig
from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
@ -18,7 +18,7 @@ Ecu = car.CarParams.Ecu
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa]
FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug]
FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True) FW_QUERY_CONFIGS: dict[str, FwQueryConfig] = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True)
VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True) VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True)
MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e} MODEL_TO_BRAND = {c: b for b, e in VERSIONS.items() for c in e}
@ -204,7 +204,7 @@ def get_present_ecus(logcan, sendcan, num_pandas=1) -> Set[EcuAddrBusType]:
def get_brand_ecu_matches(ecu_rx_addrs: Set[EcuAddrBusType]) -> dict[str, set[AddrType]]: def get_brand_ecu_matches(ecu_rx_addrs: Set[EcuAddrBusType]) -> dict[str, set[AddrType]]:
"""Returns dictionary of brands and matches with ECUs in their FW versions""" """Returns dictionary of brands and matches with ECUs in their FW versions"""
brand_addrs = {brand: config.get_all_ecus(VERSIONS[brand], include_ecu_type=False) for brand_addrs = {brand: {(addr, subaddr) for _, addr, subaddr in config.get_all_ecus(VERSIONS[brand])} for
brand, config in FW_QUERY_CONFIGS.items()} brand, config in FW_QUERY_CONFIGS.items()}
brand_matches: dict[str, set[AddrType]] = {brand: set() for brand, _, _ in REQUESTS} brand_matches: dict[str, set[AddrType]] = {brand: set() for brand, _, _ in REQUESTS}
@ -287,8 +287,8 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1,
# Get versions and build capnp list to put into CarParams # Get versions and build capnp list to put into CarParams
car_fw = [] car_fw = []
requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)] requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)]
for addr in tqdm(addrs, disable=not progress): for addr_group in tqdm(addrs, disable=not progress): # split by subaddr, if any
for addr_chunk in chunks(addr): for addr_chunk in chunks(addr_group):
for brand, config, r in requests: for brand, config, r in requests:
# Skip query if no panda available # Skip query if no panda available
if r.bus > num_pandas * 4 - 1: if r.bus > num_pandas * 4 - 1:

@ -150,7 +150,7 @@ class TestFwFingerprint(unittest.TestCase):
# Ensure each brand has at least 1 ECU to query, and extra ECU retrieval # Ensure each brand has at least 1 ECU to query, and extra ECU retrieval
for brand, config in FW_QUERY_CONFIGS.items(): for brand, config in FW_QUERY_CONFIGS.items():
self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0) self.assertEqual(len(config.get_all_ecus({}, include_extra_ecus=False)), 0)
self.assertEqual(config.get_all_ecus({}, include_ecu_type=True), set(config.extra_ecus)) self.assertEqual(config.get_all_ecus({}), set(config.extra_ecus))
self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0) self.assertGreater(len(config.get_all_ecus(VERSIONS[brand])), 0)
def test_fw_request_ecu_whitelist(self): def test_fw_request_ecu_whitelist(self):

Loading…
Cancel
Save