diff --git a/selfdrive/car/toyota/values.py b/selfdrive/car/toyota/values.py index 50f9ca6ca3..2d607c88c4 100644 --- a/selfdrive/car/toyota/values.py +++ b/selfdrive/car/toyota/values.py @@ -247,17 +247,10 @@ def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[by # FW versions returned from UDS queries can return multiple fields/chunks of data (different ECU calibrations, different data?) # and are prefixed with a byte that describes how many chunks of data there are. # But FW returned from KWP requires querying of each sub-data id and does not have a length prefix. - # fw = bytearray(fw) - # fw[0] = random.randint(0, 20) + length_code = 1 length_code_match = FW_LEN_CODE.search(fw) - # has_n_chunks = fw[0] <= 0x5 # max seen is 3 chunks, 16 bytes each - # assert (length_code is not None) == bool(has_n_chunks), fw - # continue - # n_chunks = 1 - # print(f'{has_n_chunks=}') if length_code_match is not None: - # n_chunks = length_code.group()[0] # fw[0] length_code = length_code_match.group()[0] fw = fw[1:] @@ -265,89 +258,98 @@ def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[by if length_code * 16 != len(fw): continue - chunks = [fw[16 * i:16 * i + 16] for i in range(length_code)] - chunks = [c.strip(b'\x00 ') for c in chunks] - # chunks = [s.strip(b'\x00 ') for s in fw.split(b'\x00') if len(s)] + chunks = [fw[16 * i:16 * i + 16].strip(b'\x00 ') for i in range(length_code)] # Ensure not all empty bytes - # TODO: needed since we use if not len(chunks): continue - # if chunks not in ([b'896634A13000', b''], [b'896634A23000', b'']): - # assert chunks == chunks_new, (chunks, chunks_new) + # a = list(map(len, chunks)) + # print(fw, chunks, a) - a = list(map(len, chunks)) - print(fw, chunks, a) - # assert len(set(a)) == 1, (a, fw) # only first is considered for now since second is commonly shared (TODO: understand that) - first_chunk = chunks[0] - # doesn't have a part encoded in version (OBD query?) - # short_version = sum(b > 0xf for b in first_chunk) == 8 - # short_version = len(first_chunk) == 8 if len(first_chunk) == 8: - # TODO: some short chunks have the part number in subsequent chunks - print('short version') - code_match = SHORT_FW_PATTERN.search(first_chunk) - if code_match is not None: - platform, version = code_match.groups() - print('platform code, version', platform, version) - codes.add((platform, version)) + # TODO: no part number, but some short chunks have it in subsequent chunks + # print('short version', fw) + fw_match = SHORT_FW_PATTERN.search(first_chunk) + if fw_match is not None: + platform, major_version, sub_version = fw_match.groups() + # print('platform code, version', platform, major_version, sub_version) + codes.add((platform, major_version)) + elif len(first_chunk) == 10: - code_match = MEDIUM_FW_PATTERN.search(first_chunk) - if code_match is not None: + fw_match = MEDIUM_FW_PATTERN.search(first_chunk) + if fw_match is not None: # TODO: platform is a loose term here - part, platform, version = code_match.groups() + part, platform, version = fw_match.groups() codes.add((part + b'-' + platform, version)) - print('not done', first_chunk) - # not done - pass + elif len(first_chunk) == 12: - print(LONG_FW_PATTERN) - print('long, searching', first_chunk) - code_match = LONG_FW_PATTERN.search(first_chunk) - if code_match is not None: - print('got long match!') - part, platform, major_version, sub_version = code_match.groups() - print(first_chunk, code_match, code_match.groups()) + # print(LONG_FW_PATTERN) + # print('long, searching', first_chunk) + fw_match = LONG_FW_PATTERN.search(first_chunk) + if fw_match is not None: + # print('got long match!') + part, platform, major_version, sub_version = fw_match.groups() + # print(first_chunk, fw_match, fw_match.groups()) codes.add((part + b'-' + platform, major_version + b'-' + sub_version)) - # else: - # assert False, f'invalid length: {len(first_chunk)}' + return codes - continue +def match_fw_to_car_fuzzy(live_fw_versions) -> Set[str]: + # Non-electric CAN FD platforms often do not have platform code specifiers needed + # to distinguish between hybrid and ICE. All EVs so far are either exclusively + # electric or specify electric in the platform code. + # TODO: whitelist platforms that we've seen hybrid and ICE versions of that have these specifiers + fuzzy_platform_blacklist = set() # set(CANFD_CAR - EV_CAR) + candidates = set() - sections = [s for s in fw.split(b'\x00') if len(s)] - print('sections', sections, 'fw', fw) - assert len(sections) > 0 - assert len(sections[0]) > 0 + for candidate, fws in FW_VERSIONS.items(): + print('\n\ncandidate:', candidate) + # Keep track of ECUs which pass all checks (platform codes, within date range) + valid_found_ecus = set() + valid_expected_ecus = {ecu[1:] for ecu in fws if ecu[0] in PLATFORM_CODE_ECUS} + for ecu, expected_versions in fws.items(): + addr = ecu[1:] + # Only check ECUs expected to have platform codes + if ecu[0] not in PLATFORM_CODE_ECUS: + continue - has_n_chunks = sections[0][0] < 0xf - if has_n_chunks: - assert len(sections) == sections[0][0] - sections[0] = sections[0][1:] + # Expected platform codes & dates + codes = get_platform_codes(expected_versions) + expected_platform_codes = {code for code, _ in codes} + # expected_dates = {date for _, date in codes if date is not None} - print(sections) - continue + # Found platform codes & dates + codes = get_platform_codes(live_fw_versions.get(addr, set())) + found_platform_codes = {code for code, _ in codes} + # found_dates = {date for _, date in codes if date is not None} + print(ecu, expected_platform_codes, found_platform_codes) + # Check platform code + part number matches for any found versions + if not any(found_platform_code in expected_platform_codes for found_platform_code in found_platform_codes): + break + # if ecu[0] in DATE_FW_ECUS: + # # If ECU can have a FW date, require it to exist + # # (this excludes candidates in the database without dates) + # if not len(expected_dates) or not len(found_dates): + # break + # + # # Check any date within range in the database, format is %y%m%d + # if not any(min(expected_dates) <= found_date <= max(expected_dates) for found_date in found_dates): + # break + # + valid_found_ecus.add(addr) - code_match = PLATFORM_CODE_FW_PATTERN.search(fw) - part_match = PART_NUMBER_FW_PATTERN.search(fw) - date_match = DATE_FW_PATTERN.search(fw) - if code_match is not None: - code: bytes = code_match.group() - part = part_match.group() if part_match else None - date = date_match.group() if date_match else None - if part is not None: - # part number starts with generic ECU part type, add what is specific to platform - code += b"-" + part[-5:] + # If all live ECUs pass all checks for candidate, add it as a match + if valid_expected_ecus.issubset(valid_found_ecus): + candidates.add(candidate) - codes.add((code, date)) - return codes + return candidates - fuzzy_platform_blacklist # Some ECUs that use KWP2000 have their FW versions on non-standard data identifiers. @@ -364,9 +366,9 @@ TOYOTA_VERSION_RESPONSE_KWP = b'\x5a\x88\x01' # List of ECUs expected to have platform codes, camera and radar should exist on all cars # TODO: use abs, it has the platform code and part number on many platforms -PLATFORM_CODE_ECUS = [Ecu.abs, Ecu.engine] +PLATFORM_CODE_ECUS = [Ecu.abs, Ecu.engine, Ecu.eps, Ecu.dsu, Ecu.fwdCamera, Ecu.fwdRadar] # So far we've only seen dates in fwdCamera -DATE_FW_ECUS = [Ecu.fwdCamera] +# DATE_FW_ECUS = [Ecu.fwdCamera] FW_QUERY_CONFIG = FwQueryConfig( # TODO: look at data to whitelist new ECUs effectively @@ -431,6 +433,7 @@ FW_QUERY_CONFIG = FwQueryConfig( (Ecu.combinationMeter, 0x7c0, None), (Ecu.hvac, 0x7c4, None), ], + match_fw_to_car_fuzzy=match_fw_to_car_fuzzy, ) # FW_PATTERN = re.compile(b'[0-9]{4}[0-9A-Z][0-9A-Z]')