some conceptual clean up

pull/28641/head
Shane Smiskol 2 years ago
parent 050062d9fb
commit cb84caab73
  1. 14
      selfdrive/car/toyota/tests/test_toyota.py
  2. 73
      selfdrive/car/toyota/values.py

@ -1,4 +1,5 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from hypothesis import given, settings, strategies as st
import re import re
from cereal import car from cereal import car
import unittest import unittest
@ -10,7 +11,7 @@ from selfdrive.car.fw_versions import build_fw_dict
# EV_CAR, FW_QUERY_CONFIG, FW_VERSIONS, LEGACY_SAFETY_MODE_CAR, \ # EV_CAR, FW_QUERY_CONFIG, FW_VERSIONS, LEGACY_SAFETY_MODE_CAR, \
# PLATFORM_CODE_ECUS, get_platform_codes # PLATFORM_CODE_ECUS, get_platform_codes
from selfdrive.car.toyota.values import TSS2_CAR, ANGLE_CONTROL_CAR, FW_VERSIONS, FW_QUERY_CONFIG, EV_HYBRID_CAR, \ from selfdrive.car.toyota.values import TSS2_CAR, ANGLE_CONTROL_CAR, FW_VERSIONS, FW_QUERY_CONFIG, EV_HYBRID_CAR, \
FW_PATTERN, FW_LEN_CODE, FW_PATTERN_V3, get_platform_codes LONG_FW_PATTERN, FW_LEN_CODE, get_platform_codes # FW_PATTERN_V3
from openpilot.selfdrive.car.toyota.values import CAR, DBC, TSS2_CAR, ANGLE_CONTROL_CAR, RADAR_ACC_CAR, FW_VERSIONS from openpilot.selfdrive.car.toyota.values import CAR, DBC, TSS2_CAR, ANGLE_CONTROL_CAR, RADAR_ACC_CAR, FW_VERSIONS
Ecu = car.CarParams.Ecu Ecu = car.CarParams.Ecu
@ -51,6 +52,13 @@ class TestToyotaInterfaces(unittest.TestCase):
class TestToyotaFingerprint(unittest.TestCase): class TestToyotaFingerprint(unittest.TestCase):
# @settings(max_examples=100)
# @given(data=st.data())
# def test_platform_codes_fuzzy_fw(self, data):
# fw_strategy = st.lists(st.binary())
# fws = data.draw(fw_strategy)
# get_platform_codes(fws)
def test_fw_pattern(self): def test_fw_pattern(self):
for car_model, ecus in FW_VERSIONS.items(): for car_model, ecus in FW_VERSIONS.items():
# print() # print()
@ -60,7 +68,9 @@ class TestToyotaFingerprint(unittest.TestCase):
for fw in fws: for fw in fws:
print('\ninput', fw) print('\ninput', fw)
get_platform_codes([fw]) ret = get_platform_codes([fw])
# self.assertTrue(len(ret))
print('ret', ret)
continue continue
match = FW_PATTERN.search(fw) match = FW_PATTERN.search(fw)
length = FW_LEN_CODE.search(fw) length = FW_LEN_CODE.search(fw)

