|
|
|
@ -8,6 +8,8 @@ import unittest |
|
|
|
|
from collections import defaultdict, Counter |
|
|
|
|
from typing import List, Optional, Tuple |
|
|
|
|
from parameterized import parameterized_class |
|
|
|
|
import hypothesis.strategies as st |
|
|
|
|
from hypothesis import HealthCheck, Phase, assume, given, settings |
|
|
|
|
|
|
|
|
|
from cereal import messaging, log, car |
|
|
|
|
from openpilot.common.basedir import BASEDIR |
|
|
|
@ -166,6 +168,7 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
del cls.can_msgs |
|
|
|
|
|
|
|
|
|
def setUp(self): |
|
|
|
|
# print('SETUP HEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHERE') |
|
|
|
|
self.CI = self.CarInterface(self.CP.copy(), self.CarController, self.CarState) |
|
|
|
|
assert self.CI |
|
|
|
|
|
|
|
|
@ -301,9 +304,95 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
# CC = car.CarControl.new_message(cruiseControl={'resume': True}) |
|
|
|
|
# test_car_controller(CC) |
|
|
|
|
|
|
|
|
|
def test_panda_safety_carstate_fuzzy(self): |
|
|
|
|
bus = 0#random.randint(0, 3) |
|
|
|
|
address = 466 # random.randint(0x200, 0x300) |
|
|
|
|
@settings(max_examples=1000, deadline=None, |
|
|
|
|
# phases=(Phase.reuse, Phase.generate, Phase.shrink), |
|
|
|
|
suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow], |
|
|
|
|
) |
|
|
|
|
@given(data=st.data()) |
|
|
|
|
def test_panda_safety_carstate_fuzzy(self, data): |
|
|
|
|
state_has_changed = lambda prev_state, new_state: prev_state != new_state |
|
|
|
|
# cfg = self.CP.safetyConfigs[-1] |
|
|
|
|
# set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) |
|
|
|
|
# self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") |
|
|
|
|
# self.safety.init_tests() |
|
|
|
|
|
|
|
|
|
# bus = 0 # random.randint(0, 3) |
|
|
|
|
# address = 0xaa # random.randint(0x200, 0x300) |
|
|
|
|
|
|
|
|
|
address = data.draw(st.integers(0x1ff, 0x250)) |
|
|
|
|
bus = 0 |
|
|
|
|
|
|
|
|
|
# ORIG: |
|
|
|
|
# msg_strategy = st.tuples(st.integers(min_value=0, max_value=0), st.integers(min_value=0x100, max_value=0x400), st.binary(min_size=8, max_size=8)) |
|
|
|
|
|
|
|
|
|
msg_strategy = st.binary(min_size=8, max_size=8) |
|
|
|
|
msgs = data.draw(st.lists(msg_strategy, min_size=100))#, min_size=100, max_size=1000)) |
|
|
|
|
print(len(msgs)) |
|
|
|
|
|
|
|
|
|
start_gas = self.safety.get_gas_pressed_prev() |
|
|
|
|
start_gas_int_detected = self.safety.get_gas_interceptor_detected() |
|
|
|
|
|
|
|
|
|
# for bus, address, dat in msgs: |
|
|
|
|
for dat in msgs: |
|
|
|
|
to_send = libpanda_py.make_CANPacket(address, bus, dat) |
|
|
|
|
did_rx = self.safety.safety_rx_hook(to_send) |
|
|
|
|
|
|
|
|
|
can = messaging.new_message('can', 1) |
|
|
|
|
can.can = [log.CanData(address=address, dat=dat, src=bus)] |
|
|
|
|
|
|
|
|
|
CC = car.CarControl.new_message() |
|
|
|
|
CS = self.CI.update(CC, (can.to_bytes(),)) |
|
|
|
|
|
|
|
|
|
if self.safety.get_gas_interceptor_detected() and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()): |
|
|
|
|
print('get_gas_interceptor_detected!') |
|
|
|
|
# self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) |
|
|
|
|
self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) |
|
|
|
|
# self.assertFalse(True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if self.safety.get_gas_pressed_prev() and self.safety.get_cruise_engaged_prev(): |
|
|
|
|
# self.assertFalse(True) |
|
|
|
|
# self.assertFalse(self.safety.get_cruise_engaged_prev()) |
|
|
|
|
|
|
|
|
|
# print('gas_pressed', CS.gasPressed, self.safety.get_gas_pressed_prev()) |
|
|
|
|
# print('wheel_speeds', CS.wheelSpeeds) |
|
|
|
|
# print('standstill', CS.standstill, not self.safety.get_vehicle_moving()) |
|
|
|
|
|
|
|
|
|
# print('did_rx', did_rx) |
|
|
|
|
# if did_rx: |
|
|
|
|
# self.assertFalse(True, 'finally did rx: {}, {}'.format(i, dat)) |
|
|
|
|
# self.assertTrue(CS.standstill, (not CS.standstill, self.safety.get_vehicle_moving(), CS.vEgoRaw, CS.wheelSpeeds)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) |
|
|
|
|
# self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) |
|
|
|
|
# self.assertEqual(CS.brakePressed, self.safety.get_brake_pressed_prev()) |
|
|
|
|
# self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev()) |
|
|
|
|
# |
|
|
|
|
# if self.CP.pcmCruise: |
|
|
|
|
# self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev()) |
|
|
|
|
# |
|
|
|
|
# if self.CP.carName == "honda": |
|
|
|
|
# self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if self.safety.get_gas_interceptor_detected(): |
|
|
|
|
# print('get_gas_interceptor_detected!') |
|
|
|
|
# # self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) |
|
|
|
|
# self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) |
|
|
|
|
# # self.assertFalse(True) |
|
|
|
|
|
|
|
|
|
print(self.safety.get_gas_pressed_prev(), self.safety.get_brake_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_cruise_engaged_prev()) |
|
|
|
|
# assume(state_has_changed(False, self.safety.get_gas_pressed_prev())) |
|
|
|
|
assume(state_has_changed(start_gas, self.safety.get_gas_pressed_prev())) |
|
|
|
|
# assume(state_has_changed(start_gas_int_detected, self.safety.get_gas_interceptor_detected())) |
|
|
|
|
# assume(state_has_changed(False, self.safety.get_brake_pressed_prev())) |
|
|
|
|
# assume(state_has_changed(False, self.safety.get_vehicle_moving())) |
|
|
|
|
# assume(state_has_changed(False, self.safety.get_cruise_engaged_prev())) |
|
|
|
|
|
|
|
|
|
# print(msgs) |
|
|
|
|
# print('\nresults', self.safety.get_gas_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_brake_pressed_prev(), self.safety.get_regen_braking_prev(), self.safety.get_cruise_engaged_prev(), self.safety.get_acc_main_on()) |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
for i in range(1000): |
|
|
|
|
# self.setUp() |
|
|
|
@ -323,7 +412,8 @@ class TestCarModelBase(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
print('did_rx', did_rx) |
|
|
|
|
# if did_rx: |
|
|
|
|
# self.assertFalse(True, 'finally did rx: {}'.format(i)) |
|
|
|
|
# self.assertFalse(True, 'finally did rx: {}, {}'.format(i, dat)) |
|
|
|
|
self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) |
|
|
|
|
|
|
|
|
|
print('\nresults', self.safety.get_gas_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_brake_pressed_prev(), self.safety.get_regen_braking_prev(), self.safety.get_cruise_engaged_prev(), self.safety.get_acc_main_on()) |
|
|
|
|
|
|
|
|
|