From 2c84f4862da760388bf9b2fd3636908da1fbae6f Mon Sep 17 00:00:00 2001 From: Shane Smiskol Date: Tue, 6 Aug 2024 23:41:58 -0700 Subject: [PATCH] test_fw_fingerprint: remove pandad, less cereal --- .importlinter | 1 - selfdrive/car/tests/test_fw_fingerprint.py | 37 +++++++++++----------- 2 files changed, 18 insertions(+), 20 deletions(-) diff --git a/.importlinter b/.importlinter index 38b7448ffc..55eb894007 100644 --- a/.importlinter +++ b/.importlinter @@ -37,7 +37,6 @@ ignore_imports = openpilot.selfdrive.car.fw_versions -> openpilot.common.params openpilot.selfdrive.car.ecu_addrs -> openpilot.common.params # requires generic CAN send/receive functions - openpilot.selfdrive.car.tests.test_fw_fingerprint -> openpilot.selfdrive.pandad # these are okay openpilot.selfdrive.car.card -> openpilot.common.swaglog diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index f5283acb1f..48e0058dd0 100644 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -3,15 +3,15 @@ import random import time from collections import defaultdict from parameterized import parameterized +from types import SimpleNamespace from cereal import car -from openpilot.selfdrive.car import make_can_msg +from openpilot.selfdrive.car.can_definitions import CanData from openpilot.selfdrive.car.car_helpers import interfaces from openpilot.selfdrive.car.fingerprints import FW_VERSIONS from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_fw_versions_ordered, get_present_ecus from openpilot.selfdrive.car.vin import get_vin -from openpilot.selfdrive.pandad import can_list_to_can_capnp CarFw = car.CarParams.CarFw Ecu = car.CarParams.Ecu @@ -19,15 +19,6 @@ Ecu = car.CarParams.Ecu ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} -class FakeSocket: - def receive(self, non_blocking=False): - return (can_list_to_can_capnp([make_can_msg(random.randint(0x600, 0x800), b'\x00' * 8, 0)]) - if random.uniform(0, 1) > 0.5 else None) - - def send(self, msg): - pass - - class TestFwFingerprint: def assertFingerprints(self, candidates, expected): candidates = list(candidates) @@ -206,6 +197,11 @@ class TestFwFingerprint: assert get_brand_ecu_matches({(0x758, 0xf, 99)}) == expected_response +def fake_can_drain(wait_for_one: bool = False) -> list[list[CanData]]: + return ([[CanData(random.randint(0x600, 0x800), b'\x00' * 8, 0)]] + if random.uniform(0, 1) > 0.5 else []) + + class TestFwFingerprintTiming: N: int = 5 TOL: float = 0.05 @@ -214,6 +210,12 @@ class TestFwFingerprintTiming: current_obd_multiplexing: bool total_time: float + fake_logcan = SimpleNamespace(drain=fake_can_drain) + + @staticmethod + def fake_can_send(msgs): + pass + def fake_set_obd_multiplexing(self, obd_multiplexing): """The 10Hz blocking params loop adds on average 50ms to the query time for each OBD multiplexing change""" if obd_multiplexing != self.current_obd_multiplexing: @@ -225,7 +227,6 @@ class TestFwFingerprintTiming: return {} def _benchmark_brand(self, brand, num_pandas, mocker): - fake_socket = FakeSocket() self.total_time = 0 mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data) for _ in range(self.N): @@ -233,7 +234,7 @@ class TestFwFingerprintTiming: self.current_obd_multiplexing = True t = time.perf_counter() - get_fw_versions(fake_socket, fake_socket, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas) + get_fw_versions(self.fake_logcan, self.fake_can_send, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas) self.total_time += time.perf_counter() - t return self.total_time / self.N @@ -251,12 +252,11 @@ class TestFwFingerprintTiming: self.total_time += timeout return set() - fake_socket = FakeSocket() self.total_time = 0.0 mocker.patch("openpilot.selfdrive.car.fw_versions.get_ecu_addrs", fake_get_ecu_addrs) for _ in range(self.N): self.current_obd_multiplexing = True - get_present_ecus(fake_socket, fake_socket, self.fake_set_obd_multiplexing, num_pandas=2) + get_present_ecus(self.fake_logcan, self.fake_can_send, self.fake_set_obd_multiplexing, num_pandas=2) self._assert_timing(self.total_time / self.N, present_ecu_ref_time) print(f'get_present_ecus, query time={self.total_time / self.N} seconds') @@ -265,7 +265,7 @@ class TestFwFingerprintTiming: self.total_time = 0.0 mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data) for _ in range(self.N): - get_vin(fake_socket, fake_socket, (0, 1), **args) + get_vin(self.fake_logcan, self.fake_can_send, (0, 1), **args) self._assert_timing(self.total_time / self.N, vin_ref_times[name]) print(f'get_vin {name} case, query time={self.total_time / self.N} seconds') @@ -324,8 +324,7 @@ class TestFwFingerprintTiming: raise mocker.patch("openpilot.selfdrive.car.carlog.exception", fake_carlog_exception) - fake_socket = FakeSocket() - get_fw_versions_ordered(fake_socket, fake_socket, lambda obd: None, '0' * 17, set()) + get_fw_versions_ordered(self.fake_logcan, self.fake_can_send, lambda obd: None, '0' * 17, set()) for brand in FW_QUERY_CONFIGS.keys(): with subtests.test(brand=brand): - get_fw_versions(fake_socket, fake_socket, lambda obd: None, brand) + get_fw_versions(self.fake_logcan, self.fake_can_send, lambda obd: None, brand)