You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							153 lines
						
					
					
						
							5.4 KiB
						
					
					
				
			
		
		
	
	
							153 lines
						
					
					
						
							5.4 KiB
						
					
					
				| from cereal import car
 | |
| from openpilot.common.conversions import Conversions as CV
 | |
| from opendbc.can.can_define import CANDefine
 | |
| from opendbc.can.parser import CANParser
 | |
| from openpilot.selfdrive.car.interfaces import CarStateBase
 | |
| from openpilot.selfdrive.car.mazda.values import DBC, LKAS_LIMITS, MazdaFlags
 | |
| 
 | |
| class CarState(CarStateBase):
 | |
|   def __init__(self, CP):
 | |
|     super().__init__(CP)
 | |
| 
 | |
|     can_define = CANDefine(DBC[CP.carFingerprint]["pt"])
 | |
|     self.shifter_values = can_define.dv["GEAR"]["GEAR"]
 | |
| 
 | |
|     self.crz_btns_counter = 0
 | |
|     self.acc_active_last = False
 | |
|     self.low_speed_alert = False
 | |
|     self.lkas_allowed_speed = False
 | |
|     self.lkas_disabled = False
 | |
| 
 | |
|     self.prev_distance_button = 0
 | |
|     self.distance_button = 0
 | |
| 
 | |
|   def update(self, cp, cp_cam):
 | |
| 
 | |
|     ret = car.CarState.new_message()
 | |
| 
 | |
|     self.prev_distance_button = self.distance_button
 | |
|     self.distance_button = cp.vl["CRZ_BTNS"]["DISTANCE_LESS"]
 | |
| 
 | |
|     ret.wheelSpeeds = self.get_wheel_speeds(
 | |
|       cp.vl["WHEEL_SPEEDS"]["FL"],
 | |
|       cp.vl["WHEEL_SPEEDS"]["FR"],
 | |
|       cp.vl["WHEEL_SPEEDS"]["RL"],
 | |
|       cp.vl["WHEEL_SPEEDS"]["RR"],
 | |
|     )
 | |
|     ret.vEgoRaw = (ret.wheelSpeeds.fl + ret.wheelSpeeds.fr + ret.wheelSpeeds.rl + ret.wheelSpeeds.rr) / 4.
 | |
|     ret.vEgo, ret.aEgo = self.update_speed_kf(ret.vEgoRaw)
 | |
| 
 | |
|     # Match panda speed reading
 | |
|     speed_kph = cp.vl["ENGINE_DATA"]["SPEED"]
 | |
|     ret.standstill = speed_kph <= .1
 | |
| 
 | |
|     can_gear = int(cp.vl["GEAR"]["GEAR"])
 | |
|     ret.gearShifter = self.parse_gear_shifter(self.shifter_values.get(can_gear, None))
 | |
| 
 | |
|     ret.genericToggle = bool(cp.vl["BLINK_INFO"]["HIGH_BEAMS"])
 | |
|     ret.leftBlindspot = cp.vl["BSM"]["LEFT_BS_STATUS"] != 0
 | |
|     ret.rightBlindspot = cp.vl["BSM"]["RIGHT_BS_STATUS"] != 0
 | |
|     ret.leftBlinker, ret.rightBlinker = self.update_blinker_from_lamp(40, cp.vl["BLINK_INFO"]["LEFT_BLINK"] == 1,
 | |
|                                                                       cp.vl["BLINK_INFO"]["RIGHT_BLINK"] == 1)
 | |
| 
 | |
|     ret.steeringAngleDeg = cp.vl["STEER"]["STEER_ANGLE"]
 | |
|     ret.steeringTorque = cp.vl["STEER_TORQUE"]["STEER_TORQUE_SENSOR"]
 | |
|     ret.steeringPressed = abs(ret.steeringTorque) > LKAS_LIMITS.STEER_THRESHOLD
 | |
| 
 | |
|     ret.steeringTorqueEps = cp.vl["STEER_TORQUE"]["STEER_TORQUE_MOTOR"]
 | |
|     ret.steeringRateDeg = cp.vl["STEER_RATE"]["STEER_ANGLE_RATE"]
 | |
| 
 | |
|     # TODO: this should be from 0 - 1.
 | |
|     ret.brakePressed = cp.vl["PEDALS"]["BRAKE_ON"] == 1
 | |
|     ret.brake = cp.vl["BRAKE"]["BRAKE_PRESSURE"]
 | |
| 
 | |
|     ret.seatbeltUnlatched = cp.vl["SEATBELT"]["DRIVER_SEATBELT"] == 0
 | |
|     ret.doorOpen = any([cp.vl["DOORS"]["FL"], cp.vl["DOORS"]["FR"],
 | |
|                         cp.vl["DOORS"]["BL"], cp.vl["DOORS"]["BR"]])
 | |
| 
 | |
|     # TODO: this should be from 0 - 1.
 | |
|     ret.gas = cp.vl["ENGINE_DATA"]["PEDAL_GAS"]
 | |
|     ret.gasPressed = ret.gas > 0
 | |
| 
 | |
|     # Either due to low speed or hands off
 | |
|     lkas_blocked = cp.vl["STEER_RATE"]["LKAS_BLOCK"] == 1
 | |
| 
 | |
|     if self.CP.minSteerSpeed > 0:
 | |
|       # LKAS is enabled at 52kph going up and disabled at 45kph going down
 | |
|       # wait for LKAS_BLOCK signal to clear when going up since it lags behind the speed sometimes
 | |
|       if speed_kph > LKAS_LIMITS.ENABLE_SPEED and not lkas_blocked:
 | |
|         self.lkas_allowed_speed = True
 | |
|       elif speed_kph < LKAS_LIMITS.DISABLE_SPEED:
 | |
|         self.lkas_allowed_speed = False
 | |
|     else:
 | |
|       self.lkas_allowed_speed = True
 | |
| 
 | |
|     # TODO: the signal used for available seems to be the adaptive cruise signal, instead of the main on
 | |
|     #       it should be used for carState.cruiseState.nonAdaptive instead
 | |
|     ret.cruiseState.available = cp.vl["CRZ_CTRL"]["CRZ_AVAILABLE"] == 1
 | |
|     ret.cruiseState.enabled = cp.vl["CRZ_CTRL"]["CRZ_ACTIVE"] == 1
 | |
|     ret.cruiseState.standstill = cp.vl["PEDALS"]["STANDSTILL"] == 1
 | |
|     ret.cruiseState.speed = cp.vl["CRZ_EVENTS"]["CRZ_SPEED"] * CV.KPH_TO_MS
 | |
| 
 | |
|     if ret.cruiseState.enabled:
 | |
|       if not self.lkas_allowed_speed and self.acc_active_last:
 | |
|         self.low_speed_alert = True
 | |
|       else:
 | |
|         self.low_speed_alert = False
 | |
| 
 | |
|     # Check if LKAS is disabled due to lack of driver torque when all other states indicate
 | |
|     # it should be enabled (steer lockout). Don't warn until we actually get lkas active
 | |
|     # and lose it again, i.e, after initial lkas activation
 | |
|     ret.steerFaultTemporary = self.lkas_allowed_speed and lkas_blocked
 | |
| 
 | |
|     self.acc_active_last = ret.cruiseState.enabled
 | |
| 
 | |
|     self.crz_btns_counter = cp.vl["CRZ_BTNS"]["CTR"]
 | |
| 
 | |
|     # camera signals
 | |
|     self.lkas_disabled = cp_cam.vl["CAM_LANEINFO"]["LANE_LINES"] == 0
 | |
|     self.cam_lkas = cp_cam.vl["CAM_LKAS"]
 | |
|     self.cam_laneinfo = cp_cam.vl["CAM_LANEINFO"]
 | |
|     ret.steerFaultPermanent = cp_cam.vl["CAM_LKAS"]["ERR_BIT_1"] == 1
 | |
| 
 | |
|     return ret
 | |
| 
 | |
|   @staticmethod
 | |
|   def get_can_parser(CP):
 | |
|     messages = [
 | |
|       # sig_address, frequency
 | |
|       ("BLINK_INFO", 10),
 | |
|       ("STEER", 67),
 | |
|       ("STEER_RATE", 83),
 | |
|       ("STEER_TORQUE", 83),
 | |
|       ("WHEEL_SPEEDS", 100),
 | |
|     ]
 | |
| 
 | |
|     if CP.flags & MazdaFlags.GEN1:
 | |
|       messages += [
 | |
|         ("ENGINE_DATA", 100),
 | |
|         ("CRZ_CTRL", 50),
 | |
|         ("CRZ_EVENTS", 50),
 | |
|         ("CRZ_BTNS", 10),
 | |
|         ("PEDALS", 50),
 | |
|         ("BRAKE", 50),
 | |
|         ("SEATBELT", 10),
 | |
|         ("DOORS", 10),
 | |
|         ("GEAR", 20),
 | |
|         ("BSM", 10),
 | |
|       ]
 | |
| 
 | |
|     return CANParser(DBC[CP.carFingerprint]["pt"], messages, 0)
 | |
| 
 | |
|   @staticmethod
 | |
|   def get_cam_can_parser(CP):
 | |
|     messages = []
 | |
| 
 | |
|     if CP.flags & MazdaFlags.GEN1:
 | |
|       messages += [
 | |
|         # sig_address, frequency
 | |
|         ("CAM_LANEINFO", 2),
 | |
|         ("CAM_LKAS", 16),
 | |
|       ]
 | |
| 
 | |
|     return CANParser(DBC[CP.carFingerprint]["pt"], messages, 2)
 | |
| 
 |