@ -236,10 +236,12 @@ STATIC_DSU_MSGS = [
] ]
SHORT_FW_PATTERN = re.compile(b'(?P<platform>[A-Z0-9]{4})(?P<version>[A-Z0-9]{4})') SHORT_FW_PATTERN = re.compile(b'(?P<platform>[A-Z0-9]{4})(?P<version>[A-Z0-9]{4})')
MED_PATTERN = re.compile(b'TODO') MEDIUM_FW_PATTERN = re.compile(b'(?P<part>[A-Z0-9]{5})(?P<platform>[A-Z0-9]{2})(?P<version>[A-Z0-9]{3})')
FW_PATTERN = re.compile(b'(?P<part>[0-9A-Z]{4})[0-9A-Z](?P<platform>[A-Z0-9]{2})(?P<major_version>[A-Z0-9]{2})(?P<sub_version>[A-Z0-9]{3})') LONG_FW_PATTERN = re.compile(b'(?P<part>[A-Z0-9]{5})(?P<platform>[A-Z0-9]{2})(?P<major_version>[A-Z0-9]{2})(?P<sub_version>[A-Z0-9]{3})')
FW_LEN_CODE = re.compile(b'^[\x01-\x05]') # 5 chunks max. max seen is 3 chunks, 16 bytes each
import random
def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[bytes]]]: def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[bytes]]]:
# Returns unique, platform-specific identification codes for a set of versions # Returns unique, platform-specific identification codes for a set of versions
codes = set() # (code-Optional[part], date) codes = set() # (code-Optional[part], date)
@ -247,44 +249,74 @@ def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[by
# FW versions returned from UDS queries can return multiple fields/chunks of data (different ECU calibrations, different data?) # FW versions returned from UDS queries can return multiple fields/chunks of data (different ECU calibrations, different data?)
# and are prefixed with a byte that describes how many chunks of data there are. # and are prefixed with a byte that describes how many chunks of data there are.
# But FW returned from KWP requires querying of each sub-data id and does not have a length prefix. # But FW returned from KWP requires querying of each sub-data id and does not have a length prefix.
has_n_chunks = fw[0] < 0xf # max seen is 3 chunks, 16 bytes each # fw = bytearray(fw)
n_chunks = 1 # fw[0] = random.randint(0, 20)
print(f'{has_n_chunks=}') length_code = 1
if has_n_chunks: length_code_match = FW_LEN_CODE.search(fw)
n_chunks = fw[0] # has_n_chunks = fw[0] <= 0x5 # max seen is 3 chunks, 16 bytes each
# assert (length_code is not None) == bool(has_n_chunks), fw
# continue
# n_chunks = 1
# print(f'{has_n_chunks=}')
if length_code_match is not None:
# n_chunks = length_code.group()[0] # fw[0]
length_code = length_code_match.group()[0]
fw = fw[1:] fw = fw[1:]
assert n_chunks * 16 == len(fw)
chunks = [fw[16 * i:16 * i + 16] for i in range(n_chunks)] # fw length should be multiple of 16 bytes (per chunk, even if no length code), skip parsing if unexpected length
# chunks = [s for s in fw.split(b'\x00') if len(s)] if length_code * 16 != len(fw):
continue
chunks = [fw[16 * i:16 * i + 16] for i in range(length_code)]
chunks = [c.strip(b'\x00 ') for c in chunks] chunks = [c.strip(b'\x00 ') for c in chunks]
# chunks = [s.strip(b'\x00 ') for s in fw.split(b'\x00') if len(s)]
# Ensure not all empty bytes
# TODO: needed since we use
if not len(chunks):
continue
# if chunks not in ([b'896634A13000', b''], [b'896634A23000', b'']):
# assert chunks == chunks_new, (chunks, chunks_new)
a = list(map(len, chunks)) a = list(map(len, chunks))
print(fw, chunks, a) print(fw, chunks, a)
# assert len(set(a)) == 1 # assert len(set(a)) == 1, (a, fw)
# only first is considered for now since second is commonly shared (TODO: understand that) # only first is considered for now since second is commonly shared (TODO: understand that)
first_chunk = chunks[0] first_chunk = chunks[0]
# doesn't have a part encoded in version (OBD query?) # doesn't have a part encoded in version (OBD query?)
# short_version = sum(b > 0xf for b in first_chunk) == 8 # short_version = sum(b > 0xf for b in first_chunk) == 8
short_version = len(first_chunk) == 8 # short_version = len(first_chunk) == 8
if short_version: if len(first_chunk) == 8:
# TODO: some short chunks have the part number in subsequent chunks
print('short version') print('short version')
code_match = SHORT_FW_PATTERN.search(first_chunk) code_match = SHORT_FW_PATTERN.search(first_chunk)
if code_match is not None: if code_match is not None:
code, version = code_match.groups() platform, version = code_match.groups()
print('platform code, version', code, version) print('platform code, version', platform, version)
codes.add((platform, version))
elif len(first_chunk) == 10: elif len(first_chunk) == 10:
code_match = MEDIUM_FW_PATTERN.search(first_chunk)
if code_match is not None:
# TODO: platform is a loose term here
part, platform, version = code_match.groups()
codes.add((part + b'-' + platform, version))
print('not done', first_chunk)
# not done # not done
pass pass
elif len(first_chunk) == 12: elif len(first_chunk) == 12:
print(FW_PATTERN) print(LONG_FW_PATTERN)
code_match = FW_PATTERN.search(first_chunk) print('long, searching', first_chunk)
code_match = LONG_FW_PATTERN.search(first_chunk)
if code_match is not None: if code_match is not None:
print('got long match!') print('got long match!')
part, platform, major_version, sub_version = code_match.groups()
print(first_chunk, code_match, code_match.groups()) print(first_chunk, code_match, code_match.groups())
codes.add((part + b'-' + platform, major_version + b'-' + sub_version))
else: # else:
assert False, f'invalid length: {len(first_chunk)}' # assert False, f'invalid length: {len(first_chunk)}'
continue continue
@ -405,8 +437,7 @@ FW_QUERY_CONFIG = FwQueryConfig(
# FW_PATTERN = re.compile(b'[0-9]{4}[0-9A-Z][0-9A-Z]') # FW_PATTERN = re.compile(b'[0-9]{4}[0-9A-Z][0-9A-Z]')
# FW_PATTERN2 = re.compile(br'(?<=\\x[0-9]{2})[0-9A-Z]{5}|^[0-9A-Z]{5}') # FW_PATTERN2 = re.compile(br'(?<=\\x[0-9]{2})[0-9A-Z]{5}|^[0-9A-Z]{5}')
FW_LEN_CODE = re.compile(b'^[\x00-\x0F]') # FW_PATTERN_V3 = re.compile(b'(?P<length>^[\x00-\x0F])?(?P<part>[0-9A-Z]{4})')
FW_PATTERN_V3 = re.compile(b'(?P<length>^[\x00-\x0F])?(?P<part>[0-9A-Z]{4})')
FW_VERSIONS = { FW_VERSIONS = {
CAR.AVALON: { CAR.AVALON: {

Loading…
Cancel
Save