|
|
|
@ -15,6 +15,7 @@ from system.swaglog import cloudlog |
|
|
|
|
|
|
|
|
|
Ecu = car.CarParams.Ecu |
|
|
|
|
ESSENTIAL_ECUS = [Ecu.engine, Ecu.eps, Ecu.abs, Ecu.fwdRadar, Ecu.fwdCamera, Ecu.vsa] |
|
|
|
|
FUZZY_EXCLUDE_ECUS = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
|
|
|
|
|
|
FW_QUERY_CONFIGS = get_interface_attr('FW_QUERY_CONFIG', ignore_none=True) |
|
|
|
|
VERSIONS = get_interface_attr('FW_VERSIONS', ignore_none=True) |
|
|
|
@ -28,11 +29,16 @@ def chunks(l, n=128): |
|
|
|
|
yield l[i:i + n] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_brand(brand: str, filter_brand: Optional[str]) -> bool: |
|
|
|
|
"""Returns if brand matches filter_brand or no brand filter is specified""" |
|
|
|
|
return filter_brand is None or brand == filter_brand |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_fw_dict(fw_versions: List[capnp.lib.capnp._DynamicStructBuilder], |
|
|
|
|
filter_brand: Optional[str] = None) -> Dict[Tuple[int, Optional[int]], Set[bytes]]: |
|
|
|
|
fw_versions_dict = defaultdict(set) |
|
|
|
|
for fw in fw_versions: |
|
|
|
|
if (filter_brand is None or fw.brand == filter_brand) and not fw.logging: |
|
|
|
|
if is_brand(fw.brand, filter_brand) and not fw.logging: |
|
|
|
|
sub_addr = fw.subAddress if fw.subAddress != 0 else None |
|
|
|
|
fw_versions_dict[(fw.address, sub_addr)].add(fw.fwVersion) |
|
|
|
|
return dict(fw_versions_dict) |
|
|
|
@ -53,12 +59,6 @@ def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None): |
|
|
|
|
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars |
|
|
|
|
the match is rejected.""" |
|
|
|
|
|
|
|
|
|
# These ECUs are known to be shared between models (EPS only between hybrid/ICE version) |
|
|
|
|
# Getting this exactly right isn't crucial, but excluding camera and radar makes it almost |
|
|
|
|
# impossible to get 3 matching versions, even if two models with shared parts are released at the same |
|
|
|
|
# time and only one is in our database. |
|
|
|
|
exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
|
|
|
|
|
|
# Build lookup table from (addr, sub_addr, fw) to list of candidate cars |
|
|
|
|
all_fw_versions = defaultdict(list) |
|
|
|
|
for candidate, fw_by_addr in FW_VERSIONS.items(): |
|
|
|
@ -66,35 +66,40 @@ def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None): |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
for addr, fws in fw_by_addr.items(): |
|
|
|
|
if addr[0] in exclude_types: |
|
|
|
|
# These ECUs are known to be shared between models (EPS only between hybrid/ICE version) |
|
|
|
|
# Getting this exactly right isn't crucial, but excluding camera and radar makes it almost |
|
|
|
|
# impossible to get 3 matching versions, even if two models with shared parts are released at the same |
|
|
|
|
# time and only one is in our database. |
|
|
|
|
if addr[0] in FUZZY_EXCLUDE_ECUS: |
|
|
|
|
continue |
|
|
|
|
for f in fws: |
|
|
|
|
all_fw_versions[(addr[1], addr[2], f)].append(candidate) |
|
|
|
|
|
|
|
|
|
match_count = 0 |
|
|
|
|
matched_ecus = set() |
|
|
|
|
candidate = None |
|
|
|
|
for addr, versions in fw_versions_dict.items(): |
|
|
|
|
ecu_key = (addr[0], addr[1]) |
|
|
|
|
for version in versions: |
|
|
|
|
# All cars that have this FW response on the specified address |
|
|
|
|
candidates = all_fw_versions[(addr[0], addr[1], version)] |
|
|
|
|
candidates = all_fw_versions[(*ecu_key, version)] |
|
|
|
|
|
|
|
|
|
if len(candidates) == 1: |
|
|
|
|
match_count += 1 |
|
|
|
|
matched_ecus.add(ecu_key) |
|
|
|
|
if candidate is None: |
|
|
|
|
candidate = candidates[0] |
|
|
|
|
# We uniquely matched two different cars. No fuzzy match possible |
|
|
|
|
elif candidate != candidates[0]: |
|
|
|
|
return set() |
|
|
|
|
|
|
|
|
|
if match_count >= 2: |
|
|
|
|
if len(matched_ecus) >= 2: |
|
|
|
|
if log: |
|
|
|
|
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") |
|
|
|
|
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs") |
|
|
|
|
return {candidate} |
|
|
|
|
else: |
|
|
|
|
return set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car_exact(fw_versions_dict) -> Set[str]: |
|
|
|
|
def match_fw_to_car_exact(fw_versions_dict, log=True) -> Set[str]: |
|
|
|
|
"""Do an exact FW match. Returns all cars that match the given |
|
|
|
|
FW versions for a list of "essential" ECUs. If an ECU is not considered |
|
|
|
|
essential the FW version can be missing to get a fingerprint, but if it's present it |
|
|
|
@ -129,7 +134,7 @@ def match_fw_to_car_exact(fw_versions_dict) -> Set[str]: |
|
|
|
|
return set(candidates.keys()) - set(invalid) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True): |
|
|
|
|
def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): |
|
|
|
|
# Try exact matching first |
|
|
|
|
exact_matches = [] |
|
|
|
|
if allow_exact: |
|
|
|
@ -142,7 +147,7 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True): |
|
|
|
|
matches = set() |
|
|
|
|
for brand in VERSIONS.keys(): |
|
|
|
|
fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) |
|
|
|
|
matches |= match_func(fw_versions_dict) |
|
|
|
|
matches |= match_func(fw_versions_dict, log=log) |
|
|
|
|
|
|
|
|
|
if len(matches): |
|
|
|
|
return exact_match, matches |
|
|
|
@ -277,7 +282,7 @@ def get_fw_versions(logcan, sendcan, query_brand=None, extra=None, timeout=0.1, |
|
|
|
|
|
|
|
|
|
# Get versions and build capnp list to put into CarParams |
|
|
|
|
car_fw = [] |
|
|
|
|
requests = [(brand, config, r) for brand, config, r in REQUESTS if query_brand is None or brand == query_brand] |
|
|
|
|
requests = [(brand, config, r) for brand, config, r in REQUESTS if is_brand(brand, query_brand)] |
|
|
|
|
for addr in tqdm(addrs, disable=not progress): |
|
|
|
|
for addr_chunk in chunks(addr): |
|
|
|
|
for brand, config, r in requests: |
|
|
|
|