from cereal import car from opendbc.can.packer import CANPacker from openpilot.selfdrive.car import DT_CTRL, apply_driver_steer_torque_limits from openpilot.selfdrive.car.gm import gmcan from openpilot.selfdrive.car.conversions import Conversions as CV from openpilot.selfdrive.car.structs import CarParams from openpilot.selfdrive.car.gm.values import DBC, CanBus, CarControllerParams, CruiseButtons from openpilot.selfdrive.car.helpers import interp from openpilot.selfdrive.car.interfaces import CarControllerBase VisualAlert = car.CarControl.HUDControl.VisualAlert NetworkLocation = CarParams.NetworkLocation LongCtrlState = car.CarControl.Actuators.LongControlState # Camera cancels up to 0.1s after brake is pressed, ECM allows 0.5s CAMERA_CANCEL_DELAY_FRAMES = 10 # Enforce a minimum interval between steering messages to avoid a fault MIN_STEER_MSG_INTERVAL_MS = 15 class CarController(CarControllerBase): def __init__(self, dbc_name, CP): super().__init__(dbc_name, CP) self.start_time = 0. self.apply_steer_last = 0 self.apply_gas = 0 self.apply_brake = 0 self.last_steer_frame = 0 self.last_button_frame = 0 self.cancel_counter = 0 self.lka_steering_cmd_counter = 0 self.lka_icon_status_last = (False, False) self.params = CarControllerParams(self.CP) self.packer_pt = CANPacker(DBC[self.CP.carFingerprint]['pt']) self.packer_obj = CANPacker(DBC[self.CP.carFingerprint]['radar']) self.packer_ch = CANPacker(DBC[self.CP.carFingerprint]['chassis']) def update(self, CC, CS, now_nanos): actuators = CC.actuators hud_control = CC.hudControl hud_alert = hud_control.visualAlert hud_v_cruise = hud_control.setSpeed if hud_v_cruise > 70: hud_v_cruise = 0 # Send CAN commands. can_sends = [] # Steering (Active: 50Hz, inactive: 10Hz) steer_step = self.params.STEER_STEP if CC.latActive else self.params.INACTIVE_STEER_STEP if self.CP.networkLocation == NetworkLocation.fwdCamera: # Also send at 50Hz: # - on startup, first few msgs are blocked # - until we're in sync with camera so counters align when relay closes, preventing a fault. # openpilot can subtly drift, so this is activated throughout a drive to stay synced out_of_sync = self.lka_steering_cmd_counter % 4 != (CS.cam_lka_steering_cmd_counter + 1) % 4 if CS.loopback_lka_steering_cmd_ts_nanos == 0 or out_of_sync: steer_step = self.params.STEER_STEP self.lka_steering_cmd_counter += 1 if CS.loopback_lka_steering_cmd_updated else 0 # Avoid GM EPS faults when transmitting messages too close together: skip this transmit if we # received the ASCMLKASteeringCmd loopback confirmation too recently last_lka_steer_msg_ms = (now_nanos - CS.loopback_lka_steering_cmd_ts_nanos) * 1e-6 if (self.frame - self.last_steer_frame) >= steer_step and last_lka_steer_msg_ms > MIN_STEER_MSG_INTERVAL_MS: # Initialize ASCMLKASteeringCmd counter using the camera until we get a msg on the bus if CS.loopback_lka_steering_cmd_ts_nanos == 0: self.lka_steering_cmd_counter = CS.pt_lka_steering_cmd_counter + 1 if CC.latActive: new_steer = int(round(actuators.steer * self.params.STEER_MAX)) apply_steer = apply_driver_steer_torque_limits(new_steer, self.apply_steer_last, CS.out.steeringTorque, self.params) else: apply_steer = 0 self.last_steer_frame = self.frame self.apply_steer_last = apply_steer idx = self.lka_steering_cmd_counter % 4 can_sends.append(gmcan.create_steering_control(self.packer_pt, CanBus.POWERTRAIN, apply_steer, idx, CC.latActive)) if self.CP.openpilotLongitudinalControl: # Gas/regen, brakes, and UI commands - all at 25Hz if self.frame % 4 == 0: stopping = actuators.longControlState == LongCtrlState.stopping if not CC.longActive: # ASCM sends max regen when not enabled self.apply_gas = self.params.INACTIVE_REGEN self.apply_brake = 0 else: self.apply_gas = int(round(interp(actuators.accel, self.params.GAS_LOOKUP_BP, self.params.GAS_LOOKUP_V))) self.apply_brake = int(round(interp(actuators.accel, self.params.BRAKE_LOOKUP_BP, self.params.BRAKE_LOOKUP_V))) # Don't allow any gas above inactive regen while stopping # FIXME: brakes aren't applied immediately when enabling at a stop if stopping: self.apply_gas = self.params.INACTIVE_REGEN idx = (self.frame // 4) % 4 at_full_stop = CC.longActive and CS.out.standstill near_stop = CC.longActive and (CS.out.vEgo < self.params.NEAR_STOP_BRAKE_PHASE) friction_brake_bus = CanBus.CHASSIS # GM Camera exceptions # TODO: can we always check the longControlState? if self.CP.networkLocation == NetworkLocation.fwdCamera: at_full_stop = at_full_stop and stopping friction_brake_bus = CanBus.POWERTRAIN # GasRegenCmdActive needs to be 1 to avoid cruise faults. It describes the ACC state, not actuation can_sends.append(gmcan.create_gas_regen_command(self.packer_pt, CanBus.POWERTRAIN, self.apply_gas, idx, CC.enabled, at_full_stop)) can_sends.append(gmcan.create_friction_brake_command(self.packer_ch, friction_brake_bus, self.apply_brake, idx, CC.enabled, near_stop, at_full_stop, self.CP)) # Send dashboard UI commands (ACC status) send_fcw = hud_alert == VisualAlert.fcw can_sends.append(gmcan.create_acc_dashboard_command(self.packer_pt, CanBus.POWERTRAIN, CC.enabled, hud_v_cruise * CV.MS_TO_KPH, hud_control, send_fcw)) # Radar needs to know current speed and yaw rate (50hz), # and that ADAS is alive (10hz) if not self.CP.radarUnavailable: tt = self.frame * DT_CTRL time_and_headlights_step = 10 if self.frame % time_and_headlights_step == 0: idx = (self.frame // time_and_headlights_step) % 4 can_sends.append(gmcan.create_adas_time_status(CanBus.OBSTACLE, int((tt - self.start_time) * 60), idx)) can_sends.append(gmcan.create_adas_headlights_status(self.packer_obj, CanBus.OBSTACLE)) speed_and_accelerometer_step = 2 if self.frame % speed_and_accelerometer_step == 0: idx = (self.frame // speed_and_accelerometer_step) % 4 can_sends.append(gmcan.create_adas_steering_status(CanBus.OBSTACLE, idx)) can_sends.append(gmcan.create_adas_accelerometer_speed_status(CanBus.OBSTACLE, CS.out.vEgo, idx)) if self.CP.networkLocation == NetworkLocation.gateway and self.frame % self.params.ADAS_KEEPALIVE_STEP == 0: can_sends += gmcan.create_adas_keepalive(CanBus.POWERTRAIN) else: # While car is braking, cancel button causes ECM to enter a soft disable state with a fault status. # A delayed cancellation allows camera to cancel and avoids a fault when user depresses brake quickly self.cancel_counter = self.cancel_counter + 1 if CC.cruiseControl.cancel else 0 # Stock longitudinal, integrated at camera if (self.frame - self.last_button_frame) * DT_CTRL > 0.04: if self.cancel_counter > CAMERA_CANCEL_DELAY_FRAMES: self.last_button_frame = self.frame can_sends.append(gmcan.create_buttons(self.packer_pt, CanBus.CAMERA, CS.buttons_counter, CruiseButtons.CANCEL)) if self.CP.networkLocation == NetworkLocation.fwdCamera: # Silence "Take Steering" alert sent by camera, forward PSCMStatus with HandsOffSWlDetectionStatus=1 if self.frame % 10 == 0: can_sends.append(gmcan.create_pscm_status(self.packer_pt, CanBus.CAMERA, CS.pscm_status)) new_actuators = actuators.as_builder() new_actuators.steer = self.apply_steer_last / self.params.STEER_MAX new_actuators.steerOutputCan = self.apply_steer_last new_actuators.gas = self.apply_gas new_actuators.brake = self.apply_brake self.frame += 1 return new_actuators, can_sends