@ -82,7 +82,6 @@ class TestCarModelBase(unittest.TestCase):
raise unittest . SkipTest
raise Exception ( f " missing test route for { cls . car_model } " )
experimental_long = False
test_segs = ( 2 , 1 , 0 )
if cls . test_route . segment is not None :
test_segs = ( cls . test_route . segment , )
@ -102,6 +101,9 @@ class TestCarModelBase(unittest.TestCase):
car_fw = [ ]
can_msgs = [ ]
fingerprint = defaultdict ( dict )
experimental_long = False
enabled_toggle = True
dashcam_only = False
for msg in lr :
if msg . which ( ) == " can " :
for m in msg . can :
@ -110,16 +112,24 @@ class TestCarModelBase(unittest.TestCase):
can_msgs . append ( msg )
elif msg . which ( ) == " carParams " :
car_fw = msg . carParams . carFw
dashcam_only = msg . carParams . dashcamOnly
if msg . carParams . openpilotLongitudinalControl :
experimental_long = True
if cls . car_model is None and not cls . ci :
cls . car_model = msg . carParams . carFingerprint
elif msg . which ( ) == ' initData ' :
for param in msg . initData . params . entries :
if param . key == ' OpenpilotEnabledToggle ' :
enabled_toggle = param . value . strip ( b ' \x00 ' ) == b ' 1 '
if len ( can_msgs ) > int ( 50 / DT_CTRL ) :
break
else :
raise Exception ( f " Route: { repr ( cls . test_route . route ) } with segments: { test_segs } not found or no CAN msgs found. Is it uploaded? " )
# if relay is expected to be open in the route
cls . openpilot_enabled = enabled_toggle and not dashcam_only
cls . can_msgs = sorted ( can_msgs , key = lambda msg : msg . logMonoTime )
cls . CarInterface , cls . CarController , cls . CarState = interfaces [ cls . car_model ]
@ -219,6 +229,14 @@ class TestCarModelBase(unittest.TestCase):
self . safety . safety_tick_current_rx_checks ( )
if t > 1e6 :
self . assertTrue ( self . safety . addr_checks_valid ( ) )
# No need to check relay malfunction on disabled routes (relay closed) or for reasonable fingerprinting time
# TODO: detect when relay has flipped to properly check relay malfunction
if self . openpilot_enabled and t > 5e6 :
self . assertFalse ( self . safety . get_relay_malfunction ( ) )
else :
self . safety . set_relay_malfunction ( False )
self . assertFalse ( len ( failed_addrs ) , f " panda safety RX check failed: { failed_addrs } " )
def test_panda_safety_tx_cases ( self , data = None ) :