|  |  |  | @ -1,19 +1,20 @@ | 
			
		
	
		
			
				
					|  |  |  |  | #!/usr/bin/env python3 | 
			
		
	
		
			
				
					|  |  |  |  | from collections import defaultdict | 
			
		
	
		
			
				
					|  |  |  |  | from typing import Any, TypeVar | 
			
		
	
		
			
				
					|  |  |  |  | from collections.abc import Iterator | 
			
		
	
		
			
				
					|  |  |  |  | from typing import Any, Protocol, TypeVar | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | from tqdm import tqdm | 
			
		
	
		
			
				
					|  |  |  |  | import capnp | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | import panda.python.uds as uds | 
			
		
	
		
			
				
					|  |  |  |  | from cereal import car | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.params import Params | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.swaglog import cloudlog | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.ecu_addrs import get_ecu_addrs | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.interfaces import get_interface_attr | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.fingerprints import FW_VERSIONS | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.fw_query_definitions import AddrType, EcuAddrBusType, FwQueryConfig, LiveFwVersions, OfflineFwVersions | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.interfaces import get_interface_attr | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.selfdrive.car.isotp_parallel_query import IsoTpParallelQuery | 
			
		
	
		
			
				
					|  |  |  |  | from openpilot.common.swaglog import cloudlog | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | Ecu = car.CarParams.Ecu | 
			
		
	
		
			
				
					|  |  |  |  | ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] | 
			
		
	
	
		
			
				
					|  |  |  | @ -38,8 +39,7 @@ def is_brand(brand: str, filter_brand: str | None) -> bool: | 
			
		
	
		
			
				
					|  |  |  |  |   return filter_brand is None or brand == filter_brand | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], | 
			
		
	
		
			
				
					|  |  |  |  |                   filter_brand: str = None) -> dict[AddrType, set[bytes]]: | 
			
		
	
		
			
				
					|  |  |  |  | def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], filter_brand: str = None) -> dict[AddrType, set[bytes]]: | 
			
		
	
		
			
				
					|  |  |  |  |   fw_versions_dict: defaultdict[AddrType, set[bytes]] = defaultdict(set) | 
			
		
	
		
			
				
					|  |  |  |  |   for fw in fw_versions: | 
			
		
	
		
			
				
					|  |  |  |  |     if is_brand(fw.brand, filter_brand) and not fw.logging: | 
			
		
	
	
		
			
				
					|  |  |  | @ -48,7 +48,12 @@ def build_fw_dict(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], | 
			
		
	
		
			
				
					|  |  |  |  |   return dict(fw_versions_dict) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude=None): | 
			
		
	
		
			
				
					|  |  |  |  | class MatchFwToCar(Protocol): | 
			
		
	
		
			
				
					|  |  |  |  |   def __call__(self, live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True) -> set[str]: | 
			
		
	
		
			
				
					|  |  |  |  |     ... | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car_fuzzy(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, exclude: str = None) -> set[str]: | 
			
		
	
		
			
				
					|  |  |  |  |   """Do a fuzzy FW match. This function will return a match, and the number of firmware version | 
			
		
	
		
			
				
					|  |  |  |  |   that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars | 
			
		
	
		
			
				
					|  |  |  |  |   the match is rejected.""" | 
			
		
	
	
		
			
				
					|  |  |  | @ -73,7 +78,7 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude= | 
			
		
	
		
			
				
					|  |  |  |  |         all_fw_versions[(addr[1], addr[2], f)].append(candidate) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   matched_ecus = set() | 
			
		
	
		
			
				
					|  |  |  |  |   candidate = None | 
			
		
	
		
			
				
					|  |  |  |  |   match: str | None = None | 
			
		
	
		
			
				
					|  |  |  |  |   for addr, versions in live_fw_versions.items(): | 
			
		
	
		
			
				
					|  |  |  |  |     ecu_key = (addr[0], addr[1]) | 
			
		
	
		
			
				
					|  |  |  |  |     for version in versions: | 
			
		
	
	
		
			
				
					|  |  |  | @ -82,23 +87,23 @@ def match_fw_to_car_fuzzy(live_fw_versions, match_brand=None, log=True, exclude= | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       if len(candidates) == 1: | 
			
		
	
		
			
				
					|  |  |  |  |         matched_ecus.add(ecu_key) | 
			
		
	
		
			
				
					|  |  |  |  |         if candidate is None: | 
			
		
	
		
			
				
					|  |  |  |  |           candidate = candidates[0] | 
			
		
	
		
			
				
					|  |  |  |  |         if match is None: | 
			
		
	
		
			
				
					|  |  |  |  |           match = candidates[0] | 
			
		
	
		
			
				
					|  |  |  |  |         # We uniquely matched two different cars. No fuzzy match possible | 
			
		
	
		
			
				
					|  |  |  |  |         elif candidate != candidates[0]: | 
			
		
	
		
			
				
					|  |  |  |  |         elif match != candidates[0]: | 
			
		
	
		
			
				
					|  |  |  |  |           return set() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   # Note that it is possible to match to a candidate without all its ECUs being present | 
			
		
	
		
			
				
					|  |  |  |  |   # if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching | 
			
		
	
		
			
				
					|  |  |  |  |   if len(matched_ecus) >= 2: | 
			
		
	
		
			
				
					|  |  |  |  |   if match and len(matched_ecus) >= 2: | 
			
		
	
		
			
				
					|  |  |  |  |     if log: | 
			
		
	
		
			
				
					|  |  |  |  |       cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs") | 
			
		
	
		
			
				
					|  |  |  |  |     return {candidate} | 
			
		
	
		
			
				
					|  |  |  |  |       cloudlog.error(f"Fingerprinted {match} using fuzzy match. {len(matched_ecus)} matching ECUs") | 
			
		
	
		
			
				
					|  |  |  |  |     return {match} | 
			
		
	
		
			
				
					|  |  |  |  |   else: | 
			
		
	
		
			
				
					|  |  |  |  |     return set() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw_versions=None) -> set[str]: | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car_exact(live_fw_versions: LiveFwVersions, match_brand: str = None, log: bool = True, extra_fw_versions: dict = None) -> set[str]: | 
			
		
	
		
			
				
					|  |  |  |  |   """Do an exact FW match. Returns all cars that match the given | 
			
		
	
		
			
				
					|  |  |  |  |   FW versions for a list of "essential" ECUs. If an ECU is not considered | 
			
		
	
		
			
				
					|  |  |  |  |   essential the FW version can be missing to get a fingerprint, but if it's present it | 
			
		
	
	
		
			
				
					|  |  |  | @ -139,9 +144,10 @@ def match_fw_to_car_exact(live_fw_versions, match_brand=None, log=True, extra_fw | 
			
		
	
		
			
				
					|  |  |  |  |   return set(candidates.keys()) - invalid | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): | 
			
		
	
		
			
				
					|  |  |  |  | def match_fw_to_car(fw_versions: list[capnp.lib.capnp._DynamicStructBuilder], allow_exact: bool = True, allow_fuzzy: bool = True, | 
			
		
	
		
			
				
					|  |  |  |  |                     log: bool = True) -> tuple[bool, set[str]]: | 
			
		
	
		
			
				
					|  |  |  |  |   # Try exact matching first | 
			
		
	
		
			
				
					|  |  |  |  |   exact_matches = [] | 
			
		
	
		
			
				
					|  |  |  |  |   exact_matches: list[tuple[bool, MatchFwToCar]] = [] | 
			
		
	
		
			
				
					|  |  |  |  |   if allow_exact: | 
			
		
	
		
			
				
					|  |  |  |  |     exact_matches = [(True, match_fw_to_car_exact)] | 
			
		
	
		
			
				
					|  |  |  |  |   if allow_fuzzy: | 
			
		
	
	
		
			
				
					|  |  |  | @ -149,7 +155,7 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   for exact_match, match_func in exact_matches: | 
			
		
	
		
			
				
					|  |  |  |  |     # For each brand, attempt to fingerprint using all FW returned from its queries | 
			
		
	
		
			
				
					|  |  |  |  |     matches = set() | 
			
		
	
		
			
				
					|  |  |  |  |     matches: set[str] = set() | 
			
		
	
		
			
				
					|  |  |  |  |     for brand in VERSIONS.keys(): | 
			
		
	
		
			
				
					|  |  |  |  |       fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) | 
			
		
	
		
			
				
					|  |  |  |  |       matches |= match_func(fw_versions_dict, match_brand=brand, log=log) | 
			
		
	
	
		
			
				
					|  |  |  | @ -165,12 +171,12 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): | 
			
		
	
		
			
				
					|  |  |  |  |   return True, set() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def get_present_ecus(logcan, sendcan, num_pandas=1) -> set[EcuAddrBusType]: | 
			
		
	
		
			
				
					|  |  |  |  | def get_present_ecus(logcan, sendcan, num_pandas: int = 1) -> set[EcuAddrBusType]: | 
			
		
	
		
			
				
					|  |  |  |  |   params = Params() | 
			
		
	
		
			
				
					|  |  |  |  |   # queries are split by OBD multiplexing mode | 
			
		
	
		
			
				
					|  |  |  |  |   queries: dict[bool, list[list[EcuAddrBusType]]] = {True: [], False: []} | 
			
		
	
		
			
				
					|  |  |  |  |   parallel_queries: dict[bool, list[EcuAddrBusType]] = {True: [], False: []} | 
			
		
	
		
			
				
					|  |  |  |  |   responses = set() | 
			
		
	
		
			
				
					|  |  |  |  |   responses: set[EcuAddrBusType] = set() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   for brand, config, r in REQUESTS: | 
			
		
	
		
			
				
					|  |  |  |  |     # Skip query if no panda available | 
			
		
	
	
		
			
				
					|  |  |  | @ -231,8 +237,8 @@ def set_obd_multiplexing(params: Params, obd_multiplexing: bool): | 
			
		
	
		
			
				
					|  |  |  |  |     cloudlog.warning("OBD multiplexing set successfully") | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ | 
			
		
	
		
			
				
					|  |  |  |  |   list[capnp.lib.capnp._DynamicStructBuilder]: | 
			
		
	
		
			
				
					|  |  |  |  | def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs: set[EcuAddrBusType], timeout: float = 0.1, num_pandas: int = 1, | 
			
		
	
		
			
				
					|  |  |  |  |                             debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: | 
			
		
	
		
			
				
					|  |  |  |  |   """Queries for FW versions ordering brands by likelihood, breaks when exact match is found""" | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   all_car_fw = [] | 
			
		
	
	
		
			
				
					|  |  |  | @ -254,8 +260,8 @@ def get_fw_versions_ordered(logcan, sendcan, ecu_rx_addrs, timeout=0.1, num_pand | 
			
		
	
		
			
				
					|  |  |  |  |   return all_car_fw | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, num_pandas=1, debug=False, progress=False) -> \ | 
			
		
	
		
			
				
					|  |  |  |  |   list[capnp.lib.capnp._DynamicStructBuilder]: | 
			
		
	
		
			
				
					|  |  |  |  | def get_fw_versions(logcan, sendcan, query_brand: str = None, extra: OfflineFwVersions = None, timeout: float = 0.1, num_pandas: int = 1, | 
			
		
	
		
			
				
					|  |  |  |  |                     debug: bool = False, progress: bool = False) -> list[capnp.lib.capnp._DynamicStructBuilder]: | 
			
		
	
		
			
				
					|  |  |  |  |   versions = VERSIONS.copy() | 
			
		
	
		
			
				
					|  |  |  |  |   params = Params() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | 
 |