FPv2: add ECU whitelisting support (#24730)

* reduce fw queries for hyundai

* no whitelisting yet

* formatting

* fix

* clean up test

* clean up variables, comments, error

* update error
old-commit-hash: 29a1bf5a46
taco
Shane Smiskol 3 years ago committed by GitHub
parent f21897ee14
commit 41f9767370
  1. 7
      selfdrive/car/fw_versions.py
  2. 21
      selfdrive/car/tests/test_fw_fingerprint.py

@ -2,7 +2,7 @@
import struct import struct
import traceback import traceback
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass, field
from typing import Any, List from typing import Any, List
from tqdm import tqdm from tqdm import tqdm
@ -96,9 +96,11 @@ class Request:
brand: str brand: str
request: List[bytes] request: List[bytes]
response: List[bytes] response: List[bytes]
whitelist_ecus: List[int] = field(default_factory=list)
rx_offset: int = DEFAULT_RX_OFFSET rx_offset: int = DEFAULT_RX_OFFSET
bus: int = 1 bus: int = 1
REQUESTS: List[Request] = [ REQUESTS: List[Request] = [
# Subaru # Subaru
Request( Request(
@ -328,7 +330,8 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr
for addr_chunk in chunks(addr): for addr_chunk in chunks(addr):
for r in REQUESTS: for r in REQUESTS:
try: try:
addrs = [(a, s) for (b, a, s) in addr_chunk if b in (r.brand, 'any')] addrs = [(a, s) for (b, a, s) in addr_chunk if b in (r.brand, 'any') and
(len(r.whitelist_ecus) == 0 or ecu_types[(a, s)] in r.whitelist_ecus)]
if addrs: if addrs:
query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug) query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug)

@ -4,9 +4,9 @@ import unittest
from parameterized import parameterized from parameterized import parameterized
from cereal import car from cereal import car
from selfdrive.car.car_helpers import interfaces from selfdrive.car.car_helpers import get_interface_attr, interfaces
from selfdrive.car.fingerprints import FW_VERSIONS from selfdrive.car.fingerprints import FW_VERSIONS
from selfdrive.car.fw_versions import match_fw_to_car from selfdrive.car.fw_versions import REQUESTS, match_fw_to_car
CarFw = car.CarParams.CarFw CarFw = car.CarParams.CarFw
Ecu = car.CarParams.Ecu Ecu = car.CarParams.Ecu
@ -57,6 +57,23 @@ class TestFwFingerprint(unittest.TestCase):
self.assertTrue(passed, "Blacklisted FW versions found") self.assertTrue(passed, "Blacklisted FW versions found")
def test_fw_request_ecu_whitelist(self):
passed = True
brands = set(r.brand for r in REQUESTS)
versions = get_interface_attr('FW_VERSIONS')
for brand in brands:
whitelisted_ecus = [ecu for r in REQUESTS for ecu in r.whitelist_ecus if r.brand == brand]
brand_ecus = set([fw[0] for car_fw in versions[brand].values() for fw in car_fw])
# each ecu in brand's fw versions needs to be whitelisted at least once
ecus_not_whitelisted = set(brand_ecus) - set(whitelisted_ecus)
if len(whitelisted_ecus) and len(ecus_not_whitelisted):
ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted])
print(f'{brand.title()}: FW query whitelist missing ecus: {ecu_strings}')
passed = False
self.assertTrue(passed, "Not all ecus in FW versions found in query whitelists")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

Loading…
Cancel
Save