diff --git a/selfdrive/car/tests/test_models.py b/selfdrive/car/tests/test_models.py index 0a9f86b556..cf7727dafe 100755 --- a/selfdrive/car/tests/test_models.py +++ b/selfdrive/car/tests/test_models.py @@ -201,6 +201,42 @@ class TestCarModelBase(unittest.TestCase): self.assertTrue(self.safety.addr_checks_valid()) self.assertFalse(len(failed_addrs), f"panda safety RX check failed: {failed_addrs}") + def test_panda_safety_tx_cases(self, data=None): + """Asserts we can tx common messages""" + if self.CP.notCar: + self.skipTest("Skipping test for notCar") + + def test_car_controller(car_control): + now_nanos = 0 + msgs_sent = 0 + CI = self.CarInterface(self.CP, self.CarController, self.CarState) + for _ in range(round(10.0 / DT_CTRL)): # make sure we hit the slowest messages + CI.update(car_control, []) + _, sendcan = CI.apply(car_control, now_nanos) + + now_nanos += DT_CTRL * 1e9 + msgs_sent += len(sendcan) + for addr, _, dat, bus in sendcan: + to_send = libpanda_py.make_CANPacket(addr, bus % 4, dat) + self.assertTrue(self.safety.safety_tx_hook(to_send), (addr, dat, bus)) + + # Make sure we attempted to send messages + self.assertGreater(msgs_sent, 50) + + # Make sure we can send all messages while inactive + CC = car.CarControl.new_message() + test_car_controller(CC) + + # Test cancel + general messages (controls_allowed=False & cruise_engaged=True) + self.safety.set_cruise_engaged_prev(True) + CC = car.CarControl.new_message(cruiseControl={'cancel': True}) + test_car_controller(CC) + + # Test resume + general messages (controls_allowed=True & cruise_engaged=True) + self.safety.set_controls_allowed(True) + CC = car.CarControl.new_message(cruiseControl={'resume': True}) + test_car_controller(CC) + def test_panda_safety_carstate(self): """ Assert that panda safety matches openpilot's carState