use a mock function to simulate timeout

pull/29712/head
Justin Newberry 2 years ago
parent f31b4ad25e
commit a89837038e
  1. 36
      selfdrive/car/tests/test_fw_fingerprint.py

@ -3,6 +3,7 @@ import random
import time import time
import unittest import unittest
from collections import defaultdict from collections import defaultdict
from unittest import mock
from parameterized import parameterized from parameterized import parameterized
import threading import threading
@ -175,29 +176,36 @@ class TestFwFingerprint(unittest.TestCase):
class TestFwFingerprintTiming(unittest.TestCase): class TestFwFingerprintTiming(unittest.TestCase):
N: int = 5 N: int = 5
TOL: float = 0.1 TOL: float = 0.11
@staticmethod @staticmethod
def _run_thread(thread: threading.Thread) -> float: def _run_thread(thread: threading.Thread) -> float:
params = Params() params = Params()
params.put_bool("ObdMultiplexingEnabled", True) params.put_bool("ObdMultiplexingEnabled", True)
thread.start() thread.start()
t = time.perf_counter()
while thread.is_alive(): while thread.is_alive():
time.sleep(0.02) time.sleep(0.02)
if not params.get_bool("ObdMultiplexingChanged"): if not params.get_bool("ObdMultiplexingChanged"):
params.put_bool("ObdMultiplexingChanged", True) params.put_bool("ObdMultiplexingChanged", True)
return time.perf_counter() - t return
def fake_get_data(self, timeout):
self.timeout_total += timeout
return {}
def _benchmark_brand(self, brand, num_pandas): def _benchmark_brand(self, brand, num_pandas):
query_patch = mock.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data)
query_patch.start()
fake_socket = FakeSocket() fake_socket = FakeSocket()
brand_time = 0 self.timeout_total = 0
for _ in range(self.N): thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand),
thread = threading.Thread(target=get_fw_versions, args=(fake_socket, fake_socket, brand), kwargs=dict(num_pandas=num_pandas))
kwargs=dict(num_pandas=num_pandas)) self._run_thread(thread)
brand_time += self._run_thread(thread) query_patch.stop()
return brand_time / self.N return self.timeout_total
def _assert_timing(self, avg_time, ref_time): def _assert_timing(self, avg_time, ref_time):
self.assertLess(avg_time, ref_time + self.TOL) self.assertLess(avg_time, ref_time + self.TOL)
@ -225,14 +233,14 @@ class TestFwFingerprintTiming(unittest.TestCase):
print(f'get_vin, query time={vin_time / self.N} seconds') print(f'get_vin, query time={vin_time / self.N} seconds')
def test_fw_query_timing(self): def test_fw_query_timing(self):
total_ref_time = 6.07 total_ref_time = 5.7
brand_ref_times = { brand_ref_times = {
1: { 1: {
'body': 0.11, 'body': 0.1,
'chrysler': 0.3, 'chrysler': 0.3,
'ford': 0.2, 'ford': 0.2,
'honda': 0.52, 'honda': 0.5,
'hyundai': 0.72, 'hyundai': 0.7,
'mazda': 0.2, 'mazda': 0.2,
'nissan': 0.4, 'nissan': 0.4,
'subaru': 0.2, 'subaru': 0.2,
@ -242,7 +250,7 @@ class TestFwFingerprintTiming(unittest.TestCase):
}, },
2: { 2: {
'ford': 0.3, 'ford': 0.3,
'hyundai': 1.12, 'hyundai': 1.1,
} }
} }

Loading…
Cancel
Save