diff --git a/selfdrive/car/tests/routes.py b/selfdrive/car/tests/routes.py index 38937edde5..3b80600119 100755 --- a/selfdrive/car/tests/routes.py +++ b/selfdrive/car/tests/routes.py @@ -63,7 +63,7 @@ routes = [ CarTestRoute("168f8b3be57f66ae|2023-09-12--21-44-42", GM.ESCALADE_ESV_2019), CarTestRoute("c950e28c26b5b168|2018-05-30--22-03-41", GM.VOLT), CarTestRoute("f08912a233c1584f|2022-08-11--18-02-41", GM.BOLT_EUV, segment=1), - CarTestRoute("555d4087cf86aa91|2022-12-02--12-15-07", GM.BOLT_EUV, segment=14), # Bolt EV + # CarTestRoute("555d4087cf86aa91|2022-12-02--12-15-07", GM.BOLT_EUV, segment=14), # Bolt EV CarTestRoute("38aa7da107d5d252|2022-08-15--16-01-12", GM.SILVERADO), CarTestRoute("5085c761395d1fe6|2023-04-07--18-20-06", GM.TRAILBLAZER), diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 04d4cf48d9..08dad00bf6 100755 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -185,7 +185,7 @@ class TestCarModelBase(unittest.TestCase): self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") self.safety.init_tests() - @settings(max_examples=100, deadline=None, + @settings(max_examples=500, deadline=None, phases=(Phase.reuse, Phase.generate, ), suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow, HealthCheck.large_base_example], ) @@ -198,9 +198,10 @@ class TestCarModelBase(unittest.TestCase): # self.safety.init_tests() # bus = 0 # random.randint(0, 3) - address = data.draw(st.sampled_from([i for i in self.fingerprint[0] if i < 0x600])) # random.randint(0x200, 0x300) + address = data.draw(st.sampled_from([i for i in self.fingerprint[0]])) # random.randint(0x200, 0x300) size = self.fingerprint[0][address] print(address, size) + # print(self.fingerprint) # address = data.draw(st.integers(0x201, 0x226)) bus = 0 @@ -215,8 +216,11 @@ class TestCarModelBase(unittest.TestCase): prev_panda_gas = self.safety.get_gas_pressed_prev() prev_panda_brake = self.safety.get_brake_pressed_prev() + prev_panda_regen_braking = self.safety.get_regen_braking_prev() prev_panda_vehicle_moving = self.safety.get_vehicle_moving() prev_panda_cruise_engaged = self.safety.get_cruise_engaged_prev() + prev_panda_acc_main_on = self.safety.get_acc_main_on() + start_gas = self.safety.get_gas_pressed_prev() start_gas_int_detected = self.safety.get_gas_interceptor_detected() @@ -238,6 +242,8 @@ class TestCarModelBase(unittest.TestCase): if self.safety.get_gas_pressed_prev(): self.init_gas_pressed = True + # due to panda updating state selectively, per message, we can only compare on a change + # if self.safety.get_gas_interceptor_detected():# and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()): if self.safety.get_gas_pressed_prev() != prev_panda_gas: print() @@ -247,23 +253,35 @@ class TestCarModelBase(unittest.TestCase): print('can.can', can.can) # self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + self.assertEqual(CS.gas, self.safety.get_gas_interceptor_prev()) # self.assertFalse(True) if self.safety.get_brake_pressed_prev() != prev_panda_brake: - print('brake change!') - print('both', CS.brakePressed, self.safety.get_brake_pressed_prev()) + # print('brake change!') + # print('both', CS.brakePressed, self.safety.get_brake_pressed_prev()) self.assertEqual(CS.brakePressed, self.safety.get_brake_pressed_prev()) + if self.safety.get_regen_braking_prev() != prev_panda_regen_braking: + print('regen change!') + print('both', CS.regenBraking, self.safety.get_regen_braking_prev()) + self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev()) + if self.safety.get_vehicle_moving() != prev_panda_vehicle_moving: self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) if self.safety.get_cruise_engaged_prev() != prev_panda_cruise_engaged: self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev()) + if self.CP.carName == "honda": + if self.safety.get_acc_main_on() != prev_panda_acc_main_on: + self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on()) + prev_panda_gas = self.safety.get_gas_pressed_prev() prev_panda_brake = self.safety.get_brake_pressed_prev() + prev_panda_regen_braking = self.safety.get_regen_braking_prev() prev_panda_vehicle_moving = self.safety.get_vehicle_moving() prev_panda_cruise_engaged = self.safety.get_cruise_engaged_prev() + prev_panda_acc_main_on = self.safety.get_acc_main_on() # if self.safety.get_gas_pressed_prev() and self.safety.get_cruise_engaged_prev(): # self.assertFalse(True) # self.assertFalse(self.safety.get_cruise_engaged_prev()) diff --git a/selfdrive/car/tests/test_models_fuzzy.py b/selfdrive/car/tests/test_models_fuzzy.py new file mode 100755 index 0000000000..3ef5d6005f --- /dev/null +++ b/selfdrive/car/tests/test_models_fuzzy.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 +import capnp +import os +import importlib +import pytest +import random +import unittest +from collections import defaultdict, Counter +from typing import List, Optional, Tuple +from parameterized import parameterized_class +import hypothesis.strategies as st +from hypothesis.stateful import RuleBasedStateMachine, rule, precondition +from hypothesis import HealthCheck, Phase, assume, given, settings + +from cereal import messaging, log, car +from openpilot.common.basedir import BASEDIR +from openpilot.common.params import Params +from openpilot.common.realtime import DT_CTRL +from openpilot.selfdrive.car.fingerprints import all_known_cars +from openpilot.selfdrive.car.car_helpers import FRAME_FINGERPRINT, interfaces +from openpilot.selfdrive.car.gm.values import CAR as GM +from openpilot.selfdrive.car.honda.values import CAR as HONDA, HONDA_BOSCH +from openpilot.selfdrive.car.hyundai.values import CAR as HYUNDAI +from openpilot.selfdrive.car.tests.routes import non_tested_cars, routes, CarTestRoute +from openpilot.selfdrive.controls.controlsd import Controls +from openpilot.selfdrive.test.openpilotci import get_url +from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.route import Route, SegmentName, RouteName + +from panda.tests.libpanda import libpanda_py + +EventName = car.CarEvent.EventName +PandaType = log.PandaState.PandaType +SafetyModel = car.CarParams.SafetyModel + +NUM_JOBS = int(os.environ.get("NUM_JOBS", "1")) +JOB_ID = int(os.environ.get("JOB_ID", "0")) +INTERNAL_SEG_LIST = os.environ.get("INTERNAL_SEG_LIST", "") +INTERNAL_SEG_CNT = int(os.environ.get("INTERNAL_SEG_CNT", "0")) + +ignore_addr_checks_valid = [ + GM.BUICK_REGAL, + HYUNDAI.GENESIS_G70_2020, +] + + +def get_test_cases() -> List[Tuple[str, Optional[CarTestRoute]]]: + # build list of test cases + test_cases = [] + if not len(INTERNAL_SEG_LIST): + routes_by_car = defaultdict(set) + for r in routes: + routes_by_car[r.car_model].add(r) + + for i, c in enumerate(sorted(all_known_cars())): + if i % NUM_JOBS == JOB_ID: + test_cases.extend(sorted((c, r) for r in routes_by_car.get(c, (None,)))) + + else: + with open(os.path.join(BASEDIR, INTERNAL_SEG_LIST), "r") as f: + seg_list = f.read().splitlines() + + cnt = INTERNAL_SEG_CNT or len(seg_list) + seg_list_iter = iter(seg_list[:cnt]) + + for platform in seg_list_iter: + platform = platform[2:] # get rid of comment + segment_name = SegmentName(next(seg_list_iter)) + test_cases.append((platform, CarTestRoute(segment_name.route_name.canonical_name, platform, + segment=segment_name.segment_num))) + return test_cases + + +init_done = False + +@pytest.mark.slow +class CarModelBase(RuleBasedStateMachine): + car_model: Optional[str] = 'TOYOTA CAMRY 2021' + test_route: Optional[CarTestRoute] = CarTestRoute("3456ad0cd7281b24|2020-12-13--17-45-56", 'TOYOTA CAMRY 2021') + ci: bool = True + + can_msgs: List[capnp.lib.capnp._DynamicStructReader] + elm_frame: Optional[int] + + def __init__(self): + super().__init__() + global init_done + if init_done: + return + init_done = True + # if self.__name__ == 'TestCarModel' or self.__name__.endswith('Base'): + # raise unittest.SkipTest + + if 'FILTER' in os.environ: + if not self.car_model.startswith(tuple(os.environ.get('FILTER').split(','))): + raise unittest.SkipTest + + if self.test_route is None: + if self.car_model in non_tested_cars: + print(f"Skipping tests for {self.car_model}: missing route") + raise unittest.SkipTest + raise Exception(f"missing test route for {self.car_model}") + + test_segs = (2, 1, 0) + if self.test_route.segment is not None: + test_segs = (self.test_route.segment,) + + for seg in test_segs: + try: + if len(INTERNAL_SEG_LIST): + route_name = RouteName(self.test_route.route) + lr = LogReader(f"cd:/{route_name.dongle_id}/{route_name.time_str}/{seg}/rlog.bz2") + elif self.ci: + lr = LogReader(get_url(self.test_route.route, seg)) + else: + lr = LogReader(Route(self.test_route.route).log_paths()[seg]) + except Exception: + continue + + car_fw = [] + can_msgs = [] + self.elm_frame = None + fingerprint = defaultdict(dict) + experimental_long = False + enabled_toggle = True + dashcam_only = False + for msg in lr: + if msg.which() == "can": + can_msgs.append(msg) + if len(can_msgs) <= FRAME_FINGERPRINT: + for m in msg.can: + if m.src < 64: + fingerprint[m.src][m.address] = len(m.dat) + + elif msg.which() == "carParams": + car_fw = msg.carParams.carFw + dashcam_only = msg.carParams.dashcamOnly + if msg.carParams.openpilotLongitudinalControl: + experimental_long = True + if self.car_model is None and not self.ci: + self.car_model = msg.carParams.carFingerprint + + elif msg.which() == 'initData': + for param in msg.initData.params.entries: + if param.key == 'OpenpilotEnabledToggle': + enabled_toggle = param.value.strip(b'\x00') == b'1' + + # Log which can frame the panda safety mode left ELM327, for CAN validity checks + if msg.which() == 'pandaStates': + for ps in msg.pandaStates: + if self.elm_frame is None and ps.safetyModel != SafetyModel.elm327: + self.elm_frame = len(can_msgs) + + elif msg.which() == 'pandaStateDEPRECATED': + if self.elm_frame is None and msg.pandaStateDEPRECATED.safetyModel != SafetyModel.elm327: + self.elm_frame = len(can_msgs) + + if len(can_msgs) > int(50 / DT_CTRL): + break + else: + raise Exception(f"Route: {repr(self.test_route.route)} with segments: {test_segs} not found or no CAN msgs found. Is it uploaded?") + + # if relay is expected to be open in the route + self.openpilot_enabled = enabled_toggle and not dashcam_only + + self.can_msgs = sorted(can_msgs, key=lambda msg: msg.logMonoTime) + + self.CarInterface, self.CarController, self.CarState = interfaces[self.car_model] + self.CP = self.CarInterface.get_params(self.car_model, fingerprint, car_fw, experimental_long, docs=False) + assert self.CP + assert self.CP.carFingerprint == self.car_model + + self.car_state_dict = {'gas_pressed': False} + print("HERE!!!!!") + + # print('SETUP HEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHEREHERE') + self.CI = self.CarInterface(self.CP.copy(), self.CarController, self.CarState) + assert self.CI + + Params().put_bool("OpenpilotEnabledToggle", self.openpilot_enabled) + + # TODO: check safetyModel is in release panda build + self.safety = libpanda_py.libpanda + + cfg = self.CP.safetyConfigs[-1] + set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) + assert 0 == set_status, f"failed to set safetyModel {cfg}" + self.safety.init_tests() + + @rule(messages=st.lists(st.tuples( + st.integers(min_value=0, max_value=2), + st.integers(min_value=0, max_value=0x1FFFF), + st.binary(min_size=8, max_size=8) + ))) + def send_messages(self, messages): + print(messages) + for bus, address, dat in messages: + to_send = libpanda_py.make_CANPacket(address, bus, dat) + self.safety.safety_rx_hook(to_send) + + can = messaging.new_message('can', 1) + can.can = [log.CanData(address=address, dat=dat, src=bus)] + + CC = car.CarControl.new_message() + prev_car_state = self.car_state.copy() + self.car_state = self.CI.update(CC, (can.to_bytes(),)) + + # Check for state change. + self.check_state_change(prev_car_state, self.car_state) + + # Only start asserting once there's been a state change. + if self.has_state_changed: + # Assertions based on your criteria here + # For example: + assert self.car_state['gas_pressed'] == self.initial_car_state['gas_pressed'], \ + "Mismatch in gas pressed state." + + # @settings(max_examples=100, deadline=None, + # # phases=(Phase.reuse, Phase.generate, Phase.shrink), + # suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow], + # ) + # @given(data=st.data()) + # def test_panda_safety_carstate_fuzzy(self, data): + # state_has_changed = lambda prev_state, new_state: prev_state != new_state + # # cfg = self.CP.safetyConfigs[-1] + # # set_status = self.safety.set_safety_hooks(cfg.safetyModel.raw, cfg.safetyParam) + # # self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") + # # self.safety.init_tests() + # + # # bus = 0 # random.randint(0, 3) + # # address = 0xaa # random.randint(0x200, 0x300) + # + # address = data.draw(st.integers(0x1ff, 0x250)) + # bus = 0 + # + # # ORIG: + # # msg_strategy = st.tuples(st.integers(min_value=0, max_value=0), st.integers(min_value=0x100, max_value=0x400), st.binary(min_size=8, max_size=8)) + # + # msg_strategy = st.binary(min_size=8, max_size=8) + # msgs = data.draw(st.lists(msg_strategy, min_size=100))#, min_size=100, max_size=1000)) + # print(len(msgs)) + # + # start_gas = self.safety.get_gas_pressed_prev() + # start_gas_int_detected = self.safety.get_gas_interceptor_detected() + # + # # for bus, address, dat in msgs: + # for dat in msgs: + # to_send = libpanda_py.make_CANPacket(address, bus, dat) + # did_rx = self.safety.safety_rx_hook(to_send) + # + # can = messaging.new_message('can', 1) + # can.can = [log.CanData(address=address, dat=dat, src=bus)] + # + # CC = car.CarControl.new_message() + # CS = self.CI.update(CC, (can.to_bytes(),)) + # + # if self.safety.get_gas_interceptor_detected():# and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()): + # print('get_gas_interceptor_detected!') + # # self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) + # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + # # self.assertFalse(True) + # + # + # # if self.safety.get_gas_pressed_prev() and self.safety.get_cruise_engaged_prev(): + # # self.assertFalse(True) + # # self.assertFalse(self.safety.get_cruise_engaged_prev()) + # + # # print('gas_pressed', CS.gasPressed, self.safety.get_gas_pressed_prev()) + # # print('wheel_speeds', CS.wheelSpeeds) + # # print('standstill', CS.standstill, not self.safety.get_vehicle_moving()) + # + # # print('did_rx', did_rx) + # # if did_rx: + # # self.assertFalse(True, 'finally did rx: {}, {}'.format(i, dat)) + # # self.assertTrue(CS.standstill, (not CS.standstill, self.safety.get_vehicle_moving(), CS.vEgoRaw, CS.wheelSpeeds)) + # + # + # # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + # # self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) + # # self.assertEqual(CS.brakePressed, self.safety.get_brake_pressed_prev()) + # # self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev()) + # # + # # if self.CP.pcmCruise: + # # self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev()) + # # + # # if self.CP.carName == "honda": + # # self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on()) + # + # + # # if self.safety.get_gas_interceptor_detected(): + # # print('get_gas_interceptor_detected!') + # # # self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) + # # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + # # # self.assertFalse(True) + # + # print(self.safety.get_gas_pressed_prev(), self.safety.get_brake_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_cruise_engaged_prev()) + # assume(state_has_changed(False, self.safety.get_gas_pressed_prev())) + # # assume(state_has_changed(start_gas, self.safety.get_gas_pressed_prev())) # this just goes on forever + # # assume(state_has_changed(start_gas_int_detected, self.safety.get_gas_interceptor_detected())) + # # assume(state_has_changed(False, self.safety.get_brake_pressed_prev())) + # # assume(state_has_changed(False, self.safety.get_vehicle_moving())) + # # assume(state_has_changed(False, self.safety.get_cruise_engaged_prev())) + # + # # print(msgs) + # # print('\nresults', self.safety.get_gas_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_brake_pressed_prev(), self.safety.get_regen_braking_prev(), self.safety.get_cruise_engaged_prev(), self.safety.get_acc_main_on()) + # return + # + # for i in range(1000): + # # self.setUp() + # dat = os.urandom(8) + # to_send = libpanda_py.make_CANPacket(address, bus, dat) + # did_rx = self.safety.safety_rx_hook(to_send) + # + # can = messaging.new_message('can', 1) + # can.can = [log.CanData(address=address, dat=dat, src=bus)] + # + # CC = car.CarControl.new_message() + # CS = self.CI.update(CC, (can.to_bytes(), )) + # + # print('gas_pressed', CS.gasPressed, self.safety.get_gas_pressed_prev()) + # print('wheel_speeds', CS.wheelSpeeds) + # print('standstill', CS.standstill, not self.safety.get_vehicle_moving()) + # + # print('did_rx', did_rx) + # # if did_rx: + # # self.assertFalse(True, 'finally did rx: {}, {}'.format(i, dat)) + # self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) + # + # print('\nresults', self.safety.get_gas_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_brake_pressed_prev(), self.safety.get_regen_braking_prev(), self.safety.get_cruise_engaged_prev(), self.safety.get_acc_main_on()) + # + # # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) + # # self.assertEqual(not CS.standstill, self.safety.get_vehicle_moving()) + # # self.assertEqual(CS.brakePressed, self.safety.get_brake_pressed_prev()) + # # self.assertEqual(CS.regenBraking, self.safety.get_regen_braking_prev()) + # # + # # if self.CP.pcmCruise: + # # self.assertEqual(CS.cruiseState.enabled, self.safety.get_cruise_engaged_prev()) + # # + # # if self.CP.carName == "honda": + # # self.assertEqual(CS.cruiseState.available, self.safety.get_acc_main_on()) + + +# @parameterized_class(('car_model', 'test_route'), get_test_cases()) +# class TestCarModel(TestCarModelBase): +# pass + +TestCarModelBase = CarModelBase.TestCase + +# TestCarModelBase().runTest() + + +if __name__ == "__main__": + unittest.main() + +