diff --git a/selfdrive/car/tests/test_fw_fingerprint.py b/selfdrive/car/tests/test_fw_fingerprint.py index 4bf755537..ed79ca71b 100755 --- a/selfdrive/car/tests/test_fw_fingerprint.py +++ b/selfdrive/car/tests/test_fw_fingerprint.py @@ -161,31 +161,36 @@ class TestFwFingerprint(unittest.TestCase): class TestFwFingerprintTiming(unittest.TestCase): + N: int = 5 + TOL: float = 0.1 + @staticmethod - def _benchmark(brand, num_pandas, n): + def _run_thread(thread: threading.Thread) -> float: params = Params() + params.put_bool("ObdMultiplexingEnabled", True) + thread.start() + t = time.perf_counter() + while thread.is_alive(): + time.sleep(0.02) + if not params.get_bool("ObdMultiplexingChanged"): + params.put_bool("ObdMultiplexingChanged", True) + return time.perf_counter() - t + + def _benchmark_brand(self, brand, num_pandas): fake_socket = FakeSocket() + brand_time = 0 + for _ in range(self.N): + thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), + kwargs=dict(num_pandas=num_pandas)) + brand_time += self._run_thread(thread) - times = [] - for _ in range(n): - params.put_bool("ObdMultiplexingEnabled", True) - thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), kwargs=dict(num_pandas=num_pandas)) - thread.start() - t = time.perf_counter() - while thread.is_alive(): - time.sleep(0.02) - if not params.get_bool("ObdMultiplexingChanged"): - params.put_bool("ObdMultiplexingChanged", True) - times.append(time.perf_counter() - t) - - return round(sum(times) / len(times), 2) + return round(brand_time / self.N, 2) - def _assert_timing(self, avg_time, ref_time, tol): - self.assertLess(avg_time, ref_time + tol) - self.assertGreater(avg_time, ref_time - tol, "Performance seems to have improved, update test refs.") + def _assert_timing(self, avg_time, ref_time): + self.assertLess(avg_time, ref_time + self.TOL) + self.assertGreater(avg_time, ref_time - self.TOL, "Performance seems to have improved, update test refs.") def test_fw_query_timing(self): - tol = 0.1 total_ref_time = 6.1 brand_ref_times = { 1: { @@ -214,13 +219,13 @@ class TestFwFingerprintTiming(unittest.TestCase): if not len(multi_panda_requests) and num_pandas > 1: raise unittest.SkipTest("No multi-panda FW queries") - avg_time = self._benchmark(brand, num_pandas, 5) + avg_time = self._benchmark_brand(brand, num_pandas) total_time += avg_time - self._assert_timing(avg_time, brand_ref_times[num_pandas][brand], tol) + self._assert_timing(avg_time, brand_ref_times[num_pandas][brand]) print(f'{brand=}, {num_pandas=}, {len(config.requests)=}, avg FW query time={avg_time} seconds') with self.subTest(brand='all_brands'): - self._assert_timing(total_time, total_ref_time, tol) + self._assert_timing(total_time, total_ref_time) print(f'all brands, total FW query time={total_time} seconds')