@ -161,31 +161,36 @@ class TestFwFingerprint(unittest.TestCase):
class TestFwFingerprintTiming ( unittest . TestCase ) :
class TestFwFingerprintTiming ( unittest . TestCase ) :
N : int = 5
TOL : float = 0.1
@staticmethod
@staticmethod
def _benchmark ( brand , num_pandas , n ) :
def _run_thread ( thread : threading . Thread ) - > float :
params = Params ( )
params = Params ( )
params . put_bool ( " ObdMultiplexingEnabled " , True )
thread . start ( )
t = time . perf_counter ( )
while thread . is_alive ( ) :
time . sleep ( 0.02 )
if not params . get_bool ( " ObdMultiplexingChanged " ) :
params . put_bool ( " ObdMultiplexingChanged " , True )
return time . perf_counter ( ) - t
def _benchmark_brand ( self , brand , num_pandas ) :
fake_socket = FakeSocket ( )
fake_socket = FakeSocket ( )
brand_time = 0
for _ in range ( self . N ) :
thread = threading . Thread ( target = get_fw_versions , args = ( fake_socket , fake_socket , brand ) ,
kwargs = dict ( num_pandas = num_pandas ) )
brand_time + = self . _run_thread ( thread )
times = [ ]
return round ( brand_time / self . N , 2 )
for _ in range ( n ) :
params . put_bool ( " ObdMultiplexingEnabled " , True )
thread = threading . Thread ( target = get_fw_versions , args = ( fake_socket , fake_socket , brand ) , kwargs = dict ( num_pandas = num_pandas ) )
thread . start ( )
t = time . perf_counter ( )
while thread . is_alive ( ) :
time . sleep ( 0.02 )
if not params . get_bool ( " ObdMultiplexingChanged " ) :
params . put_bool ( " ObdMultiplexingChanged " , True )
times . append ( time . perf_counter ( ) - t )
return round ( sum ( times ) / len ( times ) , 2 )
def _assert_timing ( self , avg_time , ref_time , tol ) :
def _assert_timing ( self , avg_time , ref_time ) :
self . assertLess ( avg_time , ref_time + tol )
self . assertLess ( avg_time , ref_time + self . TOL )
self . assertGreater ( avg_time , ref_time - tol , " Performance seems to have improved, update test refs. " )
self . assertGreater ( avg_time , ref_time - self . TOL , " Performance seems to have improved, update test refs. " )
def test_fw_query_timing ( self ) :
def test_fw_query_timing ( self ) :
tol = 0.1
total_ref_time = 6.1
total_ref_time = 6.1
brand_ref_times = {
brand_ref_times = {
1 : {
1 : {
@ -214,13 +219,13 @@ class TestFwFingerprintTiming(unittest.TestCase):
if not len ( multi_panda_requests ) and num_pandas > 1 :
if not len ( multi_panda_requests ) and num_pandas > 1 :
raise unittest . SkipTest ( " No multi-panda FW queries " )
raise unittest . SkipTest ( " No multi-panda FW queries " )
avg_time = self . _benchmark ( brand , num_pandas , 5 )
avg_time = self . _benchmark_brand ( brand , num_pandas )
total_time + = avg_time
total_time + = avg_time
self . _assert_timing ( avg_time , brand_ref_times [ num_pandas ] [ brand ] , tol )
self . _assert_timing ( avg_time , brand_ref_times [ num_pandas ] [ brand ] )
print ( f ' { brand =} , { num_pandas =} , { len ( config . requests ) =} , avg FW query time= { avg_time } seconds ' )
print ( f ' { brand =} , { num_pandas =} , { len ( config . requests ) =} , avg FW query time= { avg_time } seconds ' )
with self . subTest ( brand = ' all_brands ' ) :
with self . subTest ( brand = ' all_brands ' ) :
self . _assert_timing ( total_time , total_ref_time , tol )
self . _assert_timing ( total_time , total_ref_time )
print ( f ' all brands, total FW query time= { total_time } seconds ' )
print ( f ' all brands, total FW query time= { total_time } seconds ' )