@ -1,5 +1,4 @@
import time
import time
import capnp
import os
import os
import pytest
import pytest
import random
import random
@ -67,7 +66,7 @@ class TestCarModelBase(unittest.TestCase):
platform : Platform | None = None
platform : Platform | None = None
test_route : CarTestRoute | None = None
test_route : CarTestRoute | None = None
can_msgs : list [ capnp . lib . capnp . _DynamicStructReader ]
can_msgs : list [ tuple [ int , list [ CanData ] ] ]
fingerprint : dict [ int , dict [ int , int ] ]
fingerprint : dict [ int , dict [ int , int ] ]
elm_frame : int | None
elm_frame : int | None
car_safety_mode_frame : int | None
car_safety_mode_frame : int | None
@ -82,7 +81,8 @@ class TestCarModelBase(unittest.TestCase):
experimental_long = False
experimental_long = False
for msg in lr :
for msg in lr :
if msg . which ( ) == " can " :
if msg . which ( ) == " can " :
can_msgs . append ( msg )
can = can_capnp_to_list ( ( msg . as_builder ( ) . to_bytes ( ) , ) ) [ 0 ]
can_msgs . append ( ( can [ 0 ] , [ CanData ( * can ) for can in can [ 1 ] ] ) )
if len ( can_msgs ) < = FRAME_FINGERPRINT :
if len ( can_msgs ) < = FRAME_FINGERPRINT :
for m in msg . can :
for m in msg . can :
if m . src < 64 :
if m . src < 64 :
@ -146,13 +146,11 @@ class TestCarModelBase(unittest.TestCase):
raise unittest . SkipTest
raise unittest . SkipTest
raise Exception ( f " missing test route for { cls . platform } " )
raise Exception ( f " missing test route for { cls . platform } " )
car_fw , can_msgs , experimental_long = cls . get_testing_data ( )
car_fw , cls . can_msgs , experimental_long = cls . get_testing_data ( )
# if relay is expected to be open in the route
# if relay is expected to be open in the route
cls . openpilot_enabled = cls . car_safety_mode_frame is not None
cls . openpilot_enabled = cls . car_safety_mode_frame is not None
cls . can_msgs = sorted ( can_msgs , key = lambda msg : msg . logMonoTime )
cls . CarInterface , cls . CarController , cls . CarState , cls . RadarInterface = interfaces [ cls . platform ]
cls . CarInterface , cls . CarController , cls . CarState , cls . RadarInterface = interfaces [ cls . platform ]
cls . CP = cls . CarInterface . get_params ( cls . platform , cls . fingerprint , car_fw , experimental_long , docs = False )
cls . CP = cls . CarInterface . get_params ( cls . platform , cls . fingerprint , car_fw , experimental_long , docs = False )
assert cls . CP
assert cls . CP
@ -199,8 +197,8 @@ class TestCarModelBase(unittest.TestCase):
CC = structs . CarControl ( ) . as_reader ( )
CC = structs . CarControl ( ) . as_reader ( )
for i , msg in enumerate ( self . can_msgs ) :
for i , msg in enumerate ( self . can_msgs ) :
CS = self . CI . update ( can_capnp_to_list ( ( msg . as_builder ( ) . to_bytes ( ) , ) ) )
CS = self . CI . update ( msg )
self . CI . apply ( CC , msg . logMonoTime )
self . CI . apply ( CC , msg [ 0 ] )
if CS . canValid :
if CS . canValid :
can_valid = True
can_valid = True
@ -219,7 +217,7 @@ class TestCarModelBase(unittest.TestCase):
# start parsing CAN messages after we've left ELM mode and can expect CAN traffic
# start parsing CAN messages after we've left ELM mode and can expect CAN traffic
error_cnt = 0
error_cnt = 0
for i , msg in enumerate ( self . can_msgs [ self . elm_frame : ] ) :
for i , msg in enumerate ( self . can_msgs [ self . elm_frame : ] ) :
rr : structs . RadarData | None = RI . update ( can_capnp_to_list ( ( msg . as_builder ( ) . to_bytes ( ) , ) ) )
rr : structs . RadarData | None = RI . update ( msg )
if rr is not None and i > 50 :
if rr is not None and i > 50 :
error_cnt + = structs . RadarData . Error . canError in rr . errors
error_cnt + = structs . RadarData . Error . canError in rr . errors
self . assertEqual ( error_cnt , 0 )
self . assertEqual ( error_cnt , 0 )
@ -228,16 +226,16 @@ class TestCarModelBase(unittest.TestCase):
if self . CP . dashcamOnly :
if self . CP . dashcamOnly :
self . skipTest ( " no need to check panda safety for dashcamOnly " )
self . skipTest ( " no need to check panda safety for dashcamOnly " )
start_ts = self . can_msgs [ 0 ] . logMonoTime
start_ts = self . can_msgs [ 0 ] [ 0 ]
failed_addrs = Counter ( )
failed_addrs = Counter ( )
for can in self . can_msgs :
for can in self . can_msgs :
# update panda timer
# update panda timer
t = ( can . logMonoTime - start_ts ) / 1e3
t = ( can [ 0 ] - start_ts ) / 1e3
self . safety . set_timer ( int ( t ) )
self . safety . set_timer ( int ( t ) )
# run all msgs through the safety RX hook
# run all msgs through the safety RX hook
for msg in can . can :
for msg in can [ 1 ] :
if msg . src > = 64 :
if msg . src > = 64 :
continue
continue
@ -389,8 +387,8 @@ class TestCarModelBase(unittest.TestCase):
# warm up pass, as initial states may be different
# warm up pass, as initial states may be different
for can in self . can_msgs [ : 300 ] :
for can in self . can_msgs [ : 300 ] :
self . CI . update ( can_capnp_to_list ( ( can . as_builder ( ) . to_bytes ( ) , ) ) )
self . CI . update ( can )
for msg in filter ( lambda m : m . src in range ( 64 ) , can . can ) :
for msg in filter ( lambda m : m . src < 64 , can [ 1 ] ) :
to_send = libsafety_py . make_CANPacket ( msg . address , msg . src % 4 , msg . dat )
to_send = libsafety_py . make_CANPacket ( msg . address , msg . src % 4 , msg . dat )
self . safety . safety_rx_hook ( to_send )
self . safety . safety_rx_hook ( to_send )
@ -399,8 +397,8 @@ class TestCarModelBase(unittest.TestCase):
checks = defaultdict ( int )
checks = defaultdict ( int )
vehicle_speed_seen = self . CP . steerControlType == SteerControlType . angle and not self . CP . notCar
vehicle_speed_seen = self . CP . steerControlType == SteerControlType . angle and not self . CP . notCar
for idx , can in enumerate ( self . can_msgs ) :
for idx , can in enumerate ( self . can_msgs ) :
CS = self . CI . update ( can_capnp_to_list ( ( can . as_builder ( ) . to_bytes ( ) , ) ) ) . as_reader ( )
CS = self . CI . update ( can ) . as_reader ( )
for msg in filter ( lambda m : m . src in range ( 64 ) , can . can ) :
for msg in filter ( lambda m : m . src < 64 , can [ 1 ] ) :
to_send = libsafety_py . make_CANPacket ( msg . address , msg . src % 4 , msg . dat )
to_send = libsafety_py . make_CANPacket ( msg . address , msg . src % 4 , msg . dat )
ret = self . safety . safety_rx_hook ( to_send )
ret = self . safety . safety_rx_hook ( to_send )
self . assertEqual ( 1 , ret , f " safety rx failed ( { ret =} ): { ( msg . address , msg . src % 4 ) } " )
self . assertEqual ( 1 , ret , f " safety rx failed ( { ret =} ): { ( msg . address , msg . src % 4 ) } " )