#!/usr/bin/env python3 import math from typing import SupportsFloat from cereal import car, log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper from openpilot.common.swaglog import cloudlog from opendbc.car.car_helpers import get_car_interface from opendbc.car.vehicle_model import VehicleModel from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature from openpilot.selfdrive.controls.lib.latcontrol import LatControl, MIN_LATERAL_CONTROL_SPEED from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque from openpilot.selfdrive.controls.lib.longcontrol import LongControl from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose State = log.SelfdriveState.OpenpilotState LaneChangeState = log.LaneChangeState LaneChangeDirection = log.LaneChangeDirection ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys()) class Controls: def __init__(self) -> None: self.params = Params() cloudlog.info("controlsd is waiting for CarParams") self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams) cloudlog.info("controlsd got CarParams") self.CI = get_car_interface(self.CP) self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState', 'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput', 'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState') self.pm = messaging.PubMaster(['carControl', 'controlsState']) self.steer_limited_by_controls = False self.desired_curvature = 0.0 self.pose_calibrator = PoseCalibrator() self.calibrated_pose: Pose | None = None self.LoC = LongControl(self.CP) self.VM = VehicleModel(self.CP) self.LaC: LatControl if self.CP.steerControlType == car.CarParams.SteerControlType.angle: self.LaC = LatControlAngle(self.CP, self.CI) elif self.CP.lateralTuning.which() == 'pid': self.LaC = LatControlPID(self.CP, self.CI) elif self.CP.lateralTuning.which() == 'torque': self.LaC = LatControlTorque(self.CP, self.CI) def update(self): self.sm.update(15) if self.sm.updated["liveCalibration"]: self.pose_calibrator.feed_live_calib(self.sm['liveCalibration']) if self.sm.updated["livePose"]: device_pose = Pose.from_live_pose(self.sm['livePose']) self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose) def state_control(self): CS = self.sm['carState'] # Update VehicleModel lp = self.sm['liveParameters'] x = max(lp.stiffnessFactor, 0.1) sr = max(lp.steerRatio, 0.1) self.VM.update_params(x, sr) # Update Torque Params if self.CP.lateralTuning.which() == 'torque': torque_params = self.sm['liveTorqueParameters'] if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams: self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered, torque_params.frictionCoefficientFiltered) long_plan = self.sm['longitudinalPlan'] model_v2 = self.sm['modelV2'] CC = car.CarControl.new_message() CC.enabled = self.sm['selfdriveState'].enabled # Check which actuators can be enabled standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, MIN_LATERAL_CONTROL_SPEED) or CS.standstill CC.latActive = self.sm['selfdriveState'].active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and not standstill CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl actuators = CC.actuators actuators.longControlState = self.LoC.long_control_state # Enable blinkers while lane changing if model_v2.meta.laneChangeState != LaneChangeState.off: CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right if not CC.latActive: self.LaC.reset() if not CC.longActive: self.LoC.reset() # accel PID loop pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS) actuators.accel = float(self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits)) # Steering PID loop and lateral MPC self.desired_curvature = clip_curvature(CS.vEgo, self.desired_curvature, model_v2.action.desiredCurvature) actuators.curvature = float(self.desired_curvature) steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp, self.steer_limited_by_controls, self.desired_curvature, self.calibrated_pose) # TODO what if not available actuators.torque = float(steer) actuators.steeringAngleDeg = float(steeringAngleDeg) # Ensure no NaNs/Infs for p in ACTUATOR_FIELDS: attr = getattr(actuators, p) if not isinstance(attr, SupportsFloat): continue if not math.isfinite(attr): cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}") setattr(actuators, p, 0.0) return CC, lac_log def publish(self, CC, lac_log): CS = self.sm['carState'] # Orientation and angle rates can be useful for carcontroller # Only calibrated (car) frame is relevant for the carcontroller if self.calibrated_pose is not None: CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist() CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist() CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise) speeds = self.sm['longitudinalPlan'].speeds if len(speeds): CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and speeds[-1] > 0.1 hudControl = CC.hudControl hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS) hudControl.speedVisible = CC.enabled hudControl.lanesVisible = CC.enabled hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1 hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual hudControl.rightLaneVisible = True hudControl.leftLaneVisible = True if self.sm.valid['driverAssistance']: hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture if self.sm['selfdriveState'].active: CO = self.sm['carOutput'] if self.CP.steerControlType == car.CarParams.SteerControlType.angle: self.steer_limited_by_controls = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \ STEER_ANGLE_SATURATION_THRESHOLD else: self.steer_limited_by_controls = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2 # TODO: both controlsState and carControl valids should be set by # sm.all_checks(), but this creates a circular dependency # controlsState dat = messaging.new_message('controlsState') dat.valid = CS.canValid cs = dat.controlsState lp = self.sm['liveParameters'] steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg) cs.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll) cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan'] cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2'] cs.desiredCurvature = float(self.desired_curvature) cs.longControlState = self.LoC.long_control_state cs.upAccelCmd = float(self.LoC.pid.p) cs.uiAccelCmd = float(self.LoC.pid.i) cs.ufAccelCmd = float(self.LoC.pid.f) cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or (self.sm['selfdriveState'].state == State.softDisabling)) lat_tuning = self.CP.lateralTuning.which() if self.CP.steerControlType == car.CarParams.SteerControlType.angle: cs.lateralControlState.angleState = lac_log elif lat_tuning == 'pid': cs.lateralControlState.pidState = lac_log elif lat_tuning == 'torque': cs.lateralControlState.torqueState = lac_log self.pm.send('controlsState', dat) # carControl cc_send = messaging.new_message('carControl') cc_send.valid = CS.canValid cc_send.carControl = CC self.pm.send('carControl', cc_send) def run(self): rk = Ratekeeper(100, print_delay_threshold=None) while True: self.update() CC, lac_log = self.state_control() self.publish(CC, lac_log) rk.monitor_time() def main(): config_realtime_process(4, Priority.CTRL_HIGH) controls = Controls() controls.run() if __name__ == "__main__": main()