|
|
@ -48,50 +48,50 @@ def get_brand_addrs() -> Dict[str, Set[Tuple[int, Optional[int]]]]: |
|
|
|
return dict(brand_addrs) |
|
|
|
return dict(brand_addrs) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car_fuzzy_orig(fw_versions_dict, log=True, exclude=None): |
|
|
|
# def match_fw_to_car_fuzzy_orig(fw_versions_dict, log=True, exclude=None): |
|
|
|
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version |
|
|
|
# """Do a fuzzy FW match. This function will return a match, and the number of firmware version |
|
|
|
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars |
|
|
|
# that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars |
|
|
|
the match is rejected.""" |
|
|
|
# the match is rejected.""" |
|
|
|
|
|
|
|
# |
|
|
|
# These ECUs are known to be shared between models (EPS only between hybrid/ICE version) |
|
|
|
# # These ECUs are known to be shared between models (EPS only between hybrid/ICE version) |
|
|
|
# Getting this exactly right isn't crucial, but excluding camera and radar makes it almost |
|
|
|
# # Getting this exactly right isn't crucial, but excluding camera and radar makes it almost |
|
|
|
# impossible to get 3 matching versions, even if two models with shared parts are released at the same |
|
|
|
# # impossible to get 3 matching versions, even if two models with shared parts are released at the same |
|
|
|
# time and only one is in our database. |
|
|
|
# # time and only one is in our database. |
|
|
|
exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
# exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps, Ecu.debug] |
|
|
|
|
|
|
|
# |
|
|
|
# Build lookup table from (addr, sub_addr, fw) to list of candidate cars |
|
|
|
# # Build lookup table from (addr, sub_addr, fw) to list of candidate cars |
|
|
|
all_fw_versions = defaultdict(list) |
|
|
|
# all_fw_versions = defaultdict(list) |
|
|
|
for candidate, fw_by_addr in FW_VERSIONS.items(): |
|
|
|
# for candidate, fw_by_addr in FW_VERSIONS.items(): |
|
|
|
if candidate == exclude: |
|
|
|
# if candidate == exclude: |
|
|
|
continue |
|
|
|
# continue |
|
|
|
|
|
|
|
# |
|
|
|
for addr, fws in fw_by_addr.items(): |
|
|
|
# for addr, fws in fw_by_addr.items(): |
|
|
|
if addr[0] in exclude_types: |
|
|
|
# if addr[0] in exclude_types: |
|
|
|
continue |
|
|
|
# continue |
|
|
|
for f in fws: |
|
|
|
# for f in fws: |
|
|
|
all_fw_versions[(addr[1], addr[2], f)].append(candidate) |
|
|
|
# all_fw_versions[(addr[1], addr[2], f)].append(candidate) |
|
|
|
|
|
|
|
# |
|
|
|
match_count = 0 |
|
|
|
# match_count = 0 |
|
|
|
candidate = None |
|
|
|
# candidate = None |
|
|
|
for addr, versions in fw_versions_dict.items(): |
|
|
|
# for addr, versions in fw_versions_dict.items(): |
|
|
|
for version in versions: |
|
|
|
# for version in versions: |
|
|
|
# All cars that have this FW response on the specified address |
|
|
|
# # All cars that have this FW response on the specified address |
|
|
|
candidates = all_fw_versions[(addr[0], addr[1], version)] |
|
|
|
# candidates = all_fw_versions[(addr[0], addr[1], version)] |
|
|
|
|
|
|
|
# |
|
|
|
if len(candidates) == 1: |
|
|
|
# if len(candidates) == 1: |
|
|
|
match_count += 1 |
|
|
|
# match_count += 1 |
|
|
|
if candidate is None: |
|
|
|
# if candidate is None: |
|
|
|
candidate = candidates[0] |
|
|
|
# candidate = candidates[0] |
|
|
|
# We uniquely matched two different cars. No fuzzy match possible |
|
|
|
# # We uniquely matched two different cars. No fuzzy match possible |
|
|
|
elif candidate != candidates[0]: |
|
|
|
# elif candidate != candidates[0]: |
|
|
|
return set() |
|
|
|
# return set() |
|
|
|
|
|
|
|
# |
|
|
|
if match_count >= 2: |
|
|
|
# if match_count >= 2: |
|
|
|
if log: |
|
|
|
# if log: |
|
|
|
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") |
|
|
|
# cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") |
|
|
|
return {candidate} |
|
|
|
# return {candidate} |
|
|
|
else: |
|
|
|
# else: |
|
|
|
return set() |
|
|
|
# return set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def match_fw_to_car_fuzzy(fw_versions_dict, config, log=True, exclude=None): |
|
|
|
def match_fw_to_car_fuzzy(fw_versions_dict, config, log=True, exclude=None): |
|
|
@ -125,20 +125,32 @@ def match_fw_to_car_fuzzy(fw_versions_dict, config, log=True, exclude=None): |
|
|
|
for platform_code in config.fuzzy_get_platform_codes(fws): |
|
|
|
for platform_code in config.fuzzy_get_platform_codes(fws): |
|
|
|
all_platform_codes[(addr[1], addr[2], platform_code)].append(candidate) |
|
|
|
all_platform_codes[(addr[1], addr[2], platform_code)].append(candidate) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
matches = defaultdict(set) |
|
|
|
match_count = 0 |
|
|
|
match_count = 0 |
|
|
|
candidate = None |
|
|
|
candidate = None |
|
|
|
|
|
|
|
print('herererer') |
|
|
|
|
|
|
|
print(fw_versions_dict) |
|
|
|
for addr, versions in fw_versions_dict.items(): |
|
|
|
for addr, versions in fw_versions_dict.items(): |
|
|
|
for version in versions: |
|
|
|
for version in versions: |
|
|
|
# All cars that have this FW response on the specified address |
|
|
|
# All cars that have this FW response on the specified address |
|
|
|
candidates = all_fw_versions[(addr[0], addr[1], version)] |
|
|
|
candidates = all_fw_versions[(addr[0], addr[1], version)] |
|
|
|
|
|
|
|
# matches[(addr[0], addr[1])] |= candidates |
|
|
|
|
|
|
|
print(candidates) |
|
|
|
|
|
|
|
|
|
|
|
# If no exact FW matches, try brand-specific fuzzy fingerprinting |
|
|
|
# If no exact FW matches, try brand-specific fuzzy fingerprinting |
|
|
|
if len(candidates) != 1 and config.fuzzy_get_platform_codes is not None: |
|
|
|
if len(candidates) != 1 and config.fuzzy_get_platform_codes is not None: |
|
|
|
|
|
|
|
print('no candidates, trying hkg') |
|
|
|
platform_codes = config.fuzzy_get_platform_codes([version]) |
|
|
|
platform_codes = config.fuzzy_get_platform_codes([version]) |
|
|
|
|
|
|
|
print(addr, version, platform_codes) |
|
|
|
if len(platform_codes) == 1: |
|
|
|
if len(platform_codes) == 1: |
|
|
|
|
|
|
|
print('one platform code') |
|
|
|
platform_code = list(platform_codes)[0] |
|
|
|
platform_code = list(platform_codes)[0] |
|
|
|
candidates = all_platform_codes[(addr[0], addr[1], platform_code)] |
|
|
|
candidates = all_platform_codes[(addr[0], addr[1], platform_code)] |
|
|
|
|
|
|
|
# matches[(addr[0], addr[1])] |= candidates |
|
|
|
|
|
|
|
print('candidates', candidates) |
|
|
|
|
|
|
|
matches[(addr[0], addr[1])] |= set(candidates) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print() |
|
|
|
if len(candidates) == 1: |
|
|
|
if len(candidates) == 1: |
|
|
|
match_count += 1 |
|
|
|
match_count += 1 |
|
|
|
if candidate is None: |
|
|
|
if candidate is None: |
|
|
@ -147,6 +159,20 @@ def match_fw_to_car_fuzzy(fw_versions_dict, config, log=True, exclude=None): |
|
|
|
elif candidate != candidates[0]: |
|
|
|
elif candidate != candidates[0]: |
|
|
|
return set() |
|
|
|
return set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print('matches', matches) |
|
|
|
|
|
|
|
if len(matches) > 0: |
|
|
|
|
|
|
|
candidates = set.union(*matches.values()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print('final candidates', candidates) |
|
|
|
|
|
|
|
print('final matches', dict(matches)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
return set() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# for addr, candidates in matches.items(): |
|
|
|
|
|
|
|
# if |
|
|
|
|
|
|
|
# print(addr, candidates) |
|
|
|
|
|
|
|
|
|
|
|
if match_count >= config.fuzzy_min_match_count: |
|
|
|
if match_count >= config.fuzzy_min_match_count: |
|
|
|
if log: |
|
|
|
if log: |
|
|
|
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") |
|
|
|
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs") |
|
|
|