HKG: use platform codes to fuzzy fingerprint (#28531)

* get gas/ev/hev from FW (not all correct, poc)

* add test for essential ecus for fuzzy fingerprinting

* kinda works

* stash

* clean up

* add code

* simpler

* use the function

* test it with our cars

* no re

no re

no re

* debugging

* handle empty dict

* simpl

* this is promising

start on making existing fingerprinting functions use the config, instead of entirely replacing them

* needs to allow 1 match

* lay out how this should look

* changes

* executable

* some work

* use config

* fuzzy ecus

* config test

* comment and some clean up

* test platform codes

* use regex, simpler and fixes bug

* in func

* rm bad func

* typing for new func and remove old from dc

* todo done

* tested!

* remove fake platform codes

* thought we needed this, but actually...

* not needed

* not applicable any more

* use config for essential ecus

* first draft of test to make adding/removing fuzzy FP platform intentional

* compile

* clean up test

* even cleaner

* fix default ecus type

* temp fix

* this is mostly in tests now

* test every fuzzy ecu fw returns one platform code

* experiment with dates

* Revert "experiment with dates"

This reverts commit 3251b9cc5c.

* clean that up

* comment

* test

* work on all cars

* fix fuzz_fw_fingerprint

* comment

* get first by search

* bit more clean up

* and more

* use compiled pattern for nicer syntax

* default

* flip dat around, much cleaner

* clean up hyundai test a bit

* flip order

same here

* rename test and flip subTest

* fix pylint

* revert fw changes

revert fw changes

* line

* add original functions to test

* needs to be a list

* cmt

* draft (need to count one ecu as a match)

* tiny clean up

* todo: date range

* only in notebook

* remove comment (still can be either list or set)

* same, only notebook

* more consistent signature

* copilot inspired

* copilot no good

* test for date parsing

* better name

* good, now we don't have to worry about the dates mismatching in another test/logic

* comment up+

* some stuff

* clean up

fix test

fix test

* test

* comment

* use utils

* clean up (utils are cleaner and less buggy)

* clean up (utils are cleaner and less buggy)

* fixup test

* use a dash (prettier) and remove some platforms that can fingerprint now!

* compile global pattern

* same as what we do in values

* remove comments

* fuzzy_get_platform_codes is one or none here

* more clean up

* sort imports

* woah woah woah

* add comment

* fix reassigning different types

* add types

* adapt fuzzy test recently added (nice it caught this!)

* update lock

* options

comments

* stash

* comments and fixes

* better comments

* better

* test: run on exact fuzzy matching logic, same results!

* use match_fw_to_car

* test all fw

* ex

* unused random

* this is a possibility

* this is more clear

* fix

* revert

* revert to needing both ECUs to match to reduce false positives, test

* fix excluded platform test :( but it's ok

* add comment

* we actually want to only test fuzzy ecus to make it explicit

* fix mypy

* comment for tomorrow

* just add matches with fuzzy FP

* add comment

* this was the cleanest I could think of, but still working on it. not very easy to understand

* think this is better, but also worse...

* comment: reframing how this works

* revert back to what we did before

* was swapped

* else set

* remove old comment

* fixes from merge

* remove fuzzy_min_match_count from this pr

* fix static analysis

* also unused

* different method first draft

* copy existing fuzzy func

* check all possible ecus exist, only platform codes, slightly refactor main loop

* fix

* Revert recent

Revert "fix"

This reverts commit 5cdb7bda83.

Revert "check all possible ecus exist, only platform codes, slightly refactor main loop"

This reverts commit d3e918fa20.

Revert "copy existing fuzzy func"

This reverts commit 34c8c05450.

Revert "different method first draft"

This reverts commit b91139055d.

* new func

* fixup test

* remove changes from v1 from fw_versions.py

* clean up a bit

* return part as part of code

* fix test

* add original fuzzy function

* add an ecu match if the date is within range (or date doesn't exist)

* add format for what we're going to do

* not working stash

* the exact matching function does more of what we want with less code and less custom logic

* we don't care about found versions, only codes and dates

* actually we do have an exception

* this works pretty nicely now

* up here

* this is better

* some minor clean up

* old function=now junk

* fix platform code test

* remove old platform code function

* now rename _new to

* use FW_QUERY_CONFIG

* clean up imports

* rename that too

* one line

* correct typing

correct typing

* draft tests

* so that works

* fixup excluded platform test now too

* this is tested by excluded platform test

* test parts and dates

* remove old comment

* old import

* take platform code stuff out of FwQueryConfig

* fix test

* revert debug script

* flip order

* make this a set by default

* revert this part

* correct typing

* clean up comments

* clean that test up too/pylint

* combine these three tests ina clean way

* not right

* more general

* be consistent with quotes

* comment

* comment

* comment in fw_versions

* flip order

* this is more readable

* could test all this, but it's tested in test_hyundai and doesn't do a lot here

* only assert brands which use this

* invalidate all CAN FD ICE and hybrid

* tuple

* can get away without filtering

* add comment reasons

* fix

* some review suggestions

* this works (first draft)

* this is better

* script to print platform codes and dates

* sanity check for dates are in correct ecus and platforms

* mypy

* better variable name and comment

* rename

* same

* slightly better name

* subset

* exclude platforms and live car without dates

* consistent

* self explan

* better name

* test to make sure the functions agree

* clean that up

* comment

* we get other responses from queries not in DB, only check any

* not used or typed
pull/28556/head
Shane Smiskol 2 years ago committed by GitHub
parent 2166adda5f
commit f788edb6a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      selfdrive/car/fw_query_definitions.py
  2. 5
      selfdrive/car/fw_versions.py
  3. 11
      selfdrive/car/hyundai/tests/print_platform_codes.py
  4. 68
      selfdrive/car/hyundai/tests/test_hyundai.py
  5. 90
      selfdrive/car/hyundai/values.py
  6. 24
      selfdrive/car/tests/test_fw_fingerprint.py

@ -3,7 +3,7 @@ import capnp
import copy import copy
from dataclasses import dataclass, field from dataclasses import dataclass, field
import struct import struct
from typing import Dict, List, Optional, Tuple from typing import Callable, Dict, List, Optional, Set, Tuple
import panda.python.uds as uds import panda.python.uds as uds
@ -74,6 +74,9 @@ class FwQueryConfig:
non_essential_ecus: Dict[capnp.lib.capnp._EnumModule, List[str]] = field(default_factory=dict) non_essential_ecus: Dict[capnp.lib.capnp._EnumModule, List[str]] = field(default_factory=dict)
# Ecus added for data collection, not to be fingerprinted on # Ecus added for data collection, not to be fingerprinted on
extra_ecus: List[Tuple[capnp.lib.capnp._EnumModule, int, Optional[int]]] = field(default_factory=list) extra_ecus: List[Tuple[capnp.lib.capnp._EnumModule, int, Optional[int]]] = field(default_factory=list)
# Function a brand can implement to provide better fuzzy matching. Takes in FW versions,
# returns set of candidates. Only will match if one candidate is returned
match_fw_to_car_fuzzy: Optional[Callable[[Dict[Tuple[int, Optional[int]], Set[bytes]]], Set[str]]] = None
def __post_init__(self): def __post_init__(self):
for i in range(len(self.requests)): for i in range(len(self.requests)):

@ -151,6 +151,11 @@ def match_fw_to_car(fw_versions, allow_exact=True, allow_fuzzy=True, log=True):
fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand) fw_versions_dict = build_fw_dict(fw_versions, filter_brand=brand)
matches |= match_func(fw_versions_dict, log=log) matches |= match_func(fw_versions_dict, log=log)
# If specified and no matches so far, fall back to brand's fuzzy fingerprinting function
config = FW_QUERY_CONFIGS[brand]
if not exact_match and not len(matches) and config.match_fw_to_car_fuzzy is not None:
matches |= config.match_fw_to_car_fuzzy(fw_versions_dict)
if len(matches): if len(matches):
return exact_match, matches return exact_match, matches

@ -13,14 +13,9 @@ if __name__ == "__main__":
if ecu[0] not in PLATFORM_CODE_ECUS: if ecu[0] not in PLATFORM_CODE_ECUS:
continue continue
codes = set() platform_codes = get_platform_codes(ecus[ecu])
dates = set() codes = {code for code, _ in platform_codes}
for fw in ecus[ecu]: dates = {date for _, date in platform_codes if date is not None}
code = list(get_platform_codes([fw]))[0]
codes.add(code.split(b"-")[0])
if b"-" in code:
dates.add(code.split(b"-")[1])
print(f' (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])}, {ecu[2]}):') print(f' (Ecu.{ECU_NAME[ecu[0]]}, {hex(ecu[1])}, {ecu[2]}):')
print(f' Codes: {codes}') print(f' Codes: {codes}')
print(f' Dates: {dates}') print(f' Dates: {dates}')

@ -2,9 +2,11 @@
import unittest import unittest
from cereal import car from cereal import car
from selfdrive.car.fw_versions import build_fw_dict
from selfdrive.car.hyundai.values import CAMERA_SCC_CAR, CANFD_CAR, CAN_GEARS, CAR, CHECKSUM, DATE_FW_ECUS, \ from selfdrive.car.hyundai.values import CAMERA_SCC_CAR, CANFD_CAR, CAN_GEARS, CAR, CHECKSUM, DATE_FW_ECUS, \
FW_QUERY_CONFIG, FW_VERSIONS, LEGACY_SAFETY_MODE_CAR, \ EV_CAR, FW_QUERY_CONFIG, FW_VERSIONS, LEGACY_SAFETY_MODE_CAR, \
PART_NUMBER_FW_PATTERN, PLATFORM_CODE_ECUS, get_platform_codes PLATFORM_CODE_ECUS, get_platform_codes
Ecu = car.CarParams.Ecu Ecu = car.CarParams.Ecu
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()}
@ -86,47 +88,75 @@ class TestHyundaiFingerprint(unittest.TestCase):
codes |= result codes |= result
if ecu[0] not in DATE_FW_ECUS or car_model in NO_DATES_PLATFORMS: if ecu[0] not in DATE_FW_ECUS or car_model in NO_DATES_PLATFORMS:
self.assertTrue(all({b"-" not in code for code in codes})) self.assertTrue(all({date is None for _, date in codes}))
else: else:
self.assertTrue(all({b"-" in code for code in codes})) self.assertTrue(all({date is not None for _, date in codes}))
if car_model == CAR.HYUNDAI_GENESIS: if car_model == CAR.HYUNDAI_GENESIS:
raise unittest.SkipTest("No part numbers for car model") raise unittest.SkipTest("No part numbers for car model")
# Hyundai places the ECU part number in their FW versions, assert all parsable # Hyundai places the ECU part number in their FW versions, assert all parsable
# Some examples of valid formats: b"56310-L0010", b"56310L0010", b"56310/M6300" # Some examples of valid formats: b"56310-L0010", b"56310L0010", b"56310/M6300"
for fw in fws: self.assertTrue(all({b"-" in code for code, _ in codes}),
match = PART_NUMBER_FW_PATTERN.search(fw) f"FW does not have part number: {fw}")
self.assertIsNotNone(match, fw)
def test_platform_codes_spot_check(self): def test_platform_codes_spot_check(self):
# Asserts basic platform code parsing behavior for a few cases # Asserts basic platform code parsing behavior for a few cases
codes = get_platform_codes([b"\xf1\x00DH LKAS 1.1 -150210"]) results = get_platform_codes([b"\xf1\x00DH LKAS 1.1 -150210"])
self.assertEqual(codes, {b"DH-1502"}) self.assertEqual(results, {(b"DH", b"150210")})
# Some cameras and all radars do not have dates # Some cameras and all radars do not have dates
codes = get_platform_codes([b"\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 "]) results = get_platform_codes([b"\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 "])
self.assertEqual(codes, {b"AEhe"}) self.assertEqual(results, {(b"AEhe-G2000", None)})
codes = get_platform_codes([b"\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 "]) results = get_platform_codes([b"\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 "])
self.assertEqual(codes, {b"CV1"}) self.assertEqual(results, {(b"CV1-CV000", None)})
codes = get_platform_codes([ results = get_platform_codes([
b"\xf1\x00DH LKAS 1.1 -150210", b"\xf1\x00DH LKAS 1.1 -150210",
b"\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 ", b"\xf1\x00AEhe SCC H-CUP 1.01 1.01 96400-G2000 ",
b"\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 ", b"\xf1\x00CV1_ RDR ----- 1.00 1.01 99110-CV000 ",
]) ])
self.assertEqual(codes, {b"DH-1502", b"AEhe", b"CV1"}) self.assertEqual(results, {(b"DH", b"150210"), (b"AEhe-G2000", None), (b"CV1-CV000", None)})
# Returned platform codes must inclusively contain start/end dates results = get_platform_codes([
codes = get_platform_codes([
b"\xf1\x00LX2 MFC AT USA LHD 1.00 1.07 99211-S8100 220222", b"\xf1\x00LX2 MFC AT USA LHD 1.00 1.07 99211-S8100 220222",
b"\xf1\x00LX2 MFC AT USA LHD 1.00 1.08 99211-S8100 211103", b"\xf1\x00LX2 MFC AT USA LHD 1.00 1.08 99211-S8100 211103",
b"\xf1\x00ON MFC AT USA LHD 1.00 1.01 99211-S9100 190405", b"\xf1\x00ON MFC AT USA LHD 1.00 1.01 99211-S9100 190405",
b"\xf1\x00ON MFC AT USA LHD 1.00 1.03 99211-S9100 190720", b"\xf1\x00ON MFC AT USA LHD 1.00 1.03 99211-S9100 190720",
]) ])
self.assertEqual(codes, {b"LX2-2111", b"LX2-2112", b"LX2-2201", b"LX2-2202", self.assertEqual(results, {(b"LX2-S8100", b"220222"), (b"LX2-S8100", b"211103"),
b"ON-1904", b"ON-1905", b"ON-1906", b"ON-1907"}) (b"ON-S9100", b"190405"), (b"ON-S9100", b"190720")})
def test_fuzzy_excluded_platforms(self):
# Asserts a list of platforms that will not fuzzy fingerprint with platform codes due to them being shared.
# This list can be shrunk as we combine platforms and detect features
excluded_platforms = {
CAR.GENESIS_G70, # shared platform code, part number, and date
CAR.GENESIS_G70_2020,
CAR.TUCSON_4TH_GEN, # shared platform code and part number
CAR.TUCSON_HYBRID_4TH_GEN,
}
excluded_platforms |= CANFD_CAR - EV_CAR # shared platform codes
excluded_platforms |= NO_DATES_PLATFORMS # date codes are required to match
platforms_with_shared_codes = set()
for platform, fw_by_addr in FW_VERSIONS.items():
car_fw = []
for ecu, fw_versions in fw_by_addr.items():
ecu_name, addr, sub_addr = ecu
for fw in fw_versions:
car_fw.append({"ecu": ecu_name, "fwVersion": fw, "address": addr,
"subAddress": 0 if sub_addr is None else sub_addr})
CP = car.CarParams.new_message(carFw=car_fw)
matches = FW_QUERY_CONFIG.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw))
if len(matches) == 1:
self.assertEqual(list(matches)[0], platform)
else:
platforms_with_shared_codes.add(platform)
self.assertEqual(platforms_with_shared_codes, excluded_platforms)
if __name__ == "__main__": if __name__ == "__main__":

@ -1,10 +1,7 @@
import re import re
from datetime import datetime
from dateutil import rrule
from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from enum import Enum, IntFlag from enum import Enum, IntFlag
from typing import DefaultDict, Dict, List, Optional, Set, Union, cast from typing import Dict, List, Optional, Set, Tuple, Union
from cereal import car from cereal import car
from panda.python import uds from panda.python import uds
@ -12,7 +9,6 @@ from common.conversions import Conversions as CV
from selfdrive.car import dbc_dict from selfdrive.car import dbc_dict
from selfdrive.car.docs_definitions import CarFootnote, CarHarness, CarInfo, CarParts, Column from selfdrive.car.docs_definitions import CarFootnote, CarHarness, CarInfo, CarParts, Column
from selfdrive.car.fw_query_definitions import FwQueryConfig, Request, p16 from selfdrive.car.fw_query_definitions import FwQueryConfig, Request, p16
from system.swaglog import cloudlog
Ecu = car.CarParams.Ecu Ecu = car.CarParams.Ecu
@ -348,35 +344,73 @@ FINGERPRINTS = {
} }
def get_platform_codes(fw_versions: List[bytes]) -> Set[bytes]: def get_platform_codes(fw_versions: List[bytes]) -> Set[Tuple[bytes, Optional[bytes]]]:
codes: DefaultDict[bytes, Set[Optional[bytes]]] = defaultdict(set) # Returns unique, platform-specific identification codes for a set of versions
codes = set() # (code-Optional[part], date)
for fw in fw_versions: for fw in fw_versions:
code_match, date_match = (PLATFORM_CODE_FW_PATTERN.search(fw), code_match = PLATFORM_CODE_FW_PATTERN.search(fw)
DATE_FW_PATTERN.search(fw)) part_match = PART_NUMBER_FW_PATTERN.search(fw)
date_match = DATE_FW_PATTERN.search(fw)
if code_match is not None: if code_match is not None:
code = code_match.group() code: bytes = code_match.group()
part = part_match.group() if part_match else None
date = date_match.group() if date_match else None date = date_match.group() if date_match else None
codes[code].add(date) if part is not None:
# part number starts with generic ECU part type, add what is specific to platform
code += b"-" + part[-5:]
# Create platform codes for all dates inclusive if ECU has FW dates codes.add((code, date))
final_codes = set() return codes
for code, dates in codes.items():
# Radar and some cameras don't have FW dates
if None in dates:
final_codes.add(code)
continue
try:
parsed = {datetime.strptime(cast(bytes, date).decode()[:4], '%y%m') for date in dates}
except ValueError:
cloudlog.exception(f'Error parsing date in FW versions: {code!r}, {dates}')
final_codes.add(code)
continue
for monthly in rrule.rrule(rrule.MONTHLY, dtstart=min(parsed), until=max(parsed)): def match_fw_to_car_fuzzy(live_fw_versions) -> Set[str]:
final_codes.add(code + b'-' + monthly.strftime('%y%m').encode()) # Non-electric CAN FD platforms often do not have platform code specifiers needed
# to distinguish between hybrid and ICE. All EVs so far are either exclusively
# electric or specify electric in the platform code.
fuzzy_platform_blacklist = set(CANFD_CAR - EV_CAR)
candidates = set()
return final_codes for candidate, fws in FW_VERSIONS.items():
# Keep track of ECUs which pass all checks (platform codes, within date range)
valid_found_ecus = set()
valid_expected_ecus = {ecu[1:] for ecu in fws if ecu[0] in PLATFORM_CODE_ECUS}
for ecu, expected_versions in fws.items():
addr = ecu[1:]
# Only check ECUs expected to have platform codes
if ecu[0] not in PLATFORM_CODE_ECUS:
continue
# Expected platform codes & dates
codes = get_platform_codes(expected_versions)
expected_platform_codes = {code for code, _ in codes}
expected_dates = {date for _, date in codes if date is not None}
# Found platform codes & dates
codes = get_platform_codes(live_fw_versions.get(addr, set()))
found_platform_codes = {code for code, _ in codes}
found_dates = {date for _, date in codes if date is not None}
# Check platform code + part number matches for any found versions
if not any(found_platform_code in expected_platform_codes for found_platform_code in found_platform_codes):
break
if ecu[0] in DATE_FW_ECUS:
# If ECU can have a FW date, require it to exist
# (this excludes candidates in the database without dates)
if not len(expected_dates) or not len(found_dates):
break
# Check any date within range in the database, format is %y%m%d
if not any(min(expected_dates) <= found_date <= max(expected_dates) for found_date in found_dates):
break
valid_found_ecus.add(addr)
# If all live ECUs pass all checks for candidate, add it as a match
if valid_expected_ecus.issubset(valid_found_ecus):
candidates.add(candidate)
return candidates - fuzzy_platform_blacklist
HYUNDAI_VERSION_REQUEST_LONG = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]) + \ HYUNDAI_VERSION_REQUEST_LONG = bytes([uds.SERVICE_TYPE.READ_DATA_BY_IDENTIFIER]) + \
@ -460,6 +494,8 @@ FW_QUERY_CONFIG = FwQueryConfig(
(Ecu.hvac, 0x7b3, None), # HVAC Control Assembly (Ecu.hvac, 0x7b3, None), # HVAC Control Assembly
(Ecu.cornerRadar, 0x7b7, None), (Ecu.cornerRadar, 0x7b7, None),
], ],
# Custom fuzzy fingerprinting function using platform codes, part numbers + FW dates:
match_fw_to_car_fuzzy=match_fw_to_car_fuzzy,
) )
FW_VERSIONS = { FW_VERSIONS = {

@ -10,7 +10,7 @@ from cereal import car
from common.params import Params from common.params import Params
from selfdrive.car.car_helpers import interfaces from selfdrive.car.car_helpers import interfaces
from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, match_fw_to_car, get_fw_versions from selfdrive.car.fw_versions import FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, match_fw_to_car, get_fw_versions
CarFw = car.CarParams.CarFw CarFw = car.CarParams.CarFw
Ecu = car.CarParams.Ecu Ecu = car.CarParams.Ecu
@ -45,6 +45,28 @@ class TestFwFingerprint(unittest.TestCase):
_, matches = match_fw_to_car(CP.carFw, allow_fuzzy=False) _, matches = match_fw_to_car(CP.carFw, allow_fuzzy=False)
self.assertFingerprints(matches, car_model) self.assertFingerprints(matches, car_model)
@parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
def test_custom_fuzzy_match(self, brand, car_model, ecus):
# Assert brand-specific fuzzy fingerprinting function doesn't disagree with standard fuzzy function
config = FW_QUERY_CONFIGS[brand]
if config.match_fw_to_car_fuzzy is None:
raise unittest.SkipTest("Brand does not implement custom fuzzy fingerprinting function")
CP = car.CarParams.new_message()
for _ in range(5):
fw = []
for ecu, fw_versions in ecus.items():
ecu_name, addr, sub_addr = ecu
fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions), 'brand': brand,
"address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
CP.carFw = fw
_, matches = match_fw_to_car(CP.carFw, allow_exact=False, log=False)
brand_matches = config.match_fw_to_car_fuzzy(build_fw_dict(CP.carFw))
# If both have matches, they must agree
if len(matches) == 1 and len(brand_matches) == 1:
self.assertEqual(matches, brand_matches)
@parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e]) @parameterized.expand([(b, c, e[c]) for b, e in VERSIONS.items() for c in e])
def test_fuzzy_match_ecu_count(self, brand, car_model, ecus): def test_fuzzy_match_ecu_count(self, brand, car_model, ecus):
# Asserts that fuzzy matching does not count matching FW, but ECU address keys # Asserts that fuzzy matching does not count matching FW, but ECU address keys

Loading…
Cancel
Save