@ -5,10 +5,9 @@ import time
import unittest
from collections import defaultdict
from parameterized import parameterized
import threading
from unittest import mock
from cereal import car
from openpilot . common . params import Params
from openpilot . selfdrive . car . car_helpers import interfaces
from openpilot . selfdrive . car . fingerprints import FW_VERSIONS
from openpilot . selfdrive . car . fw_versions import FW_QUERY_CONFIGS , FUZZY_EXCLUDE_ECUS , VERSIONS , build_fw_dict , \
@ -176,29 +175,36 @@ class TestFwFingerprint(unittest.TestCase):
class TestFwFingerprintTiming ( unittest . TestCase ) :
N : int = 5
TOL : float = 0.12
TOL : float = 0.05
@staticmethod
def _run_thread ( thread : threading . Thread ) - > float :
params = Params ( )
params . put_bool ( " ObdMultiplexingEnabled " , True )
thread . start ( )
t = time . perf_counter ( )
while thread . is_alive ( ) :
time . sleep ( 0.02 )
if not params . get_bool ( " ObdMultiplexingChanged " ) :
params . put_bool ( " ObdMultiplexingChanged " , True )
return time . perf_counter ( ) - t
# for patched functions
current_obd_multiplexing : bool
total_time : float
def fake_set_obd_multiplexing ( self , _ , obd_multiplexing ) :
""" The 10Hz blocking params loop adds on average 50ms to the query time for each OBD multiplexing change """
if obd_multiplexing != self . current_obd_multiplexing :
self . current_obd_multiplexing = obd_multiplexing
self . total_time + = 0.1 / 2
def fake_get_data ( self , timeout ) :
self . total_time + = timeout
return { }
def _benchmark_brand ( self , brand , num_pandas ) :
fake_socket = FakeSocket ( )
brand_time = 0
self . total_time = 0
with ( mock . patch ( " openpilot.selfdrive.car.fw_versions.set_obd_multiplexing " , self . fake_set_obd_multiplexing ) ,
mock . patch ( " openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data " , self . fake_get_data ) ) :
for _ in range ( self . N ) :
thread = threading . Thread ( target = get_fw_versions , args = ( fake_socket , fake_socket , brand ) ,
kwargs = dict ( num_pandas = num_pandas ) )
brand_time + = self . _run_thread ( thread )
# Treat each brand as the most likely (aka, the first) brand with OBD multiplexing initially on
self . current_obd_multiplexing = True
return brand_time / self . N
t = time . perf_counter ( )
get_fw_versions ( fake_socket , fake_socket , brand , num_pandas = num_pandas )
self . total_time + = time . perf_counter ( ) - t
return self . total_time / self . N
def _assert_timing ( self , avg_time , ref_time ) :
self . assertLess ( avg_time , ref_time + self . TOL )
@ -207,44 +213,49 @@ class TestFwFingerprintTiming(unittest.TestCase):
def test_startup_timing ( self ) :
# Tests worse-case VIN query time and typical present ECU query time
vin_ref_time = 1.0
present_ecu_ref_time = 0.8
present_ecu_ref_time = 0.75
def fake_get_ecu_addrs ( * _ , timeout ) :
self . total_time + = timeout
return set ( )
fake_socket = FakeSocket ( )
present_ecu_time = 0.0
self . total_time = 0.0
with ( mock . patch ( " openpilot.selfdrive.car.fw_versions.set_obd_multiplexing " , self . fake_set_obd_multiplexing ) ,
mock . patch ( " openpilot.selfdrive.car.fw_versions.get_ecu_addrs " , fake_get_ecu_addrs ) ) :
for _ in range ( self . N ) :
thread = threading . Thread ( target = get_present_ecus , args = ( fake_socket , fake_socket ) ,
kwargs = dict ( num_pandas = 2 ) )
present_ecu_time + = self . _run_thread ( thread )
self . _assert_timing ( present_ecu_time / self . N , present_ecu_ref_time )
print ( f ' get_present_ecus, query time= { present_ecu_time / self . N } seconds ' )
self . current_obd_multiplexing = True
get_present_ecus ( fake_socket , fake_socket , num_pandas = 2 )
self . _assert_timing ( self . total_time / self . N , present_ecu_ref_time )
print ( f ' get_present_ecus, query time= { self . total_time / self . N } seconds ' )
vin_time = 0.0
self . total_time = 0.0
with ( mock . patch ( " openpilot.selfdrive.car.isotp_parallel_query.IsoTpParallelQuery.get_data " , self . fake_get_data ) ) :
for _ in range ( self . N ) :
thread = threading . Thread ( target = get_vin , args = ( fake_socket , fake_socket , 1 ) )
vin_time + = self . _run_thread ( thread )
self . _assert_timing ( vin_time / self . N , vin_ref_time )
print ( f ' get_vin, query time= { vin_time / self . N } seconds ' )
get_vin ( fake_socket , fake_socket , 1 )
self . _assert_timing ( self . total_time / self . N , vin_ref_time )
print ( f ' get_vin, query time= { self . total_time / self . N } seconds ' )
@pytest . mark . timeout ( 60 )
def test_fw_query_timing ( self ) :
total_ref_time = 6.4 1
total_ref_time = 6.1
brand_ref_times = {
1 : {
' body ' : 0.11 ,
' body ' : 0.1 ,
' chrysler ' : 0.3 ,
' ford ' : 0.2 ,
' honda ' : 0.52 ,
' hyundai ' : 0.72 ,
' honda ' : 0.4 5 ,
' hyundai ' : 0.65 ,
' mazda ' : 0.2 ,
' nissan ' : 0.4 ,
' subaru ' : 0.52 ,
' subaru ' : 0.4 5 ,
' tesla ' : 0.2 ,
' toyota ' : 1.6 ,
' volkswagen ' : 0.2 ,
} ,
2 : {
' ford ' : 0.3 ,
' hyundai ' : 1.12 ,
' hyundai ' : 1.05 ,
}
}