@ -9,7 +9,7 @@ from collections import defaultdict, Counter
from typing import List , Optional , Tuple
from typing import List , Optional , Tuple
from parameterized import parameterized_class
from parameterized import parameterized_class
import hypothesis . strategies as st
import hypothesis . strategies as st
from hypothesis import HealthCheck , Phase , assume , given , settings
from hypothesis import HealthCheck , Phase , assume , given , settings , seed
from cereal import messaging , log , car
from cereal import messaging , log , car
from openpilot . common . basedir import BASEDIR
from openpilot . common . basedir import BASEDIR
@ -190,6 +190,7 @@ class TestCarModelBase(unittest.TestCase):
suppress_health_check = [ HealthCheck . filter_too_much , HealthCheck . too_slow , HealthCheck . large_base_example ] ,
suppress_health_check = [ HealthCheck . filter_too_much , HealthCheck . too_slow , HealthCheck . large_base_example ] ,
)
)
@given ( data = st . data ( ) )
@given ( data = st . data ( ) )
@seed ( 0 ) # for reproduction
def test_panda_safety_carstate_fuzzy ( self , data ) :
def test_panda_safety_carstate_fuzzy ( self , data ) :
state_has_changed = lambda prev_state , new_state : prev_state != new_state
state_has_changed = lambda prev_state , new_state : prev_state != new_state
# cfg = self.CP.safetyConfigs[-1]
# cfg = self.CP.safetyConfigs[-1]
@ -198,7 +199,11 @@ class TestCarModelBase(unittest.TestCase):
# self.safety.init_tests()
# self.safety.init_tests()
# bus = 0 # random.randint(0, 3)
# bus = 0 # random.randint(0, 3)
address = data . draw ( st . sampled_from ( [ i for i in self . fingerprint [ 0 ] ] ) ) # random.randint(0x200, 0x300)
# address = data.draw(st.sampled_from([i for i in self.fingerprint[0] if i < 0x700])) # random.randint(0x200, 0x300)
address = 380 # data.draw(st.sampled_from([i for i in self.fingerprint[0]])) # random.randint(0x200, 0x300)
# addresses = [i for i in self.fingerprint[0]]
# weighted_address_strategy = st.sampled_from(sorted(addresses, key=lambda x: random.choices(addresses, weights=[(1 / (i + 1)) for i in range(len(addresses))])[0]))
# address = data.draw(weighted_address_strategy)
size = self . fingerprint [ 0 ] [ address ]
size = self . fingerprint [ 0 ] [ address ]
print ( address , size )
print ( address , size )
# print(self.fingerprint)
# print(self.fingerprint)
@ -211,7 +216,7 @@ class TestCarModelBase(unittest.TestCase):
# msg_strategy = st.tuples(st.integers(min_value=0xaa, max_value=0xaa), st.binary(min_size=8, max_size=8))
# msg_strategy = st.tuples(st.integers(min_value=0xaa, max_value=0xaa), st.binary(min_size=8, max_size=8))
msg_strategy = st . binary ( min_size = size , max_size = size )
msg_strategy = st . binary ( min_size = size , max_size = size )
msgs = data . draw ( st . lists ( msg_strategy , min_size = 10 0) )
msgs = data . draw ( st . lists ( msg_strategy , min_size = 5 0) )
# print(len(msgs))
# print(len(msgs))
prev_panda_gas = self . safety . get_gas_pressed_prev ( )
prev_panda_gas = self . safety . get_gas_pressed_prev ( )
@ -226,15 +231,22 @@ class TestCarModelBase(unittest.TestCase):
# for bus, address, dat in msgs:
# for bus, address, dat in msgs:
# since all toyotas can detect fake interceptor, but we want to test PCM gas too
# since all toyotas can detect fake interceptor, but we want to test PCM gas too
for dat in msgs :
for dat1 , dat2 , dat3 , dat4 in zip ( * [ iter ( msgs ) ] * 4 ) :
# set interceptor detected so we don't accidentally trigger gas_pressed with other message
self . safety . set_gas_interceptor_detected ( self . CP . enableGasInterceptor )
# if not self.CP.enableGasInterceptor:
# if not self.CP.enableGasInterceptor:
# self.safety.set_gas_interceptor_detected(False)
# self.safety.set_gas_interceptor_detected(False)
print ( )
for dat in ( dat1 , dat2 , dat3 , dat4 ) :
to_send = libpanda_py . make_CANPacket ( address , bus , dat )
to_send = libpanda_py . make_CANPacket ( address , bus , dat )
did_rx = self . safety . safety_rx_hook ( to_send )
did_rx = self . safety . safety_rx_hook ( to_send )
can = messaging . new_message ( ' can ' , 1 )
# test multiple CAN packets as well as multiple messages per CAN packet
can . can = [ log . CanData ( address = address , dat = dat , src = bus ) ]
for ( _dat1 , _dat2 ) in ( ( dat1 , dat2 ) , ( dat3 , dat4 ) ) :
can = messaging . new_message ( ' can ' , 2 )
can . can = [ log . CanData ( address = address , dat = _dat1 , src = bus ) , log . CanData ( address = address , dat = _dat2 , src = bus ) ]
print ( ' rxing ' , dict ( address = address , _dat1 = _dat1 , _dat2 = _dat2 , src = bus ) )
CC = car . CarControl . new_message ( )
CC = car . CarControl . new_message ( )
CS = self . CI . update ( CC , ( can . to_bytes ( ) , ) )
CS = self . CI . update ( CC , ( can . to_bytes ( ) , ) )
@ -245,6 +257,8 @@ class TestCarModelBase(unittest.TestCase):
# due to panda updating state selectively, per message, we can only compare on a change
# due to panda updating state selectively, per message, we can only compare on a change
# if self.safety.get_gas_interceptor_detected():# and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()):
# if self.safety.get_gas_interceptor_detected():# and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()):
# print('ret.gas', CS.gas, 'safety gas', self.safety.get_gas_interceptor_prev())
print ( ' both ' , CS . brakePressed , ' safety brake ' , self . safety . get_brake_pressed_prev ( ) )
if self . safety . get_gas_pressed_prev ( ) != prev_panda_gas :
if self . safety . get_gas_pressed_prev ( ) != prev_panda_gas :
print ( )
print ( )
print ( ' ret.gas ' , CS . gas , ' safety gas ' , self . safety . get_gas_interceptor_prev ( ) )
print ( ' ret.gas ' , CS . gas , ' safety gas ' , self . safety . get_gas_interceptor_prev ( ) )
@ -253,7 +267,7 @@ class TestCarModelBase(unittest.TestCase):
print ( ' can.can ' , can . can )
print ( ' can.can ' , can . can )
# self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev())
# self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev())
self . assertEqual ( CS . gasPressed , self . safety . get_gas_pressed_prev ( ) )
self . assertEqual ( CS . gasPressed , self . safety . get_gas_pressed_prev ( ) )
self . assertEqual ( CS . gas , self . safety . get_gas_interceptor_prev ( ) )
# self.assertEqual(CS.gas, self.safety.get_gas_interceptor_prev() )
# self.assertFalse(True)
# self.assertFalse(True)
if self . safety . get_brake_pressed_prev ( ) != prev_panda_brake :
if self . safety . get_brake_pressed_prev ( ) != prev_panda_brake :
@ -269,6 +283,7 @@ class TestCarModelBase(unittest.TestCase):
if self . safety . get_vehicle_moving ( ) != prev_panda_vehicle_moving :
if self . safety . get_vehicle_moving ( ) != prev_panda_vehicle_moving :
self . assertEqual ( not CS . standstill , self . safety . get_vehicle_moving ( ) )
self . assertEqual ( not CS . standstill , self . safety . get_vehicle_moving ( ) )
if not ( self . CP . carName == " honda " and self . CP . carFingerprint not in HONDA_BOSCH ) :
if self . safety . get_cruise_engaged_prev ( ) != prev_panda_cruise_engaged :
if self . safety . get_cruise_engaged_prev ( ) != prev_panda_cruise_engaged :
self . assertEqual ( CS . cruiseState . enabled , self . safety . get_cruise_engaged_prev ( ) )
self . assertEqual ( CS . cruiseState . enabled , self . safety . get_cruise_engaged_prev ( ) )