FW query timing test: mock get_data function for timeout (#29712)

* use a mock function to simulate timeout

* Cleanup

* Cleanup

* clean refs

* tolerance can also go down

* fix

* better name

revert refs

* use unittest

* revert

* Revert "use unittest"

This reverts commit 7a1d6a6fc7.

* order

* local

* use a context

* revert

* stress test it

* let's try

* it's consistent

---------

Co-authored-by: Shane Smiskol <shane@smiskol.com>
pull/30576/head
Justin Newberry 1 year ago committed by GitHub
parent b0831fb117
commit 4028cb6121
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 24
      selfdrive/car/tests/test_fw_fingerprint.py

@ -5,6 +5,7 @@ import time
import unittest import unittest
from collections import defaultdict from collections import defaultdict
from parameterized import parameterized from parameterized import parameterized
from unittest import mock
import threading import threading
from cereal import car from cereal import car
@ -176,7 +177,7 @@ class TestFwFingerprint(unittest.TestCase):
class TestFwFingerprintTiming(unittest.TestCase): class TestFwFingerprintTiming(unittest.TestCase):
N: int = 5 N: int = 5
TOL: float = 0.12 TOL: float = 0.1
@staticmethod @staticmethod
def _run_thread(thread: threading.Thread) -> float: def _run_thread(thread: threading.Thread) -> float:
@ -191,12 +192,19 @@ class TestFwFingerprintTiming(unittest.TestCase):
return time.perf_counter() - t return time.perf_counter() - t
def _benchmark_brand(self, brand, num_pandas): def _benchmark_brand(self, brand, num_pandas):
fake_socket = FakeSocket() def fake_get_data(_, timeout):
brand_time = 0 nonlocal fake_timeout_time
for _ in range(self.N): fake_timeout_time += timeout
thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), return {}
kwargs=dict(num_pandas=num_pandas))
brand_time += self._run_thread(thread) with mock.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", fake_get_data):
fake_socket = FakeSocket()
brand_time = 0
for _ in range(self.N):
fake_timeout_time = 0
thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand),
kwargs=dict(num_pandas=num_pandas))
brand_time += self._run_thread(thread) + fake_timeout_time
return brand_time / self.N return brand_time / self.N
@ -227,7 +235,7 @@ class TestFwFingerprintTiming(unittest.TestCase):
@pytest.mark.timeout(60) @pytest.mark.timeout(60)
def test_fw_query_timing(self): def test_fw_query_timing(self):
total_ref_time = 6.41 total_ref_time = 6.58
brand_ref_times = { brand_ref_times = {
1: { 1: {
'body': 0.11, 'body': 0.11,

Loading…
Cancel
Save