From c5d4bb7ad2f6daf9729bcf87d131c159ceeb5a2d Mon Sep 17 00:00:00 2001 From: Cameron Clough Date: Tue, 5 Mar 2024 23:33:39 +0000 Subject: [PATCH] fw_versions: add more type hints (#31577) * fw_versions: add more type hints * cleanup * fixes * more implicit optional * Update selfdrive/car/fw_versions.py * backslash is unnecessary inside parenthesis --------- Co-authored-by: Shane Smiskol old-commit-hash: fb81cfe3c41fabb33effa4027bb5eed95ced9f19 --- selfdrive/car/fw_versions.py | 54 ++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 7673814195..c200528ca6 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -1,19 +1,20 @@ #!/usr/bin/env python3 from collections import defaultdict -from typing import Any, TypeVar from collections.abc import Iterator +from typing import Any, Protocol, TypeVar + from tqdm import tqdm import capnp import panda.python.uds as uds from cereal import car from openpilot.common.params import Params +from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs -from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig -from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.fingerprints import FW_VERSIONS +from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions +from openpilot.selfdrive.car.interfaces import get_interface_attr from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery -from openpilot.common.swaglog import cloudlog Ecu = car.CarParams.Ecu ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] @@ -38,8 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool: return filter_brand is None or brand == filter_brand -def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], - filter_brand: str = None) -> dict[AddrType, set[bytes]]: +def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]: fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) for fw in fw_versions: if is_brand(fw.brand, filter_brand) and not fw.logging: @@ -48,7 +48,12 @@ def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], return dict(fw_versions_dict) -def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None): +class MatchFwToCar(Protocol): + def __call__(self, live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True) -> set[str]: + ... + + +def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, exclude: str = None) -> set[str]: """Do a fuzzy FW match. This function will return a match, and the number of firmware version that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars the match is rejected.""" @@ -73,7 +78,7 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude= all_fw_versions[(addr[1], addr[2], f)].append(candidate) matched_ecus = set() - candidate = None + match: str | None = None for addr, versions in live_fw_versions.items(): ecu_key = (addr[0], addr[1]) for version in versions: @@ -82,23 +87,23 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude= if len(candidates) == 1: matched_ecus.add(ecu_key) - if candidate is None: - candidate = candidates[0] + if match is None: + match = candidates[0] # We uniquely matched two different cars. No fuzzy match possible - elif candidate != candidates[0]: + elif match != candidates[0]: return set() # Note that it is possible to match to a candidate without all its ECUs being present # if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching - if len(matched_ecus) >= 2: + if match and len(matched_ecus) >= 2: if log: - cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs") - return {candidate} + cloudlog.error(f"Fingerprinted {match} using fuzzy match. {len(matched_ecus)} matching ECUs") + return {match} else: return set() -def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw_versions=None) -> set[str]: +def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, extra_fw_versions: dict = None) -> set[str]: """Do an exact FW match. Returns all cars that match the given FW versions for a list of "essential" ECUs. If an ECU is not considered essential the FW version can be missing to get a fingerprint, but if it's present it @@ -139,9 +144,10 @@ def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw return set(candidates.keys()) - invalid -def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): +def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], allow_exact: bool = True, allow_fuzzy: bool = True, + log: bool = True) -> tuple[bool, set[str]]: # Try exact matching first - exact_matches = [] + exact_matches: list[tuple[bool, MatchFwToCar]] = [] if allow_exact: exact_matches = [(True, match_fw_to_car_exact)] if allow_fuzzy: @@ -149,7 +155,7 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): for exact_match, match_func in exact_matches: # For each brand, attempt to fingerprint using all FW returned from its queries - matches = set() + matches: set[str] = set() for brand in VERSIONS.keys(): fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) matches |= match_func(fw_versions_dict, match_brand=brand, log=log) @@ -165,12 +171,12 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): return True, set() -def get_present_ecus(logcan, sendcan, num_pandas=1) -> set[EcuAddrBusType]: +def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]: params = Params() # queries are split by OBD multiplexing mode queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []} parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []} - responses = set() + responses: set[EcuAddrBusType] = set() for brand, config, r in REQUESTS: # Skip query if no panda available @@ -231,8 +237,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool): cloudlog.warning("OBD multiplexing set successfully") -def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ - list[capnp.lib.capnp._DynamicStructBuilder]: +def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, + debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: """Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" all_car_fw = [] @@ -254,8 +260,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand return all_car_fw -def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ - list[capnp.lib.capnp._DynamicStructBuilder]: +def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, + debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: versions = VERSIONS.copy() params = Params()