openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

227 lines
9.5 KiB

#!/usr/bin/env python3
import math
from typing import SupportsFloat
from cereal import car, log
import cereal.messaging as messaging
from openpilot.common.conversions import Conversions as CV
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, Priority, Ratekeeper
from openpilot.common.swaglog import cloudlog
from opendbc.car.car_helpers import interfaces
from opendbc.car.vehicle_model import VehicleModel
from openpilot.selfdrive.controls.lib.drive_helpers import clip_curvature
from openpilot.selfdrive.controls.lib.latcontrol import LatControl
from openpilot.selfdrive.controls.lib.latcontrol_pid import LatControlPID
from openpilot.selfdrive.controls.lib.latcontrol_angle import LatControlAngle, STEER_ANGLE_SATURATION_THRESHOLD
from openpilot.selfdrive.controls.lib.latcontrol_torque import LatControlTorque
from openpilot.selfdrive.controls.lib.longcontrol import LongControl
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
State = log.SelfdriveState.OpenpilotState
LaneChangeState = log.LaneChangeState
LaneChangeDirection = log.LaneChangeDirection
ACTUATOR_FIELDS = tuple(car.CarControl.Actuators.schema.fields.keys())
class Controls:
def __init__(self) -> None:
self.params = Params()
cloudlog.info("controlsd is waiting for CarParams")
self.CP = messaging.log_from_bytes(self.params.get("CarParams", block=True), car.CarParams)
cloudlog.info("controlsd got CarParams")
self.CI = interfaces[self.CP.carFingerprint](self.CP)
self.sm = messaging.SubMaster(['liveParameters', 'liveTorqueParameters', 'modelV2', 'selfdriveState',
'liveCalibration', 'livePose', 'longitudinalPlan', 'carState', 'carOutput',
'driverMonitoringState', 'onroadEvents', 'driverAssistance'], poll='selfdriveState')
self.pm = messaging.PubMaster(['carControl', 'controlsState'])
self.steer_limited_by_controls = False
self.curvature = 0.0
self.desired_curvature = 0.0
self.pose_calibrator = PoseCalibrator()
self.calibrated_pose: Pose | None = None
self.LoC = LongControl(self.CP)
self.VM = VehicleModel(self.CP)
self.LaC: LatControl
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.LaC = LatControlAngle(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'pid':
self.LaC = LatControlPID(self.CP, self.CI)
elif self.CP.lateralTuning.which() == 'torque':
self.LaC = LatControlTorque(self.CP, self.CI)
def update(self):
self.sm.update(15)
if self.sm.updated["liveCalibration"]:
self.pose_calibrator.feed_live_calib(self.sm['liveCalibration'])
if self.sm.updated["livePose"]:
device_pose = Pose.from_live_pose(self.sm['livePose'])
self.calibrated_pose = self.pose_calibrator.build_calibrated_pose(device_pose)
def state_control(self):
CS = self.sm['carState']
# Update VehicleModel
lp = self.sm['liveParameters']
x = max(lp.stiffnessFactor, 0.1)
sr = max(lp.steerRatio, 0.1)
self.VM.update_params(x, sr)
steer_angle_without_offset = math.radians(CS.steeringAngleDeg - lp.angleOffsetDeg)
self.curvature = -self.VM.calc_curvature(steer_angle_without_offset, CS.vEgo, lp.roll)
Live torque (#25456) * wip torqued * add basic logic * setup in manager * check sanity and publish msg * add first order filter to outputs * wire up controlsd, and update gains * rename intercept to offset * add cloudlog, live values are not updated * fix bugs, do not reset points for now * fix crashes * rename to main * fix bugs, works offline * fix float in cereal bug * add latacc filter * randomly choose points, approx for iid * add variable decay * local param to capnp instead of dict * verify works in replay * use torqued output in controlsd * use in controlsd; use points from past routes * controlsd bugfix * filter before updating gains, needs to be replaced * save all points to ensure smooth transition across routes, revert friction factor to 1.5 * add filters to prevent noisy low-speed data points; improve fit sanity * add engaged buffer * revert lat_acc thresh * use paramsd realtime process config * make latacc-to-torque generic, and overrideable * move freq to 4Hz, avoid storing in np.array, don't publish points in the message * float instead of np * remove constant while storing pts * rename slope, offset to lat_accet_factor, offset * resolve issues * use camelcase in all capnp params * use camelcase everywhere * reduce latacc threshold or sanity, add car_sane todo, save points properly * add and check tag * write param to disk at end of route * remove args * rebase op, cereal * save on exit * restore default handler * cpu usage check * add to process replay * handle reset better, reduce unnecessary computation * always publish raw values - useful for debug * regen routes * update refs * checks on cache restore * check tuning vals too * clean that up * reduce cpu usage * reduce cpu usage by 75% * cleanup * optimize further * handle reset condition better, don't put points in init, use only in corolla * bump cereal after rebasing * update refs * Update common/params.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * remove unnecessary checks * Update RELEASES.md Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 4fa62f146426f76c9c1c2867d9729b33ec612b59
3 years ago
# Update Torque Params
if self.CP.lateralTuning.which() == 'torque':
torque_params = self.sm['liveTorqueParameters']
if self.sm.all_checks(['liveTorqueParameters']) and torque_params.useParams:
self.LaC.update_live_torque_params(torque_params.latAccelFactorFiltered, torque_params.latAccelOffsetFiltered,
torque_params.frictionCoefficientFiltered)
Live torque (#25456) * wip torqued * add basic logic * setup in manager * check sanity and publish msg * add first order filter to outputs * wire up controlsd, and update gains * rename intercept to offset * add cloudlog, live values are not updated * fix bugs, do not reset points for now * fix crashes * rename to main * fix bugs, works offline * fix float in cereal bug * add latacc filter * randomly choose points, approx for iid * add variable decay * local param to capnp instead of dict * verify works in replay * use torqued output in controlsd * use in controlsd; use points from past routes * controlsd bugfix * filter before updating gains, needs to be replaced * save all points to ensure smooth transition across routes, revert friction factor to 1.5 * add filters to prevent noisy low-speed data points; improve fit sanity * add engaged buffer * revert lat_acc thresh * use paramsd realtime process config * make latacc-to-torque generic, and overrideable * move freq to 4Hz, avoid storing in np.array, don't publish points in the message * float instead of np * remove constant while storing pts * rename slope, offset to lat_accet_factor, offset * resolve issues * use camelcase in all capnp params * use camelcase everywhere * reduce latacc threshold or sanity, add car_sane todo, save points properly * add and check tag * write param to disk at end of route * remove args * rebase op, cereal * save on exit * restore default handler * cpu usage check * add to process replay * handle reset better, reduce unnecessary computation * always publish raw values - useful for debug * regen routes * update refs * checks on cache restore * check tuning vals too * clean that up * reduce cpu usage * reduce cpu usage by 75% * cleanup * optimize further * handle reset condition better, don't put points in init, use only in corolla * bump cereal after rebasing * update refs * Update common/params.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * remove unnecessary checks * Update RELEASES.md Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> old-commit-hash: 4fa62f146426f76c9c1c2867d9729b33ec612b59
3 years ago
long_plan = self.sm['longitudinalPlan']
model_v2 = self.sm['modelV2']
CC = car.CarControl.new_message()
CC.enabled = self.sm['selfdriveState'].enabled
# Check which actuators can be enabled
standstill = abs(CS.vEgo) <= max(self.CP.minSteerSpeed, 0.3) or CS.standstill
CC.latActive = self.sm['selfdriveState'].active and not CS.steerFaultTemporary and not CS.steerFaultPermanent and \
(not standstill or self.CP.steerAtStandstill)
CC.longActive = CC.enabled and not any(e.overrideLongitudinal for e in self.sm['onroadEvents']) and self.CP.openpilotLongitudinalControl
actuators = CC.actuators
actuators.longControlState = self.LoC.long_control_state
# Enable blinkers while lane changing
if model_v2.meta.laneChangeState != LaneChangeState.off:
CC.leftBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.left
CC.rightBlinker = model_v2.meta.laneChangeDirection == LaneChangeDirection.right
if not CC.latActive:
self.LaC.reset()
if not CC.longActive:
self.LoC.reset()
# accel PID loop
pid_accel_limits = self.CI.get_pid_accel_limits(self.CP, CS.vEgo, CS.vCruise * CV.KPH_TO_MS)
actuators.accel = float(self.LoC.update(CC.longActive, CS, long_plan.aTarget, long_plan.shouldStop, pid_accel_limits))
# Steering PID loop and lateral MPC
# Reset desired curvature to current to avoid violating the limits on engage
new_desired_curvature = model_v2.action.desiredCurvature if CC.latActive else self.curvature
self.desired_curvature, curvature_limited = clip_curvature(CS.vEgo, self.desired_curvature, new_desired_curvature, lp.roll)
actuators.curvature = self.desired_curvature
steer, steeringAngleDeg, lac_log = self.LaC.update(CC.latActive, CS, self.VM, lp,
self.steer_limited_by_controls, self.desired_curvature,
curvature_limited) # TODO what if not available
actuators.torque = float(steer)
actuators.steeringAngleDeg = float(steeringAngleDeg)
# Ensure no NaNs/Infs
for p in ACTUATOR_FIELDS:
attr = getattr(actuators, p)
if not isinstance(attr, SupportsFloat):
continue
if not math.isfinite(attr):
cloudlog.error(f"actuators.{p} not finite {actuators.to_dict()}")
setattr(actuators, p, 0.0)
return CC, lac_log
def publish(self, CC, lac_log):
CS = self.sm['carState']
# Orientation and angle rates can be useful for carcontroller
# Only calibrated (car) frame is relevant for the carcontroller
CC.currentCurvature = self.curvature
if self.calibrated_pose is not None:
CC.orientationNED = self.calibrated_pose.orientation.xyz.tolist()
CC.angularVelocity = self.calibrated_pose.angular_velocity.xyz.tolist()
CC.cruiseControl.override = CC.enabled and not CC.longActive and self.CP.openpilotLongitudinalControl
CC.cruiseControl.cancel = CS.cruiseState.enabled and (not CC.enabled or not self.CP.pcmCruise)
CC.cruiseControl.resume = CC.enabled and CS.cruiseState.standstill and not self.sm['longitudinalPlan'].shouldStop
hudControl = CC.hudControl
hudControl.setSpeed = float(CS.vCruiseCluster * CV.KPH_TO_MS)
hudControl.speedVisible = CC.enabled
hudControl.lanesVisible = CC.enabled
hudControl.leadVisible = self.sm['longitudinalPlan'].hasLead
hudControl.leadDistanceBars = self.sm['selfdriveState'].personality.raw + 1
hudControl.visualAlert = self.sm['selfdriveState'].alertHudVisual
hudControl.rightLaneVisible = True
hudControl.leftLaneVisible = True
if self.sm.valid['driverAssistance']:
hudControl.leftLaneDepart = self.sm['driverAssistance'].leftLaneDeparture
hudControl.rightLaneDepart = self.sm['driverAssistance'].rightLaneDeparture
if self.sm['selfdriveState'].active:
CO = self.sm['carOutput']
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
self.steer_limited_by_controls = abs(CC.actuators.steeringAngleDeg - CO.actuatorsOutput.steeringAngleDeg) > \
STEER_ANGLE_SATURATION_THRESHOLD
else:
self.steer_limited_by_controls = abs(CC.actuators.torque - CO.actuatorsOutput.torque) > 1e-2
# TODO: both controlsState and carControl valids should be set by
# sm.all_checks(), but this creates a circular dependency
# controlsState
dat = messaging.new_message('controlsState')
dat.valid = CS.canValid
cs = dat.controlsState
cs.curvature = self.curvature
cs.longitudinalPlanMonoTime = self.sm.logMonoTime['longitudinalPlan']
cs.lateralPlanMonoTime = self.sm.logMonoTime['modelV2']
cs.desiredCurvature = self.desired_curvature
cs.longControlState = self.LoC.long_control_state
cs.upAccelCmd = float(self.LoC.pid.p)
cs.uiAccelCmd = float(self.LoC.pid.i)
cs.ufAccelCmd = float(self.LoC.pid.f)
cs.forceDecel = bool((self.sm['driverMonitoringState'].awarenessStatus < 0.) or
(self.sm['selfdriveState'].state == State.softDisabling))
lat_tuning = self.CP.lateralTuning.which()
if self.CP.steerControlType == car.CarParams.SteerControlType.angle:
cs.lateralControlState.angleState = lac_log
elif lat_tuning == 'pid':
cs.lateralControlState.pidState = lac_log
elif lat_tuning == 'torque':
cs.lateralControlState.torqueState = lac_log
self.pm.send('controlsState', dat)
# carControl
cc_send = messaging.new_message('carControl')
cc_send.valid = CS.canValid
cc_send.carControl = CC
self.pm.send('carControl', cc_send)
def run(self):
rk = Ratekeeper(100, print_delay_threshold=None)
while True:
self.update()
CC, lac_log = self.state_control()
self.publish(CC, lac_log)
rk.monitor_time()
def main():
config_realtime_process(4, Priority.CTRL_HIGH)
controls = Controls()
controls.run()
if __name__ == "__main__":
main()