@ -210,47 +210,24 @@ class TestCarModelBase(unittest.TestCase):
suppress_health_check = [ HealthCheck . filter_too_much , HealthCheck . too_slow , HealthCheck . large_base_example ] ,
)
@given ( data = st . data ( ) )
@seed ( 1 ) # for reproduction
# @seed(1) # for reproduction
def test_panda_safety_carstate_fuzzy ( self , data ) :
# raise unittest.SkipTest
# return
# TODO: how much of test_panda_safety_carstate can we re-use?
"""
Assert that panda safety matches openpilot ' s carState by fuzzing the CAN data
For each example , pick a random CAN message on the bus and fuzz its data ,
checking for panda state mismatches .
"""
# TODO: how much of test_panda_safety_carstate can we re-use?
if self . CP . dashcamOnly :
self . skipTest ( " no need to check panda safety for dashcamOnly " )
# cfg = self.CP.safetyConfigs[-1]
# set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam)
# self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}")
# self.safety.init_tests()
# bus = 0 # random.randint(0, 3)
bus_offset = CanBusBase ( None , fingerprint = self . fingerprint ) . offset
bus = bus_offset
# address = data.draw(st.sampled_from([i for i in self.fingerprint[0] if i < 0x700])) # random.randint(0x200, 0x300)
address = data . draw ( st . sampled_from ( [ i for i in self . fingerprint [ bus ] ] ) ) # random.randint(0x200, 0x300)
# addresses = [i for i in self.fingerprint[bus_offset]]
# weighted_address_strategy = st.sampled_from(sorted(addresses, key=lambda x: random.choices(addresses, weights=[(1 / (i + 1)) for i in range(len(addresses))])[0]))
# address = data.draw(weighted_address_strategy)
# if address not in self.fingerprint[bus_offset]:
# raise unittest.SkipTest
size = self . fingerprint [ bus ] [ address ]
print ( address , size )
# if self.CP.carFingerprint in TSS2_CAR:
# raise unittest.SkipTest
# print(self.fingerprint)
# address = data.draw(st.integers(0x201, 0x226))
# ORIG:
# msg_strategy = st.tuples(st.integers(min_value=0, max_value=0), st.integers(min_value=0x100, max_value=0x400), st.binary(min_size=8, max_size=8))
# msg_strategy = st.tuples(st.integers(min_value=0xaa, max_value=0xaa), st.binary(min_size=8, max_size=8))
valid_addrs = [ ( addr , bus ) for bus , addrs in self . fingerprint . items ( ) for addr in addrs ]
address , bus = data . draw ( st . sampled_from ( valid_addrs ) )
print ( ' addr, bus: ' , address , bus )
size = self . fingerprint [ bus ] [ address ]
msg_strategy = st . binary ( min_size = size , max_size = size )
msgs = data . draw ( st . lists ( msg_strategy , min_size = 20 ) )
# time.sleep(8)
# print(len(msgs))
prev_panda_gas = self . safety . get_gas_pressed_prev ( )
prev_panda_brake = self . safety . get_brake_pressed_prev ( )
@ -262,31 +239,20 @@ class TestCarModelBase(unittest.TestCase):
start_gas = self . safety . get_gas_pressed_prev ( )
start_gas_int_detected = self . safety . get_gas_interceptor_detected ( )
# for bus, address, dat in msgs:
# since all toyotas can detect fake interceptor, but we want to test PCM gas too
for dat in msgs :
# set interceptor detected so we don't accidentally trigger gas_pressed with other message
# set interceptor detected so we don't accidentally trigger gas_pressed with other messages
self . safety . set_gas_interceptor_detected ( self . CP . enableGasInterceptor )
# if not self.CP.enableGasInterceptor:
# self.safety.set_gas_interceptor_detected(False)
print ( )
# for i in range(100):
to_send = libpanda_py . make_CANPacket ( address , bus , dat )
did_rx = self . safety . safety_rx_hook ( to_send )
can = messaging . new_message ( ' can ' , 1 )
can . can = [ log . CanData ( address = address , dat = dat , src = bus ) ]
# del can.can[0]
# del can.can
print ( ' rxing ' , dict ( address = address , dat = dat , src = bus ) )
# continue
CC = car . CarControl . new_message ( )
CS = self . CI . update ( CC , ( can . to_bytes ( ) , ) )
# del can
# del CC
# continue
# test multiple CAN packets as well as multiple messages per CAN packet
# for (_dat1, _dat2) in ((dat1, dat2), (dat3, dat4)):
@ -300,7 +266,7 @@ class TestCarModelBase(unittest.TestCase):
if self . safety . get_gas_pressed_prev ( ) :
self . init_gas_pressed = True
# due to panda updating state selectively, per message, we can only compare on a change
# due to panda updating state selectively, only edges are expected to match
# print('ret.gas', CS.gas, 'safety gas', self.safety.get_gas_interceptor_prev())
# print('both', CS.gasPressed, self.safety.get_gas_pressed_prev(), 'int')
@ -310,18 +276,18 @@ class TestCarModelBase(unittest.TestCase):
print ( ' both ' , CS . gasPressed , self . safety . get_gas_pressed_prev ( ) , ' int ' )
print ( ' get_gas_interceptor_detected! ' )
print ( ' can.can ' , can . can )
# self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev())
self . assertEqual ( CS . gasPressed , self . safety . get_gas_pressed_prev ( ) )
# self.assertEqual(CS.gas, self.safety.get_gas_interceptor_prev())
# self.assertFalse(True)
# TODO: don't fully skip
if self . CP . carFingerprint not in ( HONDA . PILOT , HONDA . RIDGELINE ) :
if self . safety . get_brake_pressed_prev ( ) != prev_panda_brake :
brake_pressed = CS . brakePressed
if CS . brakePressed and not self . safety . get_brake_pressed_prev ( ) :
if self . CP . carFingerprint in ( HONDA . PILOT , HONDA . RIDGELINE ) and CS . brake > 0.05 :
brake_pressed = False
print ( ' both ' , CS . brakePressed , ' safety brake ' , self . safety . get_brake_pressed_prev ( ) )
if self . safety . get_brake_pressed_prev ( ) != prev_panda_brake :
# print('brake change!')
# print('both', CS.brakePressed, self.safety.get_brake_pressed_prev())
self . assertEqual ( CS . brakePressed , self . safety . get_brake_pressed_prev ( ) )
# print('brake change!')
# print('both', CS.brakePressed, self.safety.get_brake_pressed_prev())
self . assertEqual ( brake_pressed , self . safety . get_brake_pressed_prev ( ) )
if self . safety . get_regen_braking_prev ( ) != prev_panda_regen_braking :
print ( ' regen change! ' )