You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					147 lines
				
				5.2 KiB
			
		
		
			
		
	
	
					147 lines
				
				5.2 KiB
			| 
								 
											3 weeks ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 | 
							
								import unittest
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from opendbc.car.structs import CarParams
							 | 
						||
| 
								 | 
							
								from opendbc.safety.tests.libsafety import libsafety_py
							 | 
						||
| 
								 | 
							
								import opendbc.safety.tests.common as common
							 | 
						||
| 
								 | 
							
								from opendbc.safety.tests.common import CANPackerSafety
							 | 
						||
| 
								 | 
							
								from opendbc.car.rivian.values import RivianSafetyFlags
							 | 
						||
| 
								 | 
							
								from opendbc.car.rivian.riviancan import checksum as _checksum
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def checksum(msg):
							 | 
						||
| 
								 | 
							
								  addr, dat, bus = msg
							 | 
						||
| 
								 | 
							
								  ret = bytearray(dat)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  # ESP_Status
							 | 
						||
| 
								 | 
							
								  if addr == 0x208:
							 | 
						||
| 
								 | 
							
								    ret[0] = _checksum(ret[1:], 0x1D, 0xB1)
							 | 
						||
| 
								 | 
							
								  elif addr == 0x150:
							 | 
						||
| 
								 | 
							
								    ret[0] = _checksum(ret[1:], 0x1D, 0x9A)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  return addr, ret, bus
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestRivianSafetyBase(common.CarSafetyTest, common.DriverTorqueSteeringSafetyTest, common.LongitudinalAccelSafetyTest,
							 | 
						||
| 
								 | 
							
								                           common.VehicleSpeedSafetyTest):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  TX_MSGS = [[0x120, 0], [0x321, 2], [0x162, 2]]
							 | 
						||
| 
								 | 
							
								  RELAY_MALFUNCTION_ADDRS = {0: (0x120,), 2: (0x321, 0x162)}
							 | 
						||
| 
								 | 
							
								  FWD_BLACKLISTED_ADDRS = {0: [0x321, 0x162], 2: [0x120]}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  MAX_TORQUE_LOOKUP = [9, 17], [350, 250]
							 | 
						||
| 
								 | 
							
								  DYNAMIC_MAX_TORQUE = True
							 | 
						||
| 
								 | 
							
								  MAX_RATE_UP = 3
							 | 
						||
| 
								 | 
							
								  MAX_RATE_DOWN = 5
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  MAX_RT_DELTA = 125
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  DRIVER_TORQUE_ALLOWANCE = 100
							 | 
						||
| 
								 | 
							
								  DRIVER_TORQUE_FACTOR = 2
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  cnt_speed = 0
							 | 
						||
| 
								 | 
							
								  cnt_speed_2 = 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _torque_driver_msg(self, torque):
							 | 
						||
| 
								 | 
							
								    values = {"EPAS_TorsionBarTorque": torque / 100.0}
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("EPAS_SystemStatus", 0, values)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _torque_cmd_msg(self, torque, steer_req=1):
							 | 
						||
| 
								 | 
							
								    values = {"ACM_lkaStrToqReq": torque, "ACM_lkaActToi": steer_req}
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("ACM_lkaHbaCmd", 0, values)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _speed_msg(self, speed, quality_flag=True):
							 | 
						||
| 
								 | 
							
								    values = {"ESP_Vehicle_Speed": speed * 3.6, "ESP_Status_Counter": self.cnt_speed % 15,
							 | 
						||
| 
								 | 
							
								              "ESP_Vehicle_Speed_Q": 1 if quality_flag else 0}
							 | 
						||
| 
								 | 
							
								    self.__class__.cnt_speed += 1
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("ESP_Status", 0, values, fix_checksum=checksum)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _speed_msg_2(self, speed, quality_flag=True):
							 | 
						||
| 
								 | 
							
								    # Rivian has a dynamic max torque limit based on speed, so it checks two sources
							 | 
						||
| 
								 | 
							
								    return self._user_gas_msg(0, speed, quality_flag)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _user_brake_msg(self, brake):
							 | 
						||
| 
								 | 
							
								    values = {"iBESP2_BrakePedalApplied": brake}
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("iBESP2", 0, values)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _user_gas_msg(self, gas, speed=0, quality_flag=True):
							 | 
						||
| 
								 | 
							
								    values = {"VDM_AcceleratorPedalPosition": gas, "VDM_VehicleSpeed": speed * 3.6,
							 | 
						||
| 
								 | 
							
								              "VDM_PropStatus_Counter": self.cnt_speed_2 % 15, "VDM_VehicleSpeedQ": 1 if quality_flag else 0}
							 | 
						||
| 
								 | 
							
								    self.__class__.cnt_speed_2 += 1
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("VDM_PropStatus", 0, values, fix_checksum=checksum)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _pcm_status_msg(self, enable):
							 | 
						||
| 
								 | 
							
								    values = {"ACM_FeatureStatus": enable, "ACM_Unkown1": 1}
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("ACM_Status", 2, values)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def _accel_msg(self, accel: float):
							 | 
						||
| 
								 | 
							
								    values = {"ACM_AccelerationRequest": accel}
							 | 
						||
| 
								 | 
							
								    return self.packer.make_can_msg_safety("ACM_longitudinalRequest", 0, values)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def test_wheel_touch(self):
							 | 
						||
| 
								 | 
							
								    # For hiding hold wheel alert on engage
							 | 
						||
| 
								 | 
							
								    for controls_allowed in (True, False):
							 | 
						||
| 
								 | 
							
								      self.safety.set_controls_allowed(controls_allowed)
							 | 
						||
| 
								 | 
							
								      values = {
							 | 
						||
| 
								 | 
							
								        "SCCM_WheelTouch_HandsOn": 1 if controls_allowed else 0,
							 | 
						||
| 
								 | 
							
								        "SCCM_WheelTouch_CapacitiveValue": 100 if controls_allowed else 0,
							 | 
						||
| 
								 | 
							
								        "SETME_X52": 100,
							 | 
						||
| 
								 | 
							
								      }
							 | 
						||
| 
								 | 
							
								      self.assertTrue(self._tx(self.packer.make_can_msg_safety("SCCM_WheelTouch", 2, values)))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def test_rx_hook(self):
							 | 
						||
| 
								 | 
							
								    # checksum, counter, and quality flag checks
							 | 
						||
| 
								 | 
							
								    for quality_flag in (True, False):
							 | 
						||
| 
								 | 
							
								      for msg_type in ("speed", "speed_2"):
							 | 
						||
| 
								 | 
							
								        self.safety.set_controls_allowed(True)
							 | 
						||
| 
								 | 
							
								        # send multiple times to verify counter checks
							 | 
						||
| 
								 | 
							
								        for _ in range(10):
							 | 
						||
| 
								 | 
							
								          if msg_type == "speed":
							 | 
						||
| 
								 | 
							
								            msg = self._speed_msg(0, quality_flag=quality_flag)
							 | 
						||
| 
								 | 
							
								          elif msg_type == "speed_2":
							 | 
						||
| 
								 | 
							
								            msg = self._speed_msg_2(0, quality_flag=quality_flag)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          self.assertEqual(quality_flag, self._rx(msg))
							 | 
						||
| 
								 | 
							
								          self.assertEqual(quality_flag, self.safety.get_controls_allowed())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # Mess with checksum to make it fail
							 | 
						||
| 
								 | 
							
								        msg[0].data[0] = 0xff
							 | 
						||
| 
								 | 
							
								        self.assertFalse(self._rx(msg))
							 | 
						||
| 
								 | 
							
								        self.assertFalse(self.safety.get_controls_allowed())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestRivianStockSafety(TestRivianSafetyBase):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  LONGITUDINAL = False
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def setUp(self):
							 | 
						||
| 
								 | 
							
								    self.packer = CANPackerSafety("rivian_primary_actuator")
							 | 
						||
| 
								 | 
							
								    self.safety = libsafety_py.libsafety
							 | 
						||
| 
								 | 
							
								    self.safety.set_safety_hooks(CarParams.SafetyModel.rivian, 0)
							 | 
						||
| 
								 | 
							
								    self.safety.init_tests()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def test_adas_status(self):
							 | 
						||
| 
								 | 
							
								    # For canceling stock ACC
							 | 
						||
| 
								 | 
							
								    for controls_allowed in (True, False):
							 | 
						||
| 
								 | 
							
								      self.safety.set_controls_allowed(controls_allowed)
							 | 
						||
| 
								 | 
							
								      for interface_status in range(4):
							 | 
						||
| 
								 | 
							
								        values = {"VDM_AdasInterfaceStatus": interface_status}
							 | 
						||
| 
								 | 
							
								        self.assertTrue(self._tx(self.packer.make_can_msg_safety("VDM_AdasSts", 2, values)))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class TestRivianLongitudinalSafety(TestRivianSafetyBase):
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  TX_MSGS = [[0x120, 0], [0x321, 2], [0x160, 0]]
							 | 
						||
| 
								 | 
							
								  RELAY_MALFUNCTION_ADDRS = {0: (0x120, 0x160), 2: (0x321,)}
							 | 
						||
| 
								 | 
							
								  FWD_BLACKLISTED_ADDRS = {0: [0x321], 2: [0x120, 0x160]}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def setUp(self):
							 | 
						||
| 
								 | 
							
								    self.packer = CANPackerSafety("rivian_primary_actuator")
							 | 
						||
| 
								 | 
							
								    self.safety = libsafety_py.libsafety
							 | 
						||
| 
								 | 
							
								    self.safety.set_safety_hooks(CarParams.SafetyModel.rivian, RivianSafetyFlags.LONG_CONTROL)
							 | 
						||
| 
								 | 
							
								    self.safety.init_tests()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == "__main__":
							 | 
						||
| 
								 | 
							
								  unittest.main()
							 |