fw_versions: add more type hints (#31577)

* fw_versions: add more type hints

* cleanup

* fixes

* more implicit optional

* Update selfdrive/car/fw_versions.py

* backslash is unnecessary inside parenthesis

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
old-commit-hash: fb81cfe3c4
chrysler-long2
Cameron Clough 1 year ago committed by GitHub
parent be4375a3ae
commit c5d4bb7ad2
  1. 54
      selfdrive/car/fw_versions.py

@ -1,19 +1,20 @@
#!/usr/bin/env python3
from collections import defaultdict
from typing import Any, TypeVar
from collections.abc import Iterator
from typing import Any, Protocol, TypeVar
from tqdm import tqdm
import capnp
import panda.python.uds as uds
from cereal import car
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig
from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS
from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions
from openpilot.selfdrive.car.interfaces import get_interface_attr
from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery
from openpilot.common.swaglog import cloudlog
Ecu = car.CarParams.Ecu
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa]
@ -38,8 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool:
return filter_brand is None or brand == filter_brand
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder],
filter_brand: str = None) -> dict[AddrType, set[bytes]]:
def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]:
fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set)
for fw in fw_versions:
if is_brand(fw.brand, filter_brand) and not fw.logging:
@ -48,7 +48,12 @@ def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder],
return dict(fw_versions_dict)
def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None):
class MatchFwToCar(Protocol):
def __call__(self, live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True) -> set[str]:
...
def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, exclude: str = None) -> set[str]:
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
the match is rejected."""
@ -73,7 +78,7 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=
all_fw_versions[(addr[1], addr[2], f)].append(candidate)
matched_ecus = set()
candidate = None
match: str | None = None
for addr, versions in live_fw_versions.items():
ecu_key = (addr[0], addr[1])
for version in versions:
@ -82,23 +87,23 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=
if len(candidates) == 1:
matched_ecus.add(ecu_key)
if candidate is None:
candidate = candidates[0]
if match is None:
match = candidates[0]
# We uniquely matched two different cars. No fuzzy match possible
elif candidate != candidates[0]:
elif match != candidates[0]:
return set()
# Note that it is possible to match to a candidate without all its ECUs being present
# if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching
if len(matched_ecus) >= 2:
if match and len(matched_ecus) >= 2:
if log:
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs")
return {candidate}
cloudlog.error(f"Fingerprinted {match} using fuzzy match. {len(matched_ecus)} matching ECUs")
return {match}
else:
return set()
def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw_versions=None) -> set[str]:
def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, extra_fw_versions: dict = None) -> set[str]:
"""Do an exact FW match. Returns all cars that match the given
FW versions for a list of "essential" ECUs. If an ECU is not considered
essential the FW version can be missing to get a fingerprint, but if it's present it
@ -139,9 +144,10 @@ def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw
return set(candidates.keys()) - invalid
def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], allow_exact: bool = True, allow_fuzzy: bool = True,
log: bool = True) -> tuple[bool, set[str]]:
# Try exact matching first
exact_matches = []
exact_matches: list[tuple[bool, MatchFwToCar]] = []
if allow_exact:
exact_matches = [(True, match_fw_to_car_exact)]
if allow_fuzzy:
@ -149,7 +155,7 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
for exact_match, match_func in exact_matches:
# For each brand, attempt to fingerprint using all FW returned from its queries
matches = set()
matches: set[str] = set()
for brand in VERSIONS.keys():
fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand)
matches |= match_func(fw_versions_dict, match_brand=brand, log=log)
@ -165,12 +171,12 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
return True, set()
def get_present_ecus(logcan, sendcan, num_pandas=1) -> set[EcuAddrBusType]:
def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]:
params = Params()
# queries are split by OBD multiplexing mode
queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []}
parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []}
responses = set()
responses: set[EcuAddrBusType] = set()
for brand, config, r in REQUESTS:
# Skip query if no panda available
@ -231,8 +237,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool):
cloudlog.warning("OBD multiplexing set successfully")
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
list[capnp.lib.capnp._DynamicStructBuilder]:
def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1,
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
"""Queries for FW versions ordering brands by likelihood, breaks when exact match is found"""
all_car_fw = []
@ -254,8 +260,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand
return all_car_fw
def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \
list[capnp.lib.capnp._DynamicStructBuilder]:
def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1,
debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]:
versions = VERSIONS.copy()
params = Params()

Loading…
Cancel
Save