Fuzzy match on ECU FW versions (#20687)

* Fuzzy match on 3+ ECUs

* reuse dict

* exclude some shared ecus to be sure

* show alert for fuzzy match

* use title case

* require community toggle

* refactor

* do both exact and fuzzy in test script

* update test script

* add fuzz test and lower matches to >= 2

* strip alert length

* sort mismatches

* add fw tests to test_startup

* bump cereal
old-commit-hash: e4f73fbda5
vw-mqb-aeb
Willem Melching 5 years ago committed by GitHub
parent c0ac9bb63c
commit ee0e80a6d4
  1. 2
      cereal
  2. 15
      selfdrive/car/car_helpers.py
  3. 82
      selfdrive/car/fw_versions.py
  4. 3
      selfdrive/car/tests/test_fw_fingerprint.py
  5. 7
      selfdrive/controls/controlsd.py
  6. 11
      selfdrive/controls/lib/events.py
  7. 63
      selfdrive/controls/tests/test_startup.py
  8. 52
      selfdrive/debug/internal/fuzz_fw_fingerprint.py
  9. 77
      selfdrive/debug/test_fw_query_on_routes.py

@ -1 +1 @@
Subproject commit 957147cb8428d984a776ca468f866160f3b71bbc
Subproject commit b39c6fc26d93ca776b27a2e4005b12ae85e7bacc

@ -13,7 +13,7 @@ from cereal import car
EventName = car.CarEvent.EventName
def get_startup_event(car_recognized, controller_available):
def get_startup_event(car_recognized, controller_available, fuzzy_fingerprint):
if comma_remote and tested_branch:
event = EventName.startup
else:
@ -23,6 +23,8 @@ def get_startup_event(car_recognized, controller_available):
event = EventName.startupNoCar
elif car_recognized and not controller_available:
event = EventName.startupNoControl
elif car_recognized and fuzzy_fingerprint:
event = EventName.startupFuzzyFingerprint
return event
@ -104,10 +106,10 @@ def fingerprint(logcan, sendcan):
_, vin = get_vin(logcan, sendcan, bus)
car_fw = get_fw_versions(logcan, sendcan, bus)
fw_candidates = match_fw_to_car(car_fw)
exact_fw_match, fw_candidates = match_fw_to_car(car_fw)
else:
vin = VIN_UNKNOWN
fw_candidates, car_fw = set(), []
exact_fw_match, fw_candidates, car_fw = True, set(), []
cloudlog.warning("VIN %s", vin)
Params().put("CarVin", vin)
@ -152,23 +154,25 @@ def fingerprint(logcan, sendcan):
frame += 1
exact_match = True
source = car.CarParams.FingerprintSource.can
# If FW query returns exactly 1 candidate, use it
if len(fw_candidates) == 1:
car_fingerprint = list(fw_candidates)[0]
source = car.CarParams.FingerprintSource.fw
exact_match = exact_fw_match
if fixed_fingerprint:
car_fingerprint = fixed_fingerprint
source = car.CarParams.FingerprintSource.fixed
cloudlog.warning("fingerprinted %s", car_fingerprint)
return car_fingerprint, finger, vin, car_fw, source
return car_fingerprint, finger, vin, car_fw, source, exact_match
def get_car(logcan, sendcan):
candidate, fingerprints, vin, car_fw, source = fingerprint(logcan, sendcan)
candidate, fingerprints, vin, car_fw, source, exact_match = fingerprint(logcan, sendcan)
if candidate is None:
cloudlog.warning("car doesn't match any fingerprints: %r", fingerprints)
@ -179,5 +183,6 @@ def get_car(logcan, sendcan):
car_params.carVin = vin
car_params.carFw = car_fw
car_params.fingerprintSource = source
car_params.fuzzyFingerprint = not exact_match
return CarInterface(car_params, CarController, CarState), car_params

@ -2,6 +2,7 @@
import struct
import traceback
from typing import Any
from collections import defaultdict
from tqdm import tqdm
@ -136,15 +137,67 @@ def chunks(l, n=128):
yield l[i:i + n]
def match_fw_to_car(fw_versions):
candidates = FW_VERSIONS
invalid = []
def build_fw_dict(fw_versions):
fw_versions_dict = {}
for fw in fw_versions:
addr = fw.address
sub_addr = fw.subAddress if fw.subAddress != 0 else None
fw_versions_dict[(addr, sub_addr)] = fw.fwVersion
return fw_versions_dict
def match_fw_to_car_fuzzy(fw_versions_dict, log=True, exclude=None):
"""Do a fuzzy FW match. This function will return a match, and the number of firmware version
that were matched uniquely to that specific car. If multiple ECUs uniquely match to different cars
the match is rejected."""
# These ECUs are known to be shared between models (EPS only between hybrid/ICE version)
# Getting this exactly right isn't crucial, but excluding camera and radar makes it almost
# impossible to get 3 matching versions, even if two models with shared parts are released at the same
# time and only one is in our database.
exclude_types = [Ecu.fwdCamera, Ecu.fwdRadar, Ecu.eps]
# Build lookup table from (addr, subaddr, fw) to list of candidate cars
all_fw_versions = defaultdict(list)
for candidate, fw_by_addr in FW_VERSIONS.items():
if candidate == exclude:
continue
for addr, fws in fw_by_addr.items():
if addr[0] in exclude_types:
continue
for f in fws:
all_fw_versions[(addr[1], addr[2], f)].append(candidate)
match_count = 0
candidate = None
for addr, version in fw_versions_dict.items():
# All cars that have this FW response on the specified address
candidates = all_fw_versions[(addr[0], addr[1], version)]
if len(candidates) == 1:
match_count += 1
if candidate is None:
candidate = candidates[0]
# We uniquely matched two different cars. No fuzzy match possible
elif candidate != candidates[0]:
return set()
if match_count >= 2:
if log:
cloudlog.error(f"Fingerprinted {candidate} using fuzzy match. {match_count} matching ECUs")
return set([candidate])
else:
return set()
def match_fw_to_car_exact(fw_versions_dict):
"""Do an exact FW match. Returns all cars that match the given
FW versions for a list of "essential" ECUs. If an ECU is not considered
essential the FW version can be missing to get a fingerprint, but if it's present it
needs to match the database."""
invalid = []
candidates = FW_VERSIONS
for candidate, fws in candidates.items():
for ecu, expected_versions in fws.items():
@ -155,11 +208,11 @@ def match_fw_to_car(fw_versions):
if ecu_type == Ecu.esp and candidate in [TOYOTA.RAV4, TOYOTA.COROLLA, TOYOTA.HIGHLANDER] and found_version is None:
continue
# TODO: on some toyota, the engine can show on two different addresses
# On some Toyota models, the engine can show on two different addresses
if ecu_type == Ecu.engine and candidate in [TOYOTA.COROLLA_TSS2, TOYOTA.CHR, TOYOTA.LEXUS_IS, TOYOTA.AVALON] and found_version is None:
continue
# ignore non essential ecus
# Ignore non essential ecus
if ecu_type not in ESSENTIAL_ECUS and found_version is None:
continue
@ -170,6 +223,21 @@ def match_fw_to_car(fw_versions):
return set(candidates.keys()) - set(invalid)
def match_fw_to_car(fw_versions, allow_fuzzy=True):
fw_versions_dict = build_fw_dict(fw_versions)
matches = match_fw_to_car_exact(fw_versions_dict)
exact_match = True
if allow_fuzzy and len(matches) == 0:
matches = match_fw_to_car_fuzzy(fw_versions_dict)
# Fuzzy match found
if len(matches) == 1:
exact_match = False
return exact_match, matches
def get_fw_versions(logcan, sendcan, bus, extra=None, timeout=0.1, debug=False, progress=False):
ecu_types = {}
@ -264,7 +332,7 @@ if __name__ == "__main__":
t = time.time()
fw_vers = get_fw_versions(logcan, sendcan, 1, extra=extra, debug=args.debug, progress=True)
candidates = match_fw_to_car(fw_vers)
_, candidates = match_fw_to_car(fw_vers)
print()
print("Found FW versions")

@ -28,7 +28,8 @@ class TestFwFingerprint(unittest.TestCase):
fw.append({"ecu": ecu_name, "fwVersion": random.choice(fw_versions),
"address": addr, "subAddress": 0 if sub_addr is None else sub_addr})
CP.carFw = fw
self.assertFingerprints(match_fw_to_car(CP.carFw), car_model)
_, matches = match_fw_to_car(CP.carFw)
self.assertFingerprints(matches, car_model)
def test_no_duplicate_fw_versions(self):
passed = True

@ -84,9 +84,12 @@ class Controls:
sounds_available = HARDWARE.get_sound_card_online()
car_recognized = self.CP.carName != 'mock'
fuzzy_fingerprint = self.CP.fuzzyFingerprint
# If stock camera is disconnected, we loaded car controls and it's not dashcam mode
controller_available = self.CP.enableCamera and self.CI.CC is not None and not passive and not self.CP.dashcamOnly
community_feature_disallowed = self.CP.communityFeature and not community_feature_toggle
community_feature = self.CP.communityFeature or fuzzy_fingerprint
community_feature_disallowed = community_feature and (not community_feature_toggle)
self.read_only = not car_recognized or not controller_available or \
self.CP.dashcamOnly or community_feature_disallowed
if self.read_only:
@ -137,7 +140,7 @@ class Controls:
self.sm['driverMonitoringState'].faceDetected = False
self.sm['liveParameters'].valid = True
self.startup_event = get_startup_event(car_recognized, controller_available)
self.startup_event = get_startup_event(car_recognized, controller_available, fuzzy_fingerprint)
if not sounds_available:
self.events.add(EventName.soundsUnavailable, static=True)

@ -208,6 +208,13 @@ def wrong_car_mode_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: boo
text = "Main Switch Off"
return NoEntryAlert(text, duration_hud_alert=0.)
def startup_fuzzy_fingerprint_alert(CP: car.CarParams, sm: messaging.SubMaster, metric: bool) -> Alert:
return Alert(
"WARNING: No Exact Match on Car Model",
f"Closest Match: {CP.carFingerprint.title()[:40]}",
AlertStatus.userPrompt, AlertSize.mid,
Priority.LOWER, VisualAlert.none, AudibleAlert.none, 0., 0., 15.)
EVENTS: Dict[int, Dict[str, Union[Alert, Callable[[Any, messaging.SubMaster, bool], Alert]]]] = {
# ********** events with no alerts **********
@ -253,6 +260,10 @@ EVENTS: Dict[int, Dict[str, Union[Alert, Callable[[Any, messaging.SubMaster, boo
Priority.LOWER, VisualAlert.none, AudibleAlert.none, 0., 0., 15.),
},
EventName.startupFuzzyFingerprint: {
ET.PERMANENT: startup_fuzzy_fingerprint_alert,
},
EventName.dashcamMode: {
ET.PERMANENT: Alert(
"Dashcam Mode",

@ -9,11 +9,23 @@ from common.params import Params
from selfdrive.boardd.boardd_api_impl import can_list_to_can_capnp # pylint: disable=no-name-in-module,import-error
from selfdrive.car.fingerprints import _FINGERPRINTS
from selfdrive.car.hyundai.values import CAR as HYUNDAI
from selfdrive.car.toyota.values import CAR as TOYOTA
from selfdrive.car.mazda.values import CAR as MAZDA
from selfdrive.controls.lib.events import EVENT_NAME
from selfdrive.test.helpers import with_processes
EventName = car.CarEvent.EventName
Ecu = car.CarParams.Ecu
COROLLA_TSS2_FW_VERSIONS = [
(Ecu.engine, 0x700, None, b'\x01896630ZG5000\x00\x00\x00\x00'),
(Ecu.eps, 0x7a1, None, b'\x018965B1255000\x00\x00\x00\x00'),
(Ecu.esp, 0x7b0, None, b'\x01F152602280\x00\x00\x00\x00\x00\x00'),
(Ecu.fwdRadar, 0x750, 0xf, b'\x018821F3301100\x00\x00\x00\x00'),
(Ecu.fwdCamera, 0x750, 0x6d, b'\x028646F12010D0\x00\x00\x00\x008646G26011A0\x00\x00\x00\x00'),
]
COROLLA_TSS2_FW_VERSIONS_FUZZY = COROLLA_TSS2_FW_VERSIONS[:-1] + [(Ecu.fwdCamera, 0x750, 0x6d, b'xxxxxx')]
class TestStartup(unittest.TestCase):
@ -21,23 +33,30 @@ class TestStartup(unittest.TestCase):
# TODO: test EventName.startup for release branches
# officially supported car
(EventName.startupMaster, HYUNDAI.SONATA, False),
(EventName.startupMaster, HYUNDAI.SONATA, True),
(EventName.startupMaster, HYUNDAI.SONATA, False, None),
(EventName.startupMaster, HYUNDAI.SONATA, True, None),
# offically supported car, FW query
(EventName.startupMaster, TOYOTA.COROLLA_TSS2, False, COROLLA_TSS2_FW_VERSIONS),
# community supported car
(EventName.startupMaster, HYUNDAI.KIA_STINGER, True),
(EventName.communityFeatureDisallowed, HYUNDAI.KIA_STINGER, False),
(EventName.startupMaster, HYUNDAI.KIA_STINGER, True, None),
(EventName.communityFeatureDisallowed, HYUNDAI.KIA_STINGER, False, None),
# dashcamOnly car
(EventName.startupNoControl, MAZDA.CX5, True),
(EventName.startupNoControl, MAZDA.CX5, False),
(EventName.startupNoControl, MAZDA.CX5, True, None),
(EventName.startupNoControl, MAZDA.CX5, False, None),
# unrecognized car
(EventName.startupNoCar, None, True),
(EventName.startupNoCar, None, False),
(EventName.startupNoCar, None, True, None),
(EventName.startupNoCar, None, False, None),
# fuzzy match
(EventName.startupFuzzyFingerprint, TOYOTA.COROLLA_TSS2, True, COROLLA_TSS2_FW_VERSIONS_FUZZY),
(EventName.communityFeatureDisallowed, TOYOTA.COROLLA_TSS2, False, COROLLA_TSS2_FW_VERSIONS_FUZZY),
])
@with_processes(['controlsd'])
def test_startup_alert(self, expected_event, car, toggle_enabled):
def test_startup_alert(self, expected_event, car_model, toggle_enabled, fw_versions):
# TODO: this should be done without any real sockets
controls_sock = messaging.sub_sock("controlsState")
@ -49,6 +68,24 @@ class TestStartup(unittest.TestCase):
params.put_bool("OpenpilotEnabledToggle", True)
params.put_bool("CommunityFeaturesToggle", toggle_enabled)
# Build capnn version of FW array
if fw_versions is not None:
car_fw = []
cp = car.CarParams.new_message()
for ecu, addr, subaddress, version in fw_versions:
f = car.CarParams.CarFw.new_message()
f.ecu = ecu
f.address = addr
f.fwVersion = version
if subaddress is not None:
f.subAddress = subaddress
car_fw.append(f)
cp.carVin = "1" * 17
cp.carFw = car_fw
params.put("CarParamsCache", cp.to_bytes())
time.sleep(2) # wait for controlsd to be ready
msg = messaging.new_message('pandaState')
@ -56,10 +93,10 @@ class TestStartup(unittest.TestCase):
pm.send('pandaState', msg)
# fingerprint
if car is None:
if (car_model is None) or (fw_versions is not None):
finger = {addr: 1 for addr in range(1, 100)}
else:
finger = _FINGERPRINTS[car][0]
finger = _FINGERPRINTS[car_model][0]
for _ in range(500):
msgs = [[addr, 0, b'\x00'*length, 0] for addr, length in finger.items()]
@ -70,10 +107,10 @@ class TestStartup(unittest.TestCase):
if len(msgs):
event_name = msgs[0].controlsState.alertType.split("/")[0]
self.assertEqual(EVENT_NAME[expected_event], event_name,
f"expected {EVENT_NAME[expected_event]} for '{car}', got {event_name}")
f"expected {EVENT_NAME[expected_event]} for '{car_model}', got {event_name}")
break
else:
self.fail(f"failed to fingerprint {car}")
self.fail(f"failed to fingerprint {car_model}")
if __name__ == "__main__":
unittest.main()

@ -0,0 +1,52 @@
#!/usr/bin/env python3
# type: ignore
import random
from collections import defaultdict
from tqdm import tqdm
from selfdrive.car.fw_versions import match_fw_to_car_fuzzy
from selfdrive.car.toyota.values import FW_VERSIONS as TOYOTA_FW_VERSIONS
from selfdrive.car.honda.values import FW_VERSIONS as HONDA_FW_VERSIONS
from selfdrive.car.hyundai.values import FW_VERSIONS as HYUNDAI_FW_VERSIONS
from selfdrive.car.volkswagen.values import FW_VERSIONS as VW_FW_VERSIONS
FWS = {}
FWS.update(TOYOTA_FW_VERSIONS)
FWS.update(HONDA_FW_VERSIONS)
FWS.update(HYUNDAI_FW_VERSIONS)
FWS.update(VW_FW_VERSIONS)
if __name__ == "__main__":
total = 0
match = 0
wrong_match = 0
confusions = defaultdict(set)
for _ in tqdm(range(1000)):
for candidate, fws in FWS.items():
fw_dict = {}
for (tp, addr, subaddr), fw_list in fws.items():
fw_dict[(addr, subaddr)] = random.choice(fw_list)
matches = match_fw_to_car_fuzzy(fw_dict, log=False, exclude=candidate)
total += 1
if len(matches) == 1:
if list(matches)[0] == candidate:
match += 1
else:
confusions[candidate] |= matches
wrong_match += 1
print()
for candidate, wrong_matches in sorted(confusions.items()):
print(candidate, wrong_matches)
print()
print(f"Total fuzz cases: {total}")
print(f"Correct matches: {match}")
print(f"Wrong matches: {wrong_match}")

@ -7,7 +7,8 @@ import os
import traceback
from tqdm import tqdm
from tools.lib.logreader import LogReader
from selfdrive.car.fw_versions import match_fw_to_car
from tools.lib.route import Route
from selfdrive.car.fw_versions import match_fw_to_car_exact, match_fw_to_car_fuzzy, build_fw_dict
from selfdrive.car.toyota.values import FW_VERSIONS as TOYOTA_FW_VERSIONS
from selfdrive.car.honda.values import FW_VERSIONS as HONDA_FW_VERSIONS
from selfdrive.car.hyundai.values import FW_VERSIONS as HYUNDAI_FW_VERSIONS
@ -18,6 +19,7 @@ from selfdrive.car.honda.values import FINGERPRINTS as HONDA_FINGERPRINTS
from selfdrive.car.hyundai.values import FINGERPRINTS as HYUNDAI_FINGERPRINTS
from selfdrive.car.volkswagen.values import FINGERPRINTS as VW_FINGERPRINTS
NO_API = "NO_API" in os.environ
SUPPORTED_CARS = list(TOYOTA_FINGERPRINTS.keys()) + list(HONDA_FINGERPRINTS.keys()) + list(HYUNDAI_FINGERPRINTS.keys())+ list(VW_FINGERPRINTS.keys())
if __name__ == "__main__":
@ -33,25 +35,37 @@ if __name__ == "__main__":
mismatches = defaultdict(list)
wrong = 0
good = 0
not_fingerprinted = 0
solved_by_fuzzy = 0
good_exact = 0
wrong_fuzzy = 0
good_fuzzy = 0
dongles = []
for route in tqdm(routes):
route = route.rstrip()
dongle_id, time = route.split('|')
qlog_path = f"cd:/{dongle_id}/{time}/0/qlog.bz2"
if dongle_id in dongles:
continue
if NO_API:
qlog_path = f"cd:/{dongle_id}/{time}/0/qlog.bz2"
else:
route = Route(route)
qlog_path = route.qlog_paths()[0]
if qlog_path is None:
continue
try:
lr = LogReader(qlog_path)
dongles.append(dongle_id)
for msg in lr:
if msg.which() == "pandaState":
if msg.pandaState.pandaType not in ['uno', 'blackPanda']:
dongles.append(dongle_id)
if msg.pandaState.pandaType not in ['uno', 'blackPanda', 'dos']:
break
elif msg.which() == "carParams":
@ -61,7 +75,6 @@ if __name__ == "__main__":
if len(car_fw) == 0:
break
dongles.append(dongle_id)
live_fingerprint = msg.carParams.carFingerprint
if args.car is not None:
@ -70,15 +83,28 @@ if __name__ == "__main__":
if live_fingerprint not in SUPPORTED_CARS:
break
candidates = match_fw_to_car(car_fw)
if (len(candidates) == 1) and (list(candidates)[0] == live_fingerprint):
good += 1
print("Correct", live_fingerprint, dongle_id)
fw_versions_dict = build_fw_dict(car_fw)
exact_matches = match_fw_to_car_exact(fw_versions_dict)
fuzzy_matches = match_fw_to_car_fuzzy(fw_versions_dict)
if (len(exact_matches) == 1) and (list(exact_matches)[0] == live_fingerprint):
good_exact += 1
print(f"Correct! Live: {live_fingerprint} - Fuzzy: {fuzzy_matches}")
# Check if fuzzy match was correct
if len(fuzzy_matches) == 1:
if list(fuzzy_matches)[0] != live_fingerprint:
wrong_fuzzy += 1
print(f"{dongle_id}|{time}")
print("Fuzzy match wrong! Fuzzy:", fuzzy_matches, "Live:", live_fingerprint)
else:
good_fuzzy += 1
break
print(f"{dongle_id}|{time}")
print("Old style:", live_fingerprint, "Vin", msg.carParams.carVin)
print("New style:", candidates)
print("New style (exact):", exact_matches)
print("New style (fuzzy):", fuzzy_matches)
for version in car_fw:
subaddr = None if version.subAddress == 0 else hex(version.subAddress)
@ -114,19 +140,24 @@ if __name__ == "__main__":
mismatches[live_fingerprint].append(mismatch)
print()
wrong += 1
not_fingerprinted += 1
if len(fuzzy_matches) == 1:
if list(fuzzy_matches)[0] == live_fingerprint:
solved_by_fuzzy += 1
else:
wrong_fuzzy += 1
print("Fuzzy match wrong! Fuzzy:", fuzzy_matches, "Live:", live_fingerprint)
break
except Exception:
traceback.print_exc()
except KeyboardInterrupt:
break
print(f"Fingerprinted: {good} - Not fingerprinted: {wrong}")
print(f"Number of dongle ids checked: {len(dongles)}")
print()
# Print FW versions that need to be added seperated out by car and address
for car, m in mismatches.items():
for car, m in sorted(mismatches.items()):
print(car)
addrs = defaultdict(list)
for (addr, sub_addr, version) in m:
@ -138,3 +169,15 @@ if __name__ == "__main__":
print(f" {v},")
print(" ]")
print()
print()
print(f"Number of dongle ids checked: {len(dongles)}")
print(f"Fingerprinted: {good_exact}")
print(f"Not fingerprinted: {not_fingerprinted}")
print(f" of which had a fuzzy match: {solved_by_fuzzy}")
print()
print(f"Correct fuzzy matches: {good_fuzzy}")
print(f"Wrong fuzzy matches: {wrong_fuzzy}")
print()

Loading…
Cancel
Save