from cereal import car from common.numpy_fast import mean from common.filter_simple import FirstOrderFilter from common.realtime import DT_CTRL from opendbc.can.can_define import CANDefine from selfdrive.car.interfaces import CarStateBase from opendbc.can.parser import CANParser from selfdrive.config import Conversions as CV from selfdrive.car.toyota.values import CAR, DBC, STEER_THRESHOLD, NO_STOP_TIMER_CAR, TSS2_CAR class CarState(CarStateBase): def __init__(self, CP): super().__init__(CP) can_define = CANDefine(DBC[CP.carFingerprint]["pt"]) self.shifter_values = can_define.dv["GEAR_PACKET"]["GEAR"] # On cars with cp.vl["STEER_TORQUE_SENSOR"]["STEER_ANGLE"] # the signal is zeroed to where the steering angle is at start. # Need to apply an offset as soon as the steering angle measurements are both received self.needs_angle_offset = True self.accurate_steer_angle_seen = False self.angle_offset = FirstOrderFilter(None, 60.0, DT_CTRL, initialized=False) self.low_speed_lockout = False self.acc_type = 1 def update(self, cp, cp_cam): ret = car.CarState.new_message() ret.doorOpen = any([cp.vl["SEATS_DOORS"]["DOOR_OPEN_FL"], cp.vl["SEATS_DOORS"]["DOOR_OPEN_FR"], cp.vl["SEATS_DOORS"]["DOOR_OPEN_RL"], cp.vl["SEATS_DOORS"]["DOOR_OPEN_RR"]]) ret.seatbeltUnlatched = cp.vl["SEATS_DOORS"]["SEATBELT_DRIVER_UNLATCHED"] != 0 ret.brakePressed = cp.vl["BRAKE_MODULE"]["BRAKE_PRESSED"] != 0 ret.brakeHoldActive = cp.vl["ESP_CONTROL"]["BRAKE_HOLD_ACTIVE"] == 1 if self.CP.enableGasInterceptor: ret.gas = (cp.vl["GAS_SENSOR"]["INTERCEPTOR_GAS"] + cp.vl["GAS_SENSOR"]["INTERCEPTOR_GAS2"]) / 2. ret.gasPressed = ret.gas > 15 else: ret.gas = cp.vl["GAS_PEDAL"]["GAS_PEDAL"] ret.gasPressed = cp.vl["PCM_CRUISE"]["GAS_RELEASED"] == 0 ret.wheelSpeeds.fl = cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_FL"] * CV.KPH_TO_MS ret.wheelSpeeds.fr = cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_FR"] * CV.KPH_TO_MS ret.wheelSpeeds.rl = cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_RL"] * CV.KPH_TO_MS ret.wheelSpeeds.rr = cp.vl["WHEEL_SPEEDS"]["WHEEL_SPEED_RR"] * CV.KPH_TO_MS ret.vEgoRaw = mean([ret.wheelSpeeds.fl, ret.wheelSpeeds.fr, ret.wheelSpeeds.rl, ret.wheelSpeeds.rr]) ret.vEgo, ret.aEgo = self.update_speed_kf(ret.vEgoRaw) ret.standstill = ret.vEgoRaw < 0.001 ret.steeringAngleDeg = cp.vl["STEER_ANGLE_SENSOR"]["STEER_ANGLE"] + cp.vl["STEER_ANGLE_SENSOR"]["STEER_FRACTION"] torque_sensor_angle_deg = cp.vl["STEER_TORQUE_SENSOR"]["STEER_ANGLE"] # Some newer models have a more accurate angle measurement in the TORQUE_SENSOR message. Use if non-zero if abs(torque_sensor_angle_deg) > 1e-3: self.accurate_steer_angle_seen = True if self.accurate_steer_angle_seen: # Offset seems to be invalid for large steering angles if abs(ret.steeringAngleDeg) < 90: self.angle_offset.update(torque_sensor_angle_deg - ret.steeringAngleDeg) if self.angle_offset.initialized: ret.steeringAngleOffsetDeg = self.angle_offset.x ret.steeringAngleDeg = torque_sensor_angle_deg - self.angle_offset.x ret.steeringRateDeg = cp.vl["STEER_ANGLE_SENSOR"]["STEER_RATE"] can_gear = int(cp.vl["GEAR_PACKET"]["GEAR"]) ret.gearShifter = self.parse_gear_shifter(self.shifter_values.get(can_gear, None)) ret.leftBlinker = cp.vl["STEERING_LEVERS"]["TURN_SIGNALS"] == 1 ret.rightBlinker = cp.vl["STEERING_LEVERS"]["TURN_SIGNALS"] == 2 ret.steeringTorque = cp.vl["STEER_TORQUE_SENSOR"]["STEER_TORQUE_DRIVER"] ret.steeringTorqueEps = cp.vl["STEER_TORQUE_SENSOR"]["STEER_TORQUE_EPS"] # we could use the override bit from dbc, but it's triggered at too high torque values ret.steeringPressed = abs(ret.steeringTorque) > STEER_THRESHOLD ret.steerWarning = cp.vl["EPS_STATUS"]["LKA_STATE"] not in [1, 5] if self.CP.carFingerprint == CAR.LEXUS_IS: ret.cruiseState.available = cp.vl["DSU_CRUISE"]["MAIN_ON"] != 0 ret.cruiseState.speed = cp.vl["DSU_CRUISE"]["SET_SPEED"] * CV.KPH_TO_MS else: ret.cruiseState.available = cp.vl["PCM_CRUISE_2"]["MAIN_ON"] != 0 ret.cruiseState.speed = cp.vl["PCM_CRUISE_2"]["SET_SPEED"] * CV.KPH_TO_MS if self.CP.carFingerprint in TSS2_CAR: self.acc_type = cp_cam.vl["ACC_CONTROL"]["ACC_TYPE"] # some TSS2 cars have low speed lockout permanently set, so ignore on those cars # these cars are identified by an ACC_TYPE value of 2. # TODO: it is possible to avoid the lockout and gain stop and go if you # send your own ACC_CONTROL msg on startup with ACC_TYPE set to 1 if (self.CP.carFingerprint not in TSS2_CAR and self.CP.carFingerprint != CAR.LEXUS_IS) or \ (self.CP.carFingerprint in TSS2_CAR and self.acc_type == 1): self.low_speed_lockout = cp.vl["PCM_CRUISE_2"]["LOW_SPEED_LOCKOUT"] == 2 self.pcm_acc_status = cp.vl["PCM_CRUISE"]["CRUISE_STATE"] if self.CP.carFingerprint in NO_STOP_TIMER_CAR or self.CP.enableGasInterceptor: # ignore standstill in hybrid vehicles, since pcm allows to restart without # receiving any special command. Also if interceptor is detected ret.cruiseState.standstill = False else: ret.cruiseState.standstill = self.pcm_acc_status == 7 ret.cruiseState.enabled = bool(cp.vl["PCM_CRUISE"]["CRUISE_ACTIVE"]) ret.cruiseState.nonAdaptive = cp.vl["PCM_CRUISE"]["CRUISE_STATE"] in [1, 2, 3, 4, 5, 6] ret.genericToggle = bool(cp.vl["LIGHT_STALK"]["AUTO_HIGH_BEAM"]) ret.stockAeb = bool(cp_cam.vl["PRE_COLLISION"]["PRECOLLISION_ACTIVE"] and cp_cam.vl["PRE_COLLISION"]["FORCE"] < -1e-5) ret.espDisabled = cp.vl["ESP_CONTROL"]["TC_DISABLED"] != 0 # 2 is standby, 10 is active. TODO: check that everything else is really a faulty state self.steer_state = cp.vl["EPS_STATUS"]["LKA_STATE"] if self.CP.enableBsm: ret.leftBlindspot = (cp.vl["BSM"]["L_ADJACENT"] == 1) or (cp.vl["BSM"]["L_APPROACHING"] == 1) ret.rightBlindspot = (cp.vl["BSM"]["R_ADJACENT"] == 1) or (cp.vl["BSM"]["R_APPROACHING"] == 1) return ret @staticmethod def get_can_parser(CP): signals = [ # sig_name, sig_address, default ("STEER_ANGLE", "STEER_ANGLE_SENSOR", 0), ("GEAR", "GEAR_PACKET", 0), ("BRAKE_PRESSED", "BRAKE_MODULE", 0), ("GAS_PEDAL", "GAS_PEDAL", 0), ("WHEEL_SPEED_FL", "WHEEL_SPEEDS", 0), ("WHEEL_SPEED_FR", "WHEEL_SPEEDS", 0), ("WHEEL_SPEED_RL", "WHEEL_SPEEDS", 0), ("WHEEL_SPEED_RR", "WHEEL_SPEEDS", 0), ("DOOR_OPEN_FL", "SEATS_DOORS", 1), ("DOOR_OPEN_FR", "SEATS_DOORS", 1), ("DOOR_OPEN_RL", "SEATS_DOORS", 1), ("DOOR_OPEN_RR", "SEATS_DOORS", 1), ("SEATBELT_DRIVER_UNLATCHED", "SEATS_DOORS", 1), ("TC_DISABLED", "ESP_CONTROL", 1), ("BRAKE_HOLD_ACTIVE", "ESP_CONTROL", 1), ("STEER_FRACTION", "STEER_ANGLE_SENSOR", 0), ("STEER_RATE", "STEER_ANGLE_SENSOR", 0), ("CRUISE_ACTIVE", "PCM_CRUISE", 0), ("CRUISE_STATE", "PCM_CRUISE", 0), ("GAS_RELEASED", "PCM_CRUISE", 1), ("STEER_TORQUE_DRIVER", "STEER_TORQUE_SENSOR", 0), ("STEER_TORQUE_EPS", "STEER_TORQUE_SENSOR", 0), ("STEER_ANGLE", "STEER_TORQUE_SENSOR", 0), ("TURN_SIGNALS", "STEERING_LEVERS", 3), # 3 is no blinkers ("LKA_STATE", "EPS_STATUS", 0), ("AUTO_HIGH_BEAM", "LIGHT_STALK", 0), ] checks = [ ("GEAR_PACKET", 1), ("LIGHT_STALK", 1), ("STEERING_LEVERS", 0.15), ("SEATS_DOORS", 3), ("ESP_CONTROL", 3), ("EPS_STATUS", 25), ("BRAKE_MODULE", 40), ("GAS_PEDAL", 33), ("WHEEL_SPEEDS", 80), ("STEER_ANGLE_SENSOR", 80), ("PCM_CRUISE", 33), ("STEER_TORQUE_SENSOR", 50), ] if CP.carFingerprint == CAR.LEXUS_IS: signals.append(("MAIN_ON", "DSU_CRUISE", 0)) signals.append(("SET_SPEED", "DSU_CRUISE", 0)) checks.append(("DSU_CRUISE", 5)) else: signals.append(("MAIN_ON", "PCM_CRUISE_2", 0)) signals.append(("SET_SPEED", "PCM_CRUISE_2", 0)) signals.append(("LOW_SPEED_LOCKOUT", "PCM_CRUISE_2", 0)) checks.append(("PCM_CRUISE_2", 33)) # add gas interceptor reading if we are using it if CP.enableGasInterceptor: signals.append(("INTERCEPTOR_GAS", "GAS_SENSOR", 0)) signals.append(("INTERCEPTOR_GAS2", "GAS_SENSOR", 0)) checks.append(("GAS_SENSOR", 50)) if CP.enableBsm: signals += [ ("L_ADJACENT", "BSM", 0), ("L_APPROACHING", "BSM", 0), ("R_ADJACENT", "BSM", 0), ("R_APPROACHING", "BSM", 0), ] checks += [ ("BSM", 1) ] return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, 0) @staticmethod def get_cam_can_parser(CP): signals = [ ("FORCE", "PRE_COLLISION", 0), ("PRECOLLISION_ACTIVE", "PRE_COLLISION", 0), ] # use steering message to check if panda is connected to frc checks = [ ("STEERING_LKA", 42), ("PRE_COLLISION", 0), # TODO: figure out why freq is inconsistent ] if CP.carFingerprint in TSS2_CAR: signals.append(("ACC_TYPE", "ACC_CONTROL", 0)) checks.append(("ACC_CONTROL", 33)) return CANParser(DBC[CP.carFingerprint]["pt"], signals, checks, 2)