|  |  | @ -54,13 +54,11 @@ def get_brand_addrs() -> Dict[str, Set[Tuple[int, Optional[int]]]]: | 
			
		
	
		
		
			
				
					
					|  |  |  |   return dict(brand_addrs) |  |  |  |   return dict(brand_addrs) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | def match_fw_to_car_fuzzy(fw_versions_dict, config=None, log=True, exclude=None): |  |  |  | def match_fw_to_car_fuzzy(fw_versions_dict, config, log=True, exclude=None): | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   """Do a fuzzy FW match. This function will return a match, and the number of firmware version |  |  |  |   """Do a fuzzy FW match. This function will return a match, and the number of firmware version | 
			
		
	
		
		
			
				
					
					|  |  |  |   that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars |  |  |  |   that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars | 
			
		
	
		
		
			
				
					
					|  |  |  |   the match is rejected.""" |  |  |  |   the match is rejected.""" | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   use_config = config is not None and config.fuzzy_get_platform_codes is not None |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   # Build lookup table from (addr, sub_addr, fw) to list of candidate cars |  |  |  |   # Build lookup table from (addr, sub_addr, fw) to list of candidate cars | 
			
		
	
		
		
			
				
					
					|  |  |  |   all_fw_versions = defaultdict(list) |  |  |  |   all_fw_versions = defaultdict(list) | 
			
		
	
		
		
			
				
					
					|  |  |  |   # Platform codes are brand-specific unique identifiers for each platform, less specific than a FW version |  |  |  |   # Platform codes are brand-specific unique identifiers for each platform, less specific than a FW version | 
			
		
	
	
		
		
			
				
					|  |  | @ -79,27 +77,28 @@ def match_fw_to_car_fuzzy(fw_versions_dict, config=None, log=True, exclude=None) | 
			
		
	
		
		
			
				
					
					|  |  |  |           all_fw_versions[(addr[1], addr[2], f)].append(candidate) |  |  |  |           all_fw_versions[(addr[1], addr[2], f)].append(candidate) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |       # Add platform codes to lookup dict if config specifies a function |  |  |  |       # Add platform codes to lookup dict if config specifies a function | 
			
		
	
		
		
			
				
					
					|  |  |  |       if use_config and addr[0] in config.fuzzy_ecus: |  |  |  |       if config.fuzzy_get_platform_codes is not None and addr[0] in config.fuzzy_ecus: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |         for platform_code in config.fuzzy_get_platform_codes(fws): |  |  |  |         for platform_code in config.fuzzy_get_platform_codes(fws): | 
			
		
	
		
		
			
				
					
					|  |  |  |           all_platform_codes[(addr[1], addr[2], platform_code)].append(candidate) |  |  |  |           all_platform_codes[(addr[1], addr[2], platform_code)].append(candidate) | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   matched_ecus = set() |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   candidate = None |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |   # TODO: we really want to support both ways of fuzzy FP, but we don't want to partially match with both. |  |  |  |   # TODO: we really want to support both ways of fuzzy FP, but we don't want to partially match with both. | 
			
		
	
		
		
			
				
					
					|  |  |  |   #  meaning we get one radar platform code match, no camera platform code match, and a random exact FW ECU match. |  |  |  |   #  meaning we get one radar platform code match, no camera platform code match, and a random exact FW ECU match. | 
			
		
	
		
		
			
				
					
					|  |  |  |   #  when matching with platform codes, it should be all ECUs specified in the config |  |  |  |   #  when matching with platform codes, it should be all ECUs specified in the config | 
			
		
	
		
		
			
				
					
					|  |  |  |   #  and if we don't match all ECUs with platform codes, we should fall back on normal full FW fuzzy matching, not mixing both |  |  |  |   #  and if we don't match all ECUs with platform codes, we should fall back on normal full FW fuzzy matching, not mixing both | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   def fuzzy_match(use_config): | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     matched_ecus = set() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     candidate = None | 
			
		
	
		
		
			
				
					
					|  |  |  |     for addr, versions in fw_versions_dict.items(): |  |  |  |     for addr, versions in fw_versions_dict.items(): | 
			
		
	
		
		
			
				
					
					|  |  |  |       ecu_key = (addr[0], addr[1]) |  |  |  |       ecu_key = (addr[0], addr[1]) | 
			
		
	
		
		
			
				
					
					|  |  |  |       for version in versions: |  |  |  |       for version in versions: | 
			
		
	
		
		
			
				
					
					|  |  |  |       # Fall back to matching with full FW versions if brand does not implement platform codes |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |         candidates = set() |  |  |  |         candidates = set() | 
			
		
	
		
		
			
				
					
					|  |  |  |       if not use_config: |  |  |  |         if use_config: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |         candidates = all_fw_versions[(*ecu_key, version)] |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |       else: |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |           # Returns one or none, all cars that have this platform code |  |  |  |           # Returns one or none, all cars that have this platform code | 
			
		
	
		
		
			
				
					
					|  |  |  |           for platform_code in config.fuzzy_get_platform_codes([version]): |  |  |  |           for platform_code in config.fuzzy_get_platform_codes([version]): | 
			
		
	
		
		
			
				
					
					|  |  |  |             candidates = all_platform_codes[(*ecu_key, platform_code)] |  |  |  |             candidates = all_platform_codes[(*ecu_key, platform_code)] | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |         else: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           # All cars that have this FW response on the specified address | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |           candidates = all_fw_versions[(*ecu_key, version)] | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |         if len(candidates) == 1: |  |  |  |         if len(candidates) == 1: | 
			
		
	
		
		
			
				
					
					|  |  |  |           matched_ecus.add(ecu_key) |  |  |  |           matched_ecus.add(ecu_key) | 
			
		
	
	
		
		
			
				
					|  |  | @ -107,16 +106,24 @@ def match_fw_to_car_fuzzy(fw_versions_dict, config=None, log=True, exclude=None) | 
			
		
	
		
		
			
				
					
					|  |  |  |             candidate = candidates[0] |  |  |  |             candidate = candidates[0] | 
			
		
	
		
		
			
				
					
					|  |  |  |           # We uniquely matched two different cars. No fuzzy match possible |  |  |  |           # We uniquely matched two different cars. No fuzzy match possible | 
			
		
	
		
		
			
				
					
					|  |  |  |           elif candidate != candidates[0]: |  |  |  |           elif candidate != candidates[0]: | 
			
		
	
		
		
			
				
					
					|  |  |  |           return set() |  |  |  |             return None, matched_ecus | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     return candidate, matched_ecus | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   # Try to fuzzy fingerprint both with and without platform codes independently | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |   for use_config in {config.fuzzy_get_platform_codes is not None}: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     candidate, matched_ecus = fuzzy_match(use_config) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |     if candidate is None: | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |       continue | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     # Note that it is possible to match to a candidate without all its ECUs being present |  |  |  |     # Note that it is possible to match to a candidate without all its ECUs being present | 
			
		
	
		
		
			
				
					
					|  |  |  |     # if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching |  |  |  |     # if there are enough matches. FIXME: parameterize this or require all ECUs to exist like exact matching | 
			
		
	
		
		
			
				
					
					|  |  |  |   fuzzy_min_match_count = config.fuzzy_min_match_count if use_config else 2 |  |  |  |     if len(matched_ecus) >= config.fuzzy_min_match_count: | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |   if len(matched_ecus) >= fuzzy_min_match_count: |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |       if log: |  |  |  |       if log: | 
			
		
	
		
		
			
				
					
					|  |  |  |         cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs") |  |  |  |         cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {len(matched_ecus)} matching ECUs") | 
			
		
	
		
		
			
				
					
					|  |  |  |       return {candidate} |  |  |  |       return {candidate} | 
			
		
	
		
		
			
				
					
					|  |  |  |   else: |  |  |  | 
 | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |   return set() |  |  |  |   return set() | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
	
		
		
			
				
					|  |  | @ -162,16 +169,14 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True): | 
			
		
	
		
		
			
				
					
					|  |  |  |     exact_matches = [(True, match_fw_to_car_exact)] |  |  |  |     exact_matches = [(True, match_fw_to_car_exact)] | 
			
		
	
		
		
			
				
					
					|  |  |  |   if allow_fuzzy: |  |  |  |   if allow_fuzzy: | 
			
		
	
		
		
			
				
					
					|  |  |  |     # Try first with standard fuzzy fingerprinting, then with config |  |  |  |     # Try first with standard fuzzy fingerprinting, then with config | 
			
		
	
		
		
			
				
					
					|  |  |  |     exact_matches.extend([(False, False, match_fw_to_car_fuzzy), |  |  |  |     exact_matches.append((False, match_fw_to_car_fuzzy)) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |                           (False, True, match_fw_to_car_fuzzy)]) |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |   for exact_match, use_config, match_func in exact_matches: |  |  |  |   for exact_match, match_func in exact_matches: | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |     # For each brand, attempt to fingerprint using all FW returned from its queries |  |  |  |     # For each brand, attempt to fingerprint using all FW returned from its queries | 
			
		
	
		
		
			
				
					
					|  |  |  |     matches = set() |  |  |  |     matches = set() | 
			
		
	
		
		
			
				
					
					|  |  |  |     for brand in VERSIONS.keys(): |  |  |  |     for brand in VERSIONS.keys(): | 
			
		
	
		
		
			
				
					
					|  |  |  |       fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) |  |  |  |       fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) | 
			
		
	
		
		
			
				
					
					|  |  |  |       config = FW_QUERY_CONFIGS[brand] if use_config else None |  |  |  |       matches |= match_func(fw_versions_dict, FW_QUERY_CONFIGS[brand], log=log) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  |       matches |= match_func(fw_versions_dict, config, log=log) |  |  |  |  | 
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 
 |  |  |  | 
 | 
			
		
	
		
		
			
				
					
					|  |  |  |     if len(matches): |  |  |  |     if len(matches): | 
			
		
	
		
		
			
				
					
					|  |  |  |       return exact_match, matches |  |  |  |       return exact_match, matches | 
			
		
	
	
		
		
			
				
					|  |  | 
 |