|
|
|
@ -247,17 +247,10 @@ def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[by |
|
|
|
|
# FW versions returned from UDS queries can return multiple fields/chunks of data (different ECU calibrations, different data?) |
|
|
|
|
# and are prefixed with a byte that describes how many chunks of data there are. |
|
|
|
|
# But FW returned from KWP requires querying of each sub-data id and does not have a length prefix. |
|
|
|
|
# fw = bytearray(fw) |
|
|
|
|
# fw[0] = random.randint(0, 20) |
|
|
|
|
|
|
|
|
|
length_code = 1 |
|
|
|
|
length_code_match = FW_LEN_CODE.search(fw) |
|
|
|
|
# has_n_chunks = fw[0] <= 0x5 # max seen is 3 chunks, 16 bytes each |
|
|
|
|
# assert (length_code is not None) == bool(has_n_chunks), fw |
|
|
|
|
# continue |
|
|
|
|
# n_chunks = 1 |
|
|
|
|
# print(f'{has_n_chunks=}') |
|
|
|
|
if length_code_match is not None: |
|
|
|
|
# n_chunks = length_code.group()[0] # fw[0] |
|
|
|
|
length_code = length_code_match.group()[0] |
|
|
|
|
fw = fw[1:] |
|
|
|
|
|
|
|
|
@ -265,89 +258,98 @@ def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[by |
|
|
|
|
if length_code * 16 != len(fw): |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
chunks = [fw[16 * i:16 * i + 16] for i in range(length_code)] |
|
|
|
|
chunks = [c.strip(b'\x00 ') for c in chunks] |
|
|
|
|
# chunks = [s.strip(b'\x00 ') for s in fw.split(b'\x00') if len(s)] |
|
|
|
|
chunks = [fw[16 * i:16 * i + 16].strip(b'\x00 ') for i in range(length_code)] |
|
|
|
|
|
|
|
|
|
# Ensure not all empty bytes |
|
|
|
|
# TODO: needed since we use |
|
|
|
|
if not len(chunks): |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
# if chunks not in ([b'896634A13000', b''], [b'896634A23000', b'']): |
|
|
|
|
# assert chunks == chunks_new, (chunks, chunks_new) |
|
|
|
|
# a = list(map(len, chunks)) |
|
|
|
|
# print(fw, chunks, a) |
|
|
|
|
|
|
|
|
|
a = list(map(len, chunks)) |
|
|
|
|
print(fw, chunks, a) |
|
|
|
|
# assert len(set(a)) == 1, (a, fw) |
|
|
|
|
# only first is considered for now since second is commonly shared (TODO: understand that) |
|
|
|
|
|
|
|
|
|
first_chunk = chunks[0] |
|
|
|
|
# doesn't have a part encoded in version (OBD query?) |
|
|
|
|
# short_version = sum(b > 0xf for b in first_chunk) == 8 |
|
|
|
|
# short_version = len(first_chunk) == 8 |
|
|
|
|
if len(first_chunk) == 8: |
|
|
|
|
# TODO: some short chunks have the part number in subsequent chunks |
|
|
|
|
print('short version') |
|
|
|
|
code_match = SHORT_FW_PATTERN.search(first_chunk) |
|
|
|
|
if code_match is not None: |
|
|
|
|
platform, version = code_match.groups() |
|
|
|
|
print('platform code, version', platform, version) |
|
|
|
|
codes.add((platform, version)) |
|
|
|
|
# TODO: no part number, but some short chunks have it in subsequent chunks |
|
|
|
|
# print('short version', fw) |
|
|
|
|
fw_match = SHORT_FW_PATTERN.search(first_chunk) |
|
|
|
|
if fw_match is not None: |
|
|
|
|
platform, major_version, sub_version = fw_match.groups() |
|
|
|
|
# print('platform code, version', platform, major_version, sub_version) |
|
|
|
|
codes.add((platform, major_version)) |
|
|
|
|
|
|
|
|
|
elif len(first_chunk) == 10: |
|
|
|
|
code_match = MEDIUM_FW_PATTERN.search(first_chunk) |
|
|
|
|
if code_match is not None: |
|
|
|
|
fw_match = MEDIUM_FW_PATTERN.search(first_chunk) |
|
|
|
|
if fw_match is not None: |
|
|
|
|
# TODO: platform is a loose term here |
|
|
|
|
part, platform, version = code_match.groups() |
|
|
|
|
part, platform, version = fw_match.groups() |
|
|
|
|
codes.add((part + b'-' + platform, version)) |
|
|
|
|
print('not done', first_chunk) |
|
|
|
|
# not done |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
elif len(first_chunk) == 12: |
|
|
|
|
print(LONG_FW_PATTERN) |
|
|
|
|
print('long, searching', first_chunk) |
|
|
|
|
code_match = LONG_FW_PATTERN.search(first_chunk) |
|
|
|
|
if code_match is not None: |
|
|
|
|
print('got long match!') |
|
|
|
|
part, platform, major_version, sub_version = code_match.groups() |
|
|
|
|
print(first_chunk, code_match, code_match.groups()) |
|
|
|
|
# print(LONG_FW_PATTERN) |
|
|
|
|
# print('long, searching', first_chunk) |
|
|
|
|
fw_match = LONG_FW_PATTERN.search(first_chunk) |
|
|
|
|
if fw_match is not None: |
|
|
|
|
# print('got long match!') |
|
|
|
|
part, platform, major_version, sub_version = fw_match.groups() |
|
|
|
|
# print(first_chunk, fw_match, fw_match.groups()) |
|
|
|
|
codes.add((part + b'-' + platform, major_version + b'-' + sub_version)) |
|
|
|
|
|
|
|
|
|
# else: |
|
|
|
|
# assert False, f'invalid length: {len(first_chunk)}' |
|
|
|
|
return codes |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
continue |
|
|
|
|
def match_fw_to_car_fuzzy(live_fw_versions) -> Set[str]: |
|
|
|
|
# Non-electric CAN FD platforms often do not have platform code specifiers needed |
|
|
|
|
# to distinguish between hybrid and ICE. All EVs so far are either exclusively |
|
|
|
|
# electric or specify electric in the platform code. |
|
|
|
|
# TODO: whitelist platforms that we've seen hybrid and ICE versions of that have these specifiers |
|
|
|
|
fuzzy_platform_blacklist = set() # set(CANFD_CAR - EV_CAR) |
|
|
|
|
candidates = set() |
|
|
|
|
|
|
|
|
|
sections = [s for s in fw.split(b'\x00') if len(s)] |
|
|
|
|
print('sections', sections, 'fw', fw) |
|
|
|
|
assert len(sections) > 0 |
|
|
|
|
assert len(sections[0]) > 0 |
|
|
|
|
for candidate, fws in FW_VERSIONS.items(): |
|
|
|
|
print('\n\ncandidate:', candidate) |
|
|
|
|
# Keep track of ECUs which pass all checks (platform codes, within date range) |
|
|
|
|
valid_found_ecus = set() |
|
|
|
|
valid_expected_ecus = {ecu[1:] for ecu in fws if ecu[0] in PLATFORM_CODE_ECUS} |
|
|
|
|
for ecu, expected_versions in fws.items(): |
|
|
|
|
addr = ecu[1:] |
|
|
|
|
# Only check ECUs expected to have platform codes |
|
|
|
|
if ecu[0] not in PLATFORM_CODE_ECUS: |
|
|
|
|
continue |
|
|
|
|
|
|
|
|
|
has_n_chunks = sections[0][0] < 0xf |
|
|
|
|
if has_n_chunks: |
|
|
|
|
assert len(sections) == sections[0][0] |
|
|
|
|
sections[0] = sections[0][1:] |
|
|
|
|
# Expected platform codes & dates |
|
|
|
|
codes = get_platform_codes(expected_versions) |
|
|
|
|
expected_platform_codes = {code for code, _ in codes} |
|
|
|
|
# expected_dates = {date for _, date in codes if date is not None} |
|
|
|
|
|
|
|
|
|
print(sections) |
|
|
|
|
continue |
|
|
|
|
# Found platform codes & dates |
|
|
|
|
codes = get_platform_codes(live_fw_versions.get(addr, set())) |
|
|
|
|
found_platform_codes = {code for code, _ in codes} |
|
|
|
|
# found_dates = {date for _, date in codes if date is not None} |
|
|
|
|
|
|
|
|
|
print(ecu, expected_platform_codes, found_platform_codes) |
|
|
|
|
|
|
|
|
|
# Check platform code + part number matches for any found versions |
|
|
|
|
if not any(found_platform_code in expected_platform_codes for found_platform_code in found_platform_codes): |
|
|
|
|
break |
|
|
|
|
|
|
|
|
|
# if ecu[0] in DATE_FW_ECUS: |
|
|
|
|
# # If ECU can have a FW date, require it to exist |
|
|
|
|
# # (this excludes candidates in the database without dates) |
|
|
|
|
# if not len(expected_dates) or not len(found_dates): |
|
|
|
|
# break |
|
|
|
|
# |
|
|
|
|
# # Check any date within range in the database, format is %y%m%d |
|
|
|
|
# if not any(min(expected_dates) <= found_date <= max(expected_dates) for found_date in found_dates): |
|
|
|
|
# break |
|
|
|
|
# |
|
|
|
|
valid_found_ecus.add(addr) |
|
|
|
|
|
|
|
|
|
code_match = PLATFORM_CODE_FW_PATTERN.search(fw) |
|
|
|
|
part_match = PART_NUMBER_FW_PATTERN.search(fw) |
|
|
|
|
date_match = DATE_FW_PATTERN.search(fw) |
|
|
|
|
if code_match is not None: |
|
|
|
|
code: bytes = code_match.group() |
|
|
|
|
part = part_match.group() if part_match else None |
|
|
|
|
date = date_match.group() if date_match else None |
|
|
|
|
if part is not None: |
|
|
|
|
# part number starts with generic ECU part type, add what is specific to platform |
|
|
|
|
code += b"-" + part[-5:] |
|
|
|
|
# If all live ECUs pass all checks for candidate, add it as a match |
|
|
|
|
if valid_expected_ecus.issubset(valid_found_ecus): |
|
|
|
|
candidates.add(candidate) |
|
|
|
|
|
|
|
|
|
codes.add((code, date)) |
|
|
|
|
return codes |
|
|
|
|
return candidates - fuzzy_platform_blacklist |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Some ECUs that use KWP2000 have their FW versions on non-standard data identifiers. |
|
|
|
@ -364,9 +366,9 @@ TOYOTA_VERSION_RESPONSE_KWP = b'\x5a\x88\x01' |
|
|
|
|
|
|
|
|
|
# List of ECUs expected to have platform codes, camera and radar should exist on all cars |
|
|
|
|
# TODO: use abs, it has the platform code and part number on many platforms |
|
|
|
|
PLATFORM_CODE_ECUS = [Ecu.abs, Ecu.engine] |
|
|
|
|
PLATFORM_CODE_ECUS = [Ecu.abs, Ecu.engine, Ecu.eps, Ecu.dsu, Ecu.fwdCamera, Ecu.fwdRadar] |
|
|
|
|
# So far we've only seen dates in fwdCamera |
|
|
|
|
DATE_FW_ECUS = [Ecu.fwdCamera] |
|
|
|
|
# DATE_FW_ECUS = [Ecu.fwdCamera] |
|
|
|
|
|
|
|
|
|
FW_QUERY_CONFIG = FwQueryConfig( |
|
|
|
|
# TODO: look at data to whitelist new ECUs effectively |
|
|
|
@ -431,6 +433,7 @@ FW_QUERY_CONFIG = FwQueryConfig( |
|
|
|
|
(Ecu.combinationMeter, 0x7c0, None), |
|
|
|
|
(Ecu.hvac, 0x7c4, None), |
|
|
|
|
], |
|
|
|
|
match_fw_to_car_fuzzy=match_fw_to_car_fuzzy, |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
# FW_PATTERN = re.compile(b'[0-9]{4}[0-9A-Z][0-9A-Z]') |
|
|
|
|