diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 3edc8ffeb9..c78b0b53f1 100755 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -163,6 +163,9 @@ class TestCarModelBase(unittest.TestCase): assert cls.CP assert cls.CP.carFingerprint == cls.car_model + cls.car_state_dict = {'panda': {'gas_pressed': False}, 'CS': {'gasPressed': False}} + cls.init_gas_pressed = False + @classmethod def tearDownClass(cls): del cls.can_msgs @@ -182,131 +185,9 @@ class TestCarModelBase(unittest.TestCase): self.assertEqual(0, set_status, f"failed to set safetyModel {cfg}") self.safety.init_tests() - # def test_car_params(self): - # if self.CP.dashcamOnly: - # self.skipTest("no need to check carParams for dashcamOnly") - # - # # make sure car params are within a valid range - # self.assertGreater(self.CP.mass, 1) - # - # if self.CP.steerControlType != car.CarParams.SteerControlType.angle: - # tuning = self.CP.lateralTuning.which() - # if tuning == 'pid': - # self.assertTrue(len(self.CP.lateralTuning.pid.kpV)) - # elif tuning == 'torque': - # self.assertTrue(self.CP.lateralTuning.torque.kf > 0) - # else: - # raise Exception("unknown tuning") - # - # def test_car_interface(self): - # # TODO: also check for checksum violations from can parser - # can_invalid_cnt = 0 - # can_valid = False - # CC = car.CarControl.new_message() - # - # for i, msg in enumerate(self.can_msgs): - # CS = self.CI.update(CC, (msg.as_builder().to_bytes(),)) - # self.CI.apply(CC, msg.logMonoTime) - # - # if CS.canValid: - # can_valid = True - # - # # wait max of 2s for low frequency msgs to be seen - # if i > 200 or can_valid: - # can_invalid_cnt += not CS.canValid - # - # self.assertEqual(can_invalid_cnt, 0) - # - # def test_radar_interface(self): - # os.environ['NO_RADAR_SLEEP'] = "1" - # RadarInterface = importlib.import_module(f'selfdrive.car.{self.CP.carName}.radar_interface').RadarInterface - # RI = RadarInterface(self.CP) - # assert RI - # - # # Since OBD port is multiplexed to bus 1 (commonly radar bus) while fingerprinting, - # # start parsing CAN messages after we've left ELM mode and can expect CAN traffic - # error_cnt = 0 - # for i, msg in enumerate(self.can_msgs[self.elm_frame:]): - # rr = RI.update((msg.as_builder().to_bytes(),)) - # if rr is not None and i > 50: - # error_cnt += car.RadarData.Error.canError in rr.errors - # self.assertEqual(error_cnt, 0) - # - # def test_panda_safety_rx_valid(self): - # if self.CP.dashcamOnly: - # self.skipTest("no need to check panda safety for dashcamOnly") - # - # start_ts = self.can_msgs[0].logMonoTime - # - # failed_addrs = Counter() - # for can in self.can_msgs: - # # update panda timer - # t = (can.logMonoTime - start_ts) / 1e3 - # self.safety.set_timer(int(t)) - # - # # run all msgs through the safety RX hook - # for msg in can.can: - # if msg.src >= 64: - # continue - # - # to_send = libpanda_py.make_CANPacket(msg.address, msg.src % 4, msg.dat) - # if self.safety.safety_rx_hook(to_send) != 1: - # failed_addrs[hex(msg.address)] += 1 - # - # # ensure all msgs defined in the addr checks are valid - # if self.car_model not in ignore_addr_checks_valid: - # self.safety.safety_tick_current_rx_checks() - # if t > 1e6: - # self.assertTrue(self.safety.addr_checks_valid()) - # - # # No need to check relay malfunction on disabled routes (relay closed), - # # or before fingerprinting is done (1s of tolerance to exit silent mode) - # if self.openpilot_enabled and t / 1e4 > (self.elm_frame + 100): - # self.assertFalse(self.safety.get_relay_malfunction()) - # else: - # self.safety.set_relay_malfunction(False) - # - # self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") - # - # def test_panda_safety_tx_cases(self, data=None): - # """Asserts we can tx common messages""" - # if self.CP.notCar: - # self.skipTest("Skipping test for notCar") - # - # def test_car_controller(car_control): - # now_nanos = 0 - # msgs_sent = 0 - # CI = self.CarInterface(self.CP, self.CarController, self.CarState) - # for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages - # CI.update(car_control, []) - # _, sendcan = CI.apply(car_control, now_nanos) - # - # now_nanos += DT_CTRL * 1e9 - # msgs_sent += len(sendcan) - # for addr, _, dat, bus in sendcan: - # to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat) - # self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) - # - # # Make sure we attempted to send messages - # self.assertGreater(msgs_sent, 50) - # - # # Make sure we can send all messages while inactive - # CC = car.CarControl.new_message() - # test_car_controller(CC) - # - # # Test cancel + general messages (controls_allowed=False & cruise_engaged=True) - # self.safety.set_cruise_engaged_prev(True) - # CC = car.CarControl.new_message(cruiseControl={'cancel': True}) - # test_car_controller(CC) - # - # # Test resume + general messages (controls_allowed=True & cruise_engaged=True) - # self.safety.set_controls_allowed(True) - # CC = car.CarControl.new_message(cruiseControl={'resume': True}) - # test_car_controller(CC) - - @settings(max_examples=1000, deadline=None, - # phases=(Phase.reuse, Phase.generate, Phase.shrink), - suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow], + @settings(max_examples=100, deadline=None, + phases=(Phase.reuse, Phase.generate, ), + suppress_health_check=[HealthCheck.filter_too_much, HealthCheck.too_slow, HealthCheck.large_base_example], ) @given(data=st.data()) def test_panda_safety_carstate_fuzzy(self, data): @@ -319,16 +200,17 @@ class TestCarModelBase(unittest.TestCase): # bus = 0 # random.randint(0, 3) # address = 0xaa # random.randint(0x200, 0x300) - address = data.draw(st.integers(0x1ff, 0x250)) + address = data.draw(st.integers(0x201, 0x201)) bus = 0 # ORIG: # msg_strategy = st.tuples(st.integers(min_value=0, max_value=0), st.integers(min_value=0x100, max_value=0x400), st.binary(min_size=8, max_size=8)) msg_strategy = st.binary(min_size=8, max_size=8) - msgs = data.draw(st.lists(msg_strategy, min_size=100))#, min_size=100, max_size=1000)) - print(len(msgs)) + msgs = data.draw(st.lists(msg_strategy, min_size=100)) + # print(len(msgs)) + prev_panda_gas = self.safety.get_gas_pressed_prev() start_gas = self.safety.get_gas_pressed_prev() start_gas_int_detected = self.safety.get_gas_interceptor_detected() @@ -343,13 +225,21 @@ class TestCarModelBase(unittest.TestCase): CC = car.CarControl.new_message() CS = self.CI.update(CC, (can.to_bytes(),)) - if self.safety.get_gas_interceptor_detected() and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()): + if self.safety.get_gas_pressed_prev(): + self.init_gas_pressed = True + + # if self.safety.get_gas_interceptor_detected():# and state_has_changed(start_gas, self.safety.get_gas_pressed_prev()): + if self.safety.get_gas_pressed_prev() != prev_panda_gas: + print() + print('ret.gas', CS.gas, 'safety gas', self.safety.get_gas_interceptor_prev()) + print('both', CS.gasPressed, self.safety.get_gas_pressed_prev(), 'int') print('get_gas_interceptor_detected!') + print('can.can', can.can) # self.assertEqual(CS.gasPressed, self.safety.get_gas_interceptor_prev()) self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) # self.assertFalse(True) - + prev_panda_gas = self.safety.get_gas_pressed_prev() # if self.safety.get_gas_pressed_prev() and self.safety.get_cruise_engaged_prev(): # self.assertFalse(True) # self.assertFalse(self.safety.get_cruise_engaged_prev()) @@ -382,9 +272,13 @@ class TestCarModelBase(unittest.TestCase): # self.assertEqual(CS.gasPressed, self.safety.get_gas_pressed_prev()) # # self.assertFalse(True) - print(self.safety.get_gas_pressed_prev(), self.safety.get_brake_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_cruise_engaged_prev()) + # return + # self.car_state_dict['panda'] = {'gas_pressed': self.safety.get_gas_pressed_prev()} + # self.car_state_dict['CS'] = {'gasPressed': CS.gasPressed} + + # print(self.safety.get_gas_pressed_prev(), self.safety.get_brake_pressed_prev(), self.safety.get_vehicle_moving(), self.safety.get_cruise_engaged_prev()) # assume(state_has_changed(False, self.safety.get_gas_pressed_prev())) - assume(state_has_changed(start_gas, self.safety.get_gas_pressed_prev())) + assume(state_has_changed(start_gas, self.safety.get_gas_pressed_prev())) # this just goes on forever # assume(state_has_changed(start_gas_int_detected, self.safety.get_gas_interceptor_detected())) # assume(state_has_changed(False, self.safety.get_brake_pressed_prev())) # assume(state_has_changed(False, self.safety.get_vehicle_moving())) diff --git a/selfdrive/car/toyota/carstate.py b/selfdrive/car/toyota/carstate.py index 8eb8e69b45..99c9d13a10 100644 --- a/selfdrive/car/toyota/carstate.py +++ b/selfdrive/car/toyota/carstate.py @@ -56,6 +56,7 @@ class CarState(CarStateBase): ret.brakeHoldActive = cp.vl["ESP_CONTROL"]["BRAKE_HOLD_ACTIVE"] == 1 if self.CP.enableGasInterceptor: ret.gas = (cp.vl["GAS_SENSOR"]["INTERCEPTOR_GAS"] + cp.vl["GAS_SENSOR"]["INTERCEPTOR_GAS2"]) // 2 + # print('ret.gas', ret.gas) ret.gasPressed = ret.gas > 805 else: # TODO: find a common gas pedal percentage signal diff --git a/selfdrive/car/toyota/interface.py b/selfdrive/car/toyota/interface.py index 42ab5cf257..08b57a4901 100644 --- a/selfdrive/car/toyota/interface.py +++ b/selfdrive/car/toyota/interface.py @@ -215,7 +215,7 @@ class CarInterface(CarInterfaceBase): found_ecus = [fw.ecu for fw in car_fw] ret.enableDsu = len(found_ecus) > 0 and Ecu.dsu not in found_ecus and candidate not in (NO_DSU_CAR | UNSUPPORTED_DSU_CAR) \ and not (ret.flags & ToyotaFlags.SMART_DSU) - ret.enableGasInterceptor = 0x201 in fingerprint[0] + ret.enableGasInterceptor = True # 0x201 in fingerprint[0] # if the smartDSU is detected, openpilot can send ACC_CONTROL and the smartDSU will block it from the DSU or radar. # since we don't yet parse radar on TSS2/TSS-P radar-based ACC cars, gate longitudinal behind experimental toggle