#!/usr/bin/env python2 import unittest import numpy as np import libpandasafety_py MAX_RATE_UP = 10 MAX_RATE_DOWN = 25 MAX_TORQUE = 1500 MAX_ACCEL = 1500 MIN_ACCEL = -3000 MAX_RT_DELTA = 375 RT_INTERVAL = 250000 MAX_TORQUE_ERROR = 350 IPAS_OVERRIDE_THRESHOLD = 200 ANGLE_DELTA_BP = [0., 5., 15.] ANGLE_DELTA_V = [5., .8, .15] # windup limit ANGLE_DELTA_VU = [5., 3.5, 0.4] # unwind limit def twos_comp(val, bits): if val >= 0: return val else: return (2**bits) + val def sign(a): if a > 0: return 1 else: return -1 class TestToyotaSafety(unittest.TestCase): @classmethod def setUp(cls): cls.safety = libpandasafety_py.libpandasafety cls.safety.toyota_init(100) cls.safety.init_tests_toyota() def _set_prev_torque(self, t): self.safety.set_toyota_desired_torque_last(t) self.safety.set_toyota_rt_torque_last(t) self.safety.set_toyota_torque_meas(t, t) def _torque_meas_msg(self, torque): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x260 << 21 t = twos_comp(torque, 16) to_send[0].RDHR = t | ((t & 0xFF) << 16) return to_send def _torque_driver_msg(self, torque): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x260 << 21 t = twos_comp(torque, 16) to_send[0].RDLR = t | ((t & 0xFF) << 16) return to_send def _torque_driver_msg_array(self, torque): for i in range(6): self.safety.toyota_ipas_rx_hook(self._torque_driver_msg(torque)) def _angle_meas_msg(self, angle): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x25 << 21 t = twos_comp(angle, 12) to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8) return to_send def _angle_meas_msg_array(self, angle): for i in range(6): self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(angle)) def _torque_msg(self, torque): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x2E4 << 21 t = twos_comp(torque, 16) to_send[0].RDLR = t | ((t & 0xFF) << 16) return to_send def _ipas_state_msg(self, state): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x262 << 21 to_send[0].RDLR = state & 0xF return to_send def _ipas_control_msg(self, angle, state): # note: we command 2/3 of the angle due to CAN conversion to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x266 << 21 t = twos_comp(angle, 12) to_send[0].RDLR = ((t & 0xF00) >> 8) | ((t & 0xFF) << 8) to_send[0].RDLR |= ((state & 0xf) << 4) return to_send def _speed_msg(self, speed): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0xb4 << 21 speed = int(speed * 100 * 3.6) to_send[0].RDHR = ((speed & 0xFF) << 16) | (speed & 0xFF00) return to_send def _accel_msg(self, accel): to_send = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_send[0].RIR = 0x343 << 21 a = twos_comp(accel, 16) to_send[0].RDLR = (a & 0xFF) << 8 | (a >> 8) return to_send def test_default_controls_not_allowed(self): self.assertFalse(self.safety.get_controls_allowed()) def test_manually_enable_controls_allowed(self): self.safety.set_controls_allowed(1) self.assertTrue(self.safety.get_controls_allowed()) def test_enable_control_allowed_from_cruise(self): to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_push[0].RIR = 0x1D2 << 21 to_push[0].RDLR = 0x20 self.safety.toyota_rx_hook(to_push) self.assertTrue(self.safety.get_controls_allowed()) def test_disable_control_allowed_from_cruise(self): to_push = libpandasafety_py.ffi.new('CAN_FIFOMailBox_TypeDef *') to_push[0].RIR = 0x1D2 << 21 to_push[0].RDLR = 0 self.safety.set_controls_allowed(1) self.safety.toyota_rx_hook(to_push) self.assertFalse(self.safety.get_controls_allowed()) def test_accel_actuation_limits(self): for accel in np.arange(MIN_ACCEL - 1000, MAX_ACCEL + 1000, 100): for controls_allowed in [True, False]: self.safety.set_controls_allowed(controls_allowed) if controls_allowed: send = MIN_ACCEL <= accel <= MAX_ACCEL else: send = accel == 0 self.assertEqual(send, self.safety.toyota_tx_hook(self._accel_msg(accel))) def test_torque_absolute_limits(self): for controls_allowed in [True, False]: for torque in np.arange(-MAX_TORQUE - 1000, MAX_TORQUE + 1000, MAX_RATE_UP): self.safety.set_controls_allowed(controls_allowed) self.safety.set_toyota_rt_torque_last(torque) self.safety.set_toyota_torque_meas(torque, torque) self.safety.set_toyota_desired_torque_last(torque - MAX_RATE_UP) if controls_allowed: send = (-MAX_TORQUE <= torque <= MAX_TORQUE) else: send = torque == 0 self.assertEqual(send, self.safety.toyota_tx_hook(self._torque_msg(torque))) def test_non_realtime_limit_up(self): self.safety.set_controls_allowed(True) self._set_prev_torque(0) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(MAX_RATE_UP))) self._set_prev_torque(0) self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(MAX_RATE_UP + 1))) def test_non_realtime_limit_down(self): self.safety.set_controls_allowed(True) self.safety.set_toyota_rt_torque_last(1000) self.safety.set_toyota_torque_meas(500, 500) self.safety.set_toyota_desired_torque_last(1000) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN))) self.safety.set_toyota_rt_torque_last(1000) self.safety.set_toyota_torque_meas(500, 500) self.safety.set_toyota_desired_torque_last(1000) self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(1000 - MAX_RATE_DOWN + 1))) def test_exceed_torque_sensor(self): self.safety.set_controls_allowed(True) for sign in [-1, 1]: self._set_prev_torque(0) for t in np.arange(0, MAX_TORQUE_ERROR + 10, 10): t *= sign self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(sign * (MAX_TORQUE_ERROR + 10)))) def test_realtime_limit_up(self): self.safety.set_controls_allowed(True) for sign in [-1, 1]: self.safety.init_tests_toyota() self._set_prev_torque(0) for t in np.arange(0, 380, 10): t *= sign self.safety.set_toyota_torque_meas(t, t) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) self.assertFalse(self.safety.toyota_tx_hook(self._torque_msg(sign * 380))) self._set_prev_torque(0) for t in np.arange(0, 370, 10): t *= sign self.safety.set_toyota_torque_meas(t, t) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(t))) # Increase timer to update rt_torque_last self.safety.set_timer(RT_INTERVAL + 1) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(sign * 370))) self.assertTrue(self.safety.toyota_tx_hook(self._torque_msg(sign * 380))) def test_torque_measurements(self): self.safety.toyota_rx_hook(self._torque_meas_msg(50)) self.safety.toyota_rx_hook(self._torque_meas_msg(-50)) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.assertEqual(-51, self.safety.get_toyota_torque_meas_min()) self.assertEqual(51, self.safety.get_toyota_torque_meas_max()) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.assertEqual(-1, self.safety.get_toyota_torque_meas_max()) self.assertEqual(-51, self.safety.get_toyota_torque_meas_min()) self.safety.toyota_rx_hook(self._torque_meas_msg(0)) self.assertEqual(-1, self.safety.get_toyota_torque_meas_max()) self.assertEqual(-1, self.safety.get_toyota_torque_meas_min()) def test_ipas_override(self): ## angle control is not active self.safety.set_controls_allowed(1) # 3 consecutive msgs where driver exceeds threshold but angle_control isn't active self.safety.set_controls_allowed(1) self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD + 1) self.assertTrue(self.safety.get_controls_allowed()) self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD - 1) self.assertTrue(self.safety.get_controls_allowed()) # ipas state is override self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(5)) self.assertTrue(self.safety.get_controls_allowed()) ## now angle control is active self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 0)) self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(0)) # 3 consecutive msgs where driver does exceed threshold self.safety.set_controls_allowed(1) self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD + 1) self.assertFalse(self.safety.get_controls_allowed()) self.safety.set_controls_allowed(1) self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD - 1) self.assertFalse(self.safety.get_controls_allowed()) # ipas state is override and torque isn't overriding any more self.safety.set_controls_allowed(1) self._torque_driver_msg_array(0) self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(5)) self.assertFalse(self.safety.get_controls_allowed()) # 3 consecutive msgs where driver does not exceed threshold and # ipas state is not override self.safety.set_controls_allowed(1) self.safety.toyota_ipas_rx_hook(self._ipas_state_msg(0)) self.assertTrue(self.safety.get_controls_allowed()) self._torque_driver_msg_array(IPAS_OVERRIDE_THRESHOLD) self.assertTrue(self.safety.get_controls_allowed()) self._torque_driver_msg_array(-IPAS_OVERRIDE_THRESHOLD) self.assertTrue(self.safety.get_controls_allowed()) # reset no angle control at the end of the test self.safety.reset_angle_control() def test_angle_cmd_when_disabled(self): self.safety.set_controls_allowed(0) # test angle cmd too far from actual angle_refs = [-10, 10] deltas = range(-2, 3) expected_results = [False, True, True, True, False] for a in angle_refs: self._angle_meas_msg_array(a) for i, d in enumerate(deltas): self.assertEqual(expected_results[i], self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + d, 1))) # test ipas state cmd enabled self._angle_meas_msg_array(0) self.assertEqual(0, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 3))) # reset no angle control at the end of the test self.safety.reset_angle_control() def test_angle_cmd_when_enabled(self): # ipas angle cmd should pass through when controls are enabled self.safety.set_controls_allowed(1) self._angle_meas_msg_array(0) self.safety.toyota_ipas_rx_hook(self._speed_msg(0.1)) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 1))) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(4, 1))) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 3))) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-4, 3))) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-8, 3))) # reset no angle control at the end of the test self.safety.reset_angle_control() def test_angle_cmd_rate_when_disabled(self): # as long as the command is close to the measured, no rate limit is enforced when # controls are disabled self.safety.set_controls_allowed(0) self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(0)) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(0, 1))) self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(100)) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(100, 1))) self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(-100)) self.assertEqual(1, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(-100, 1))) # reset no angle control at the end of the test self.safety.reset_angle_control() def test_angle_cmd_rate_when_enabled(self): # when controls are allowed, angle cmd rate limit is enforced # test 1: no limitations if we stay within limits speeds = [0., 1., 5., 10., 15., 100.] angles = [-300, -100, -10, 0, 10, 100, 300] for a in angles: for s in speeds: # first test against false positives self._angle_meas_msg_array(a) self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1)) self.safety.set_controls_allowed(1) self.safety.toyota_ipas_rx_hook(self._speed_msg(s)) max_delta_up = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_V) * 2 / 3. + 1.) max_delta_down = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_VU) * 2 / 3. + 1.) self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * max_delta_up, 1))) self.assertTrue(self.safety.get_controls_allowed()) self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1))) self.assertTrue(self.safety.get_controls_allowed()) self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a - sign(a) * max_delta_down, 1))) self.assertTrue(self.safety.get_controls_allowed()) # now inject too high rates self.assertEqual(False, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * (max_delta_up + 1), 1))) self.assertFalse(self.safety.get_controls_allowed()) self.safety.set_controls_allowed(1) self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a + sign(a) * max_delta_up, 1))) self.assertTrue(self.safety.get_controls_allowed()) self.assertEqual(True, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1))) self.assertTrue(self.safety.get_controls_allowed()) self.assertEqual(False, self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a - sign(a) * (max_delta_down + 1), 1))) self.assertFalse(self.safety.get_controls_allowed()) # reset no angle control at the end of the test self.safety.reset_angle_control() def test_angle_measured_rate(self): speeds = [0., 1., 5., 10., 15., 100.] angles = [-300, -100, -10, 0, 10, 100, 300] angles = [10] for a in angles: for s in speeds: self._angle_meas_msg_array(a) self.safety.toyota_ipas_tx_hook(self._ipas_control_msg(a, 1)) self.safety.set_controls_allowed(1) self.safety.toyota_ipas_rx_hook(self._speed_msg(s)) max_delta_up = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_V) * 2 / 3. + 1.) max_delta_down = int(np.interp(s, ANGLE_DELTA_BP, ANGLE_DELTA_VU) * 2 / 3. + 1.) self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(a)) self.assertTrue(self.safety.get_controls_allowed()) self.safety.toyota_ipas_rx_hook(self._angle_meas_msg(a + 150)) self.assertFalse(self.safety.get_controls_allowed()) # reset no angle control at the end of the test self.safety.reset_angle_control() if __name__ == "__main__": unittest.main()