diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index 96e10f1fe6..130f0783a9 100755 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -1,13 +1,16 @@ #!/usr/bin/env python3 import random +import time import unittest from collections import defaultdict from parameterized import parameterized +import threading from cereal import car +from common.params import Params from selfdrive.car.car_helpers import interfaces from selfdrive.car.fingerprints import FW_VERSIONS -from selfdrive.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS, match_fw_to_car +from selfdrive.car.fw_versions import FW_QUERY_CONFIGS, VERSIONS, match_fw_to_car, get_fw_versions CarFw = car.CarParams.CarFw Ecu = car.CarParams.Ecu @@ -15,6 +18,14 @@ Ecu = car.CarParams.Ecu ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} +class FakeSocket: + def receive(self, non_blocking=False): + pass + + def send(self, msg): + pass + + class TestFwFingerprint(unittest.TestCase): def assertFingerprints(self, candidates, expected): candidates = list(candidates) @@ -103,5 +114,58 @@ class TestFwFingerprint(unittest.TestCase): f'{brand.title()}: FW query whitelist missing ecus: {ecu_strings}') +class TestFwFingerprintTiming(unittest.TestCase): + def _benchmark(self, brand, num_pandas, ref_time, tol, n): + params = Params() + fake_socket = FakeSocket() + + times = [] + for _ in range(n): + params.put_bool("ObdMultiplexingEnabled", True) + thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), kwargs=dict(num_pandas=num_pandas)) + thread.start() + t = time.perf_counter() + while thread.is_alive(): + time.sleep(0.02) + if not params.get_bool("ObdMultiplexingChanged"): + params.put_bool("ObdMultiplexingChanged", True) + times.append(time.perf_counter() - t) + + avg_time = round(sum(times) / len(times), 2) + self.assertLess(avg_time, ref_time + tol) + self.assertGreater(avg_time, ref_time - tol, "Performance seems to have improved, update test refs.") + return avg_time + + @parameterized.expand([(1,), (2,), ]) + def test_fw_query_timing(self, num_pandas): + brand_ref_times = { + 1: { + 'body': 0.1, + 'chrysler': 0.3, + 'ford': 0.2, + 'honda': 0.5, + 'hyundai': 0.7, + 'mazda': 0.1, + 'nissan': 0.3, + 'subaru': 0.1, + 'tesla': 0.2, + 'toyota': 0.7, + 'volkswagen': 0.2, + }, + 2: { + 'hyundai': 1.1, + } + } + + for brand, config in FW_QUERY_CONFIGS.items(): + with self.subTest(brand=brand, num_pandas=num_pandas): + multi_panda_requests = [r for r in config.requests if r.bus > 3] + if not len(multi_panda_requests) and num_pandas > 1: + raise unittest.SkipTest("No multi-panda FW queries") + + avg_time = self._benchmark(brand, num_pandas, brand_ref_times[num_pandas][brand], 0.1, 10) + print(f'{brand=}, {num_pandas=}, {len(config.requests)=}, avg FW query time={avg_time} seconds') + + if __name__ == "__main__": unittest.main()