From 41f9767370c1525356cdf933232f7ac1da61dcfc Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Fri, 3 Jun 2022 20:16:11 -0700 Subject: [PATCH] FPv2: add ECU whitelisting support (#24730) * reduce fw queries for hyundai * no whitelisting yet * formatting * fix * clean up test * clean up variables, comments, error * update error old-commit-hash: 29a1bf5a4667de9de74e977956145132b2301fd7 --- selfdrive/car/fw_versions.py | 7 +++++-- selfdrive/car/tests/test_fw_fingerprint.py | 21 +++++++++++++++++++-- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/selfdrive/car/fw_versions.py b/selfdrive/car/fw_versions.py index 272928248c..69b7c5f452 100755 --- a/selfdrive/car/fw_versions.py +++ b/selfdrive/car/fw_versions.py @@ -2,7 +2,7 @@ import struct import traceback from collections import defaultdict -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, List from tqdm import tqdm @@ -96,9 +96,11 @@ class Request: brand: str request: List[bytes] response: List[bytes] + whitelist_ecus: List[int] = field(default_factory=list) rx_offset: int = DEFAULT_RX_OFFSET bus: int = 1 + REQUESTS: List[Request] = [ # Subaru Request( @@ -328,7 +330,8 @@ def get_fw_versions(logcan, sendcan, extra=None, timeout=0.1, debug=False, progr for addr_chunk in chunks(addr): for r in REQUESTS: try: - addrs = [(a, s) for (b, a, s) in addr_chunk if b in (r.brand, 'any')] + addrs = [(a, s) for (b, a, s) in addr_chunk if b in (r.brand, 'any') and + (len(r.whitelist_ecus) == 0 or ecu_types[(a, s)] in r.whitelist_ecus)] if addrs: query = IsoTpParallelQuery(sendcan, logcan, r.bus, addrs, r.request, r.response, r.rx_offset, debug=debug) diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index ae209b2910..ed7e420e1a 100755 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -4,9 +4,9 @@ import unittest from parameterized import parameterized from cereal import car -from selfdrive.car.car_helpers import interfaces +from selfdrive.car.car_helpers import get_interface_attr, interfaces from selfdrive.car.fingerprints import FW_VERSIONS -from selfdrive.car.fw_versions import match_fw_to_car +from selfdrive.car.fw_versions import REQUESTS, match_fw_to_car CarFw = car.CarParams.CarFw Ecu = car.CarParams.Ecu @@ -57,6 +57,23 @@ class TestFwFingerprint(unittest.TestCase): self.assertTrue(passed, "Blacklisted FW versions found") + def test_fw_request_ecu_whitelist(self): + passed = True + brands = set(r.brand for r in REQUESTS) + versions = get_interface_attr('FW_VERSIONS') + for brand in brands: + whitelisted_ecus = [ecu for r in REQUESTS for ecu in r.whitelist_ecus if r.brand == brand] + brand_ecus = set([fw[0] for car_fw in versions[brand].values() for fw in car_fw]) + + # each ecu in brand's fw versions needs to be whitelisted at least once + ecus_not_whitelisted = set(brand_ecus) - set(whitelisted_ecus) + if len(whitelisted_ecus) and len(ecus_not_whitelisted): + ecu_strings = ", ".join([f'Ecu.{ECU_NAME[ecu]}' for ecu in ecus_not_whitelisted]) + print(f'{brand.title()}: FW query whitelist missing ecus: {ecu_strings}') + passed = False + + self.assertTrue(passed, "Not all ecus in FW versions found in query whitelists") + if __name__ == "__main__": unittest.main()