|
|
|
@ -3,15 +3,15 @@ import random |
|
|
|
|
import time |
|
|
|
|
from collections import defaultdict |
|
|
|
|
from parameterized import parameterized |
|
|
|
|
from types import SimpleNamespace |
|
|
|
|
|
|
|
|
|
from cereal import car |
|
|
|
|
from openpilot.selfdrive.car import make_can_msg |
|
|
|
|
from openpilot.selfdrive.car.can_definitions import CanData |
|
|
|
|
from openpilot.selfdrive.car.car_helpers import interfaces |
|
|
|
|
from openpilot.selfdrive.car.fingerprints import FW_VERSIONS |
|
|
|
|
from openpilot.selfdrive.car.fw_versions import ESSENTIAL_ECUS, FW_QUERY_CONFIGS, FUZZY_EXCLUDE_ECUS, VERSIONS, build_fw_dict, \ |
|
|
|
|
match_fw_to_car, get_brand_ecu_matches, get_fw_versions, get_fw_versions_ordered, get_present_ecus |
|
|
|
|
from openpilot.selfdrive.car.vin import get_vin |
|
|
|
|
from openpilot.selfdrive.pandad import can_list_to_can_capnp |
|
|
|
|
|
|
|
|
|
CarFw = car.CarParams.CarFw |
|
|
|
|
Ecu = car.CarParams.Ecu |
|
|
|
@ -19,15 +19,6 @@ Ecu = car.CarParams.Ecu |
|
|
|
|
ECU_NAME = {v: k for k, v in Ecu.schema.enumerants.items()} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FakeSocket: |
|
|
|
|
def receive(self, non_blocking=False): |
|
|
|
|
return (can_list_to_can_capnp([make_can_msg(random.randint(0x600, 0x800), b'\x00' * 8, 0)]) |
|
|
|
|
if random.uniform(0, 1) > 0.5 else None) |
|
|
|
|
|
|
|
|
|
def send(self, msg): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestFwFingerprint: |
|
|
|
|
def assertFingerprints(self, candidates, expected): |
|
|
|
|
candidates = list(candidates) |
|
|
|
@ -206,6 +197,11 @@ class TestFwFingerprint: |
|
|
|
|
assert get_brand_ecu_matches({(0x758, 0xf, 99)}) == expected_response |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fake_can_drain(wait_for_one: bool = False) -> list[list[CanData]]: |
|
|
|
|
return ([[CanData(random.randint(0x600, 0x800), b'\x00' * 8, 0)]] |
|
|
|
|
if random.uniform(0, 1) > 0.5 else []) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestFwFingerprintTiming: |
|
|
|
|
N: int = 5 |
|
|
|
|
TOL: float = 0.05 |
|
|
|
@ -214,6 +210,12 @@ class TestFwFingerprintTiming: |
|
|
|
|
current_obd_multiplexing: bool |
|
|
|
|
total_time: float |
|
|
|
|
|
|
|
|
|
fake_logcan = SimpleNamespace(drain=fake_can_drain) |
|
|
|
|
|
|
|
|
|
@staticmethod |
|
|
|
|
def fake_can_send(msgs): |
|
|
|
|
pass |
|
|
|
|
|
|
|
|
|
def fake_set_obd_multiplexing(self, obd_multiplexing): |
|
|
|
|
"""The 10Hz blocking params loop adds on average 50ms to the query time for each OBD multiplexing change""" |
|
|
|
|
if obd_multiplexing != self.current_obd_multiplexing: |
|
|
|
@ -225,7 +227,6 @@ class TestFwFingerprintTiming: |
|
|
|
|
return {} |
|
|
|
|
|
|
|
|
|
def _benchmark_brand(self, brand, num_pandas, mocker): |
|
|
|
|
fake_socket = FakeSocket() |
|
|
|
|
self.total_time = 0 |
|
|
|
|
mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data) |
|
|
|
|
for _ in range(self.N): |
|
|
|
@ -233,7 +234,7 @@ class TestFwFingerprintTiming: |
|
|
|
|
self.current_obd_multiplexing = True |
|
|
|
|
|
|
|
|
|
t = time.perf_counter() |
|
|
|
|
get_fw_versions(fake_socket, fake_socket, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas) |
|
|
|
|
get_fw_versions(self.fake_logcan, self.fake_can_send, self.fake_set_obd_multiplexing, brand, num_pandas=num_pandas) |
|
|
|
|
self.total_time += time.perf_counter() - t |
|
|
|
|
|
|
|
|
|
return self.total_time / self.N |
|
|
|
@ -251,12 +252,11 @@ class TestFwFingerprintTiming: |
|
|
|
|
self.total_time += timeout |
|
|
|
|
return set() |
|
|
|
|
|
|
|
|
|
fake_socket = FakeSocket() |
|
|
|
|
self.total_time = 0.0 |
|
|
|
|
mocker.patch("openpilot.selfdrive.car.fw_versions.get_ecu_addrs", fake_get_ecu_addrs) |
|
|
|
|
for _ in range(self.N): |
|
|
|
|
self.current_obd_multiplexing = True |
|
|
|
|
get_present_ecus(fake_socket, fake_socket, self.fake_set_obd_multiplexing, num_pandas=2) |
|
|
|
|
get_present_ecus(self.fake_logcan, self.fake_can_send, self.fake_set_obd_multiplexing, num_pandas=2) |
|
|
|
|
self._assert_timing(self.total_time / self.N, present_ecu_ref_time) |
|
|
|
|
print(f'get_present_ecus, query time={self.total_time / self.N} seconds') |
|
|
|
|
|
|
|
|
@ -265,7 +265,7 @@ class TestFwFingerprintTiming: |
|
|
|
|
self.total_time = 0.0 |
|
|
|
|
mocker.patch("openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data", self.fake_get_data) |
|
|
|
|
for _ in range(self.N): |
|
|
|
|
get_vin(fake_socket, fake_socket, (0, 1), **args) |
|
|
|
|
get_vin(self.fake_logcan, self.fake_can_send, (0, 1), **args) |
|
|
|
|
self._assert_timing(self.total_time / self.N, vin_ref_times[name]) |
|
|
|
|
print(f'get_vin {name} case, query time={self.total_time / self.N} seconds') |
|
|
|
|
|
|
|
|
@ -324,8 +324,7 @@ class TestFwFingerprintTiming: |
|
|
|
|
raise |
|
|
|
|
|
|
|
|
|
mocker.patch("openpilot.selfdrive.car.carlog.exception", fake_carlog_exception) |
|
|
|
|
fake_socket = FakeSocket() |
|
|
|
|
get_fw_versions_ordered(fake_socket, fake_socket, lambda obd: None, '0' * 17, set()) |
|
|
|
|
get_fw_versions_ordered(self.fake_logcan, self.fake_can_send, lambda obd: None, '0' * 17, set()) |
|
|
|
|
for brand in FW_QUERY_CONFIGS.keys(): |
|
|
|
|
with subtests.test(brand=brand): |
|
|
|
|
get_fw_versions(fake_socket, fake_socket, lambda obd: None, brand) |
|
|
|
|
get_fw_versions(self.fake_logcan, self.fake_can_send, lambda obd: None, brand) |
|
|
|
|