@ -1,13 +1,16 @@
#!/usr/bin/env python3
import random
import time
import unittest
from collections import defaultdict
from parameterized import parameterized
import threading
from cereal import car
from common . params import Params
from selfdrive . car . car_helpers import interfaces
from selfdrive . car . fingerprints import FW_VERSIONS
from selfdrive . car . fw_versions import FW_QUERY_CONFIGS , VERSIONS , match_fw_to_car
from selfdrive . car . fw_versions import FW_QUERY_CONFIGS , VERSIONS , match_fw_to_car , get_fw_versions
CarFw = car . CarParams . CarFw
Ecu = car . CarParams . Ecu
@ -15,6 +18,14 @@ Ecu = car.CarParams.Ecu
ECU_NAME = { v : k for k , v in Ecu . schema . enumerants . items ( ) }
class FakeSocket :
def receive ( self , non_blocking = False ) :
pass
def send ( self , msg ) :
pass
class TestFwFingerprint ( unittest . TestCase ) :
def assertFingerprints ( self , candidates , expected ) :
candidates = list ( candidates )
@ -103,5 +114,58 @@ class TestFwFingerprint(unittest.TestCase):
f ' { brand . title ( ) } : FW query whitelist missing ecus: { ecu_strings } ' )
class TestFwFingerprintTiming ( unittest . TestCase ) :
def _benchmark ( self , brand , num_pandas , ref_time , tol , n ) :
params = Params ( )
fake_socket = FakeSocket ( )
times = [ ]
for _ in range ( n ) :
params . put_bool ( " ObdMultiplexingEnabled " , True )
thread = threading . Thread ( target = get_fw_versions , args = ( fake_socket , fake_socket , brand ) , kwargs = dict ( num_pandas = num_pandas ) )
thread . start ( )
t = time . perf_counter ( )
while thread . is_alive ( ) :
time . sleep ( 0.02 )
if not params . get_bool ( " ObdMultiplexingChanged " ) :
params . put_bool ( " ObdMultiplexingChanged " , True )
times . append ( time . perf_counter ( ) - t )
avg_time = round ( sum ( times ) / len ( times ) , 2 )
self . assertLess ( avg_time , ref_time + tol )
self . assertGreater ( avg_time , ref_time - tol , " Performance seems to have improved, update test refs. " )
return avg_time
@parameterized . expand ( [ ( 1 , ) , ( 2 , ) , ] )
def test_fw_query_timing ( self , num_pandas ) :
brand_ref_times = {
1 : {
' body ' : 0.1 ,
' chrysler ' : 0.3 ,
' ford ' : 0.2 ,
' honda ' : 0.5 ,
' hyundai ' : 0.7 ,
' mazda ' : 0.1 ,
' nissan ' : 0.3 ,
' subaru ' : 0.1 ,
' tesla ' : 0.2 ,
' toyota ' : 0.7 ,
' volkswagen ' : 0.2 ,
} ,
2 : {
' hyundai ' : 1.1 ,
}
}
for brand , config in FW_QUERY_CONFIGS . items ( ) :
with self . subTest ( brand = brand , num_pandas = num_pandas ) :
multi_panda_requests = [ r for r in config . requests if r . bus > 3 ]
if not len ( multi_panda_requests ) and num_pandas > 1 :
raise unittest . SkipTest ( " No multi-panda FW queries " )
avg_time = self . _benchmark ( brand , num_pandas , brand_ref_times [ num_pandas ] [ brand ] , 0.1 , 10 )
print ( f ' { brand =} , { num_pandas =} , { len ( config . requests ) =} , avg FW query time= { avg_time } seconds ' )
if __name__ == " __main__ " :
unittest . main ( )