You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
127 lines
5.5 KiB
127 lines
5.5 KiB
import copy
|
|
from opendbc.can import CANDefine, CANParser
|
|
from opendbc.car import Bus, structs
|
|
from opendbc.car.common.conversions import Conversions as CV
|
|
from opendbc.car.interfaces import CarStateBase
|
|
from opendbc.car.tesla.values import DBC, CANBUS, GEAR_MAP, STEER_THRESHOLD, CAR
|
|
|
|
ButtonType = structs.CarState.ButtonEvent.Type
|
|
|
|
|
|
class CarState(CarStateBase):
|
|
def __init__(self, CP):
|
|
super().__init__(CP)
|
|
self.can_define = CANDefine(DBC[CP.carFingerprint][Bus.party])
|
|
self.shifter_values = self.can_define.dv["DI_systemStatus"]["DI_gear"]
|
|
|
|
self.autopark = False
|
|
self.autopark_prev = False
|
|
self.cruise_enabled_prev = False
|
|
|
|
self.hands_on_level = 0
|
|
self.das_control = None
|
|
|
|
def update_autopark_state(self, autopark_state: str, cruise_enabled: bool):
|
|
autopark_now = autopark_state in ("ACTIVE", "COMPLETE", "SELFPARK_STARTED")
|
|
if autopark_now and not self.autopark_prev and not self.cruise_enabled_prev:
|
|
self.autopark = True
|
|
if not autopark_now:
|
|
self.autopark = False
|
|
self.autopark_prev = autopark_now
|
|
self.cruise_enabled_prev = cruise_enabled
|
|
|
|
def update(self, can_parsers) -> structs.CarState:
|
|
cp_party = can_parsers[Bus.party]
|
|
cp_ap_party = can_parsers[Bus.ap_party]
|
|
ret = structs.CarState()
|
|
|
|
# Vehicle speed
|
|
ret.vEgoRaw = cp_party.vl["DI_speed"]["DI_vehicleSpeed"] * CV.KPH_TO_MS
|
|
ret.vEgo, ret.aEgo = self.update_speed_kf(ret.vEgoRaw)
|
|
|
|
# Gas pedal
|
|
ret.gasPressed = cp_party.vl["DI_systemStatus"]["DI_accelPedalPos"] > 0
|
|
|
|
# Brake pedal
|
|
ret.brake = 0
|
|
ret.brakePressed = cp_party.vl["IBST_status"]["IBST_driverBrakeApply"] == 2
|
|
|
|
# Steering wheel
|
|
epas_status = cp_party.vl["EPAS3S_sysStatus"]
|
|
self.hands_on_level = epas_status["EPAS3S_handsOnLevel"]
|
|
ret.steeringAngleDeg = -epas_status["EPAS3S_internalSAS"]
|
|
ret.steeringRateDeg = -cp_ap_party.vl["SCCM_steeringAngleSensor"]["SCCM_steeringAngleSpeed"]
|
|
ret.steeringTorque = -epas_status["EPAS3S_torsionBarTorque"]
|
|
|
|
# stock handsOnLevel uses >0.5 for 0.25s, but is too slow
|
|
ret.steeringPressed = self.update_steering_pressed(abs(ret.steeringTorque) > STEER_THRESHOLD, 5)
|
|
|
|
eac_status = self.can_define.dv["EPAS3S_sysStatus"]["EPAS3S_eacStatus"].get(int(epas_status["EPAS3S_eacStatus"]), None)
|
|
ret.steerFaultPermanent = eac_status == "EAC_FAULT"
|
|
ret.steerFaultTemporary = eac_status == "EAC_INHIBITED"
|
|
|
|
# FSD disengages using union of handsOnLevel (slow overrides) and high angle rate faults (fast overrides, high speed)
|
|
eac_error_code = self.can_define.dv["EPAS3S_sysStatus"]["EPAS3S_eacErrorCode"].get(int(epas_status["EPAS3S_eacErrorCode"]), None)
|
|
ret.steeringDisengage = self.hands_on_level >= 3 or (eac_status == "EAC_INHIBITED" and
|
|
eac_error_code == "EAC_ERROR_HIGH_ANGLE_RATE_SAFETY")
|
|
|
|
# Cruise state
|
|
cruise_state = self.can_define.dv["DI_state"]["DI_cruiseState"].get(int(cp_party.vl["DI_state"]["DI_cruiseState"]), None)
|
|
speed_units = self.can_define.dv["DI_state"]["DI_speedUnits"].get(int(cp_party.vl["DI_state"]["DI_speedUnits"]), None)
|
|
|
|
autopark_state = self.can_define.dv["DI_state"]["DI_autoparkState"].get(int(cp_party.vl["DI_state"]["DI_autoparkState"]), None)
|
|
cruise_enabled = cruise_state in ("ENABLED", "STANDSTILL", "OVERRIDE", "PRE_FAULT", "PRE_CANCEL")
|
|
self.update_autopark_state(autopark_state, cruise_enabled)
|
|
|
|
# Match panda safety cruise engaged logic
|
|
ret.cruiseState.enabled = cruise_enabled and not self.autopark
|
|
if speed_units == "KPH":
|
|
ret.cruiseState.speed = max(cp_party.vl["DI_state"]["DI_digitalSpeed"] * CV.KPH_TO_MS, 1e-3)
|
|
elif speed_units == "MPH":
|
|
ret.cruiseState.speed = max(cp_party.vl["DI_state"]["DI_digitalSpeed"] * CV.MPH_TO_MS, 1e-3)
|
|
ret.cruiseState.available = cruise_state == "STANDBY" or ret.cruiseState.enabled
|
|
ret.cruiseState.standstill = False # This needs to be false, since we can resume from stop without sending anything special
|
|
ret.standstill = cp_party.vl["ESP_B"]["ESP_vehicleStandstillSts"] == 1
|
|
ret.accFaulted = cruise_state == "FAULT"
|
|
|
|
# Gear
|
|
ret.gearShifter = GEAR_MAP[self.can_define.dv["DI_systemStatus"]["DI_gear"].get(int(cp_party.vl["DI_systemStatus"]["DI_gear"]), "DI_GEAR_INVALID")]
|
|
|
|
# Doors
|
|
ret.doorOpen = cp_party.vl["UI_warning"]["anyDoorOpen"] == 1
|
|
|
|
# Blinkers
|
|
ret.leftBlinker = cp_party.vl["UI_warning"]["leftBlinkerBlinking"] in (1, 2)
|
|
ret.rightBlinker = cp_party.vl["UI_warning"]["rightBlinkerBlinking"] in (1, 2)
|
|
|
|
# Seatbelt
|
|
ret.seatbeltUnlatched = cp_party.vl["UI_warning"]["buckleStatus"] != 1
|
|
|
|
# Blindspot
|
|
ret.leftBlindspot = cp_ap_party.vl["DAS_status"]["DAS_blindSpotRearLeft"] != 0
|
|
ret.rightBlindspot = cp_ap_party.vl["DAS_status"]["DAS_blindSpotRearRight"] != 0
|
|
|
|
# AEB
|
|
ret.stockAeb = cp_ap_party.vl["DAS_control"]["DAS_aebEvent"] == 1
|
|
|
|
# LKAS
|
|
ret.stockLkas = cp_ap_party.vl["DAS_steeringControl"]["DAS_steeringControlType"] == 2 # LANE_KEEP_ASSIST
|
|
|
|
# Stock Autosteer should be off (includes FSD)
|
|
if self.CP.carFingerprint in (CAR.TESLA_MODEL_3, CAR.TESLA_MODEL_Y):
|
|
ret.invalidLkasSetting = cp_ap_party.vl["DAS_settings"]["DAS_autosteerEnabled"] != 0
|
|
else:
|
|
pass
|
|
# Buttons # ToDo: add Gap adjust button
|
|
|
|
# Messages needed by carcontroller
|
|
self.das_control = copy.copy(cp_ap_party.vl["DAS_control"])
|
|
|
|
return ret
|
|
|
|
@staticmethod
|
|
def get_can_parsers(CP):
|
|
return {
|
|
Bus.party: CANParser(DBC[CP.carFingerprint][Bus.party], [], CANBUS.party),
|
|
Bus.ap_party: CANParser(DBC[CP.carFingerprint][Bus.party], [], CANBUS.autopilot_party)
|
|
}
|
|
|