openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

167 lines
6.8 KiB

#!/usr/bin/env python3
import math
import numpy as np
from common.numpy_fast import clip, interp
import cereal.messaging as messaging
from common.conversions import Conversions as CV
from common.filter_simple import FirstOrderFilter
from common.params import Params
from common.realtime import DT_MDL
from selfdrive.modeld.constants import T_IDXS
from selfdrive.controls.lib.longcontrol import LongCtrlState
from selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import LongitudinalMpc
from selfdrive.controls.lib.longitudinal_mpc_lib.long_mpc import T_IDXS as T_IDXS_MPC
from selfdrive.controls.lib.drive_helpers import V_CRUISE_MAX, CONTROL_N
from system.swaglog import cloudlog
LON_MPC_STEP = 0.2 # first step is 0.2s
AWARENESS_DECEL = -0.2 # car smoothly decel at .2m/s^2 when user is distracted
A_CRUISE_MIN = -1.2
A_CRUISE_MAX_VALS = [1.6, 1.2, 0.8, 0.6]
A_CRUISE_MAX_BP = [0., 10.0, 25., 40.]
# Lookup table for turns
_A_TOTAL_MAX_V = [1.7, 3.2]
_A_TOTAL_MAX_BP = [20., 40.]
def get_max_accel(v_ego):
return interp(v_ego, A_CRUISE_MAX_BP, A_CRUISE_MAX_VALS)
def limit_accel_in_turns(v_ego, angle_steers, a_target, CP):
"""
This function returns a limited long acceleration allowed, depending on the existing lateral acceleration
this should avoid accelerating when losing the target in turns
"""
# FIXME: This function to calculate lateral accel is incorrect and should use the VehicleModel
# The lookup table for turns should also be updated if we do this
a_total_max = interp(v_ego, _A_TOTAL_MAX_BP, _A_TOTAL_MAX_V)
a_y = v_ego ** 2 * angle_steers * CV.DEG_TO_RAD / (CP.steerRatio * CP.wheelbase)
a_x_allowed = math.sqrt(max(a_total_max ** 2 - a_y ** 2, 0.))
return [a_target[0], min(a_target[1], a_x_allowed)]
class LongitudinalPlanner:
def __init__(self, CP, init_v=0.0, init_a=0.0):
self.CP = CP
self.params = Params()
self.param_read_counter = 0
self.mpc = LongitudinalMpc()
self.read_param()
self.fcw = False
self.a_desired = init_a
self.v_desired_filter = FirstOrderFilter(init_v, 2.0, DT_MDL)
self.v_model_error = 0.0
self.v_desired_trajectory = np.zeros(CONTROL_N)
self.a_desired_trajectory = np.zeros(CONTROL_N)
self.j_desired_trajectory = np.zeros(CONTROL_N)
self.solverExecutionTime = 0.0
def read_param(self):
e2e = self.params.get_bool('ExperimentalMode') and self.CP.openpilotLongitudinalControl
self.mpc.mode = 'blended' if e2e else 'acc'
def parse_model(self, model_msg, model_error):
if (len(model_msg.position.x) == 33 and
len(model_msg.velocity.x) == 33 and
len(model_msg.acceleration.x) == 33):
x = np.interp(T_IDXS_MPC, T_IDXS, model_msg.position.x) - model_error * T_IDXS_MPC
v = np.interp(T_IDXS_MPC, T_IDXS, model_msg.velocity.x) - model_error
a = np.interp(T_IDXS_MPC, T_IDXS, model_msg.acceleration.x)
j = np.zeros(len(T_IDXS_MPC))
else:
x = np.zeros(len(T_IDXS_MPC))
v = np.zeros(len(T_IDXS_MPC))
a = np.zeros(len(T_IDXS_MPC))
j = np.zeros(len(T_IDXS_MPC))
return x, v, a, j
def update(self, sm, read=True):
if self.param_read_counter % 50 == 0 and read:
self.read_param()
self.param_read_counter += 1
v_ego = sm['carState'].vEgo
v_cruise_kph = sm['controlsState'].vCruise
v_cruise_kph = min(v_cruise_kph, V_CRUISE_MAX)
v_cruise = v_cruise_kph * CV.KPH_TO_MS
long_control_off = sm['controlsState'].longControlState == LongCtrlState.off
force_slow_decel = sm['controlsState'].forceDecel
# Reset current state when not engaged, or user is controlling the speed
reset_state = long_control_off if self.CP.openpilotLongitudinalControl else not sm['controlsState'].enabled
# No change cost when user is controlling the speed, or when standstill
prev_accel_constraint = not (reset_state or sm['carState'].standstill)
accel_limits = [A_CRUISE_MIN, get_max_accel(v_ego)]
accel_limits_turns = limit_accel_in_turns(v_ego, sm['carState'].steeringAngleDeg, accel_limits, self.CP)
if reset_state:
self.v_desired_filter.x = v_ego
# Clip aEgo to cruise limits to prevent large accelerations when becoming active
self.a_desired = clip(sm['carState'].aEgo, accel_limits[0], accel_limits[1])
# Prevent divergence, smooth in current v_ego
self.v_desired_filter.x = max(0.0, self.v_desired_filter.update(v_ego))
# Compute model v_ego error
if len(sm['modelV2'].temporalPose.trans):
self.v_model_error = sm['modelV2'].temporalPose.trans[0] - v_ego
if force_slow_decel:
# if required so, force a smooth deceleration
accel_limits_turns[1] = min(accel_limits_turns[1], AWARENESS_DECEL)
accel_limits_turns[0] = min(accel_limits_turns[0], accel_limits_turns[1])
# clip limits, cannot init MPC outside of bounds
accel_limits_turns[0] = min(accel_limits_turns[0], self.a_desired + 0.05)
accel_limits_turns[1] = max(accel_limits_turns[1], self.a_desired - 0.05)
self.mpc.set_weights(prev_accel_constraint)
self.mpc.set_accel_limits(accel_limits_turns[0], accel_limits_turns[1])
self.mpc.set_cur_state(self.v_desired_filter.x, self.a_desired)
x, v, a, j = self.parse_model(sm['modelV2'], self.v_model_error)
self.mpc.update(sm['carState'], sm['radarState'], v_cruise, x, v, a, j)
self.v_desired_trajectory = np.interp(T_IDXS[:CONTROL_N], T_IDXS_MPC, self.mpc.v_solution)
self.a_desired_trajectory = np.interp(T_IDXS[:CONTROL_N], T_IDXS_MPC, self.mpc.a_solution)
self.j_desired_trajectory = np.interp(T_IDXS[:CONTROL_N], T_IDXS_MPC[:-1], self.mpc.j_solution)
# TODO counter is only needed because radar is glitchy, remove once radar is gone
self.fcw = self.mpc.crash_cnt > 2 and not sm['carState'].standstill
if self.fcw:
cloudlog.info("FCW triggered")
# Interpolate 0.05 seconds and save as starting point for next iteration
a_prev = self.a_desired
self.a_desired = float(interp(DT_MDL, T_IDXS[:CONTROL_N], self.a_desired_trajectory))
self.v_desired_filter.x = self.v_desired_filter.x + DT_MDL * (self.a_desired + a_prev) / 2.0
def publish(self, sm, pm):
plan_send = messaging.new_message('longitudinalPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState'])
longitudinalPlan = plan_send.longitudinalPlan
longitudinalPlan.modelMonoTime = sm.logMonoTime['modelV2']
longitudinalPlan.processingDelay = (plan_send.logMonoTime / 1e9) - sm.logMonoTime['modelV2']
longitudinalPlan.speeds = self.v_desired_trajectory.tolist()
longitudinalPlan.accels = self.a_desired_trajectory.tolist()
longitudinalPlan.jerks = self.j_desired_trajectory.tolist()
longitudinalPlan.hasLead = sm['radarState'].leadOne.status
longitudinalPlan.longitudinalPlanSource = self.mpc.source
longitudinalPlan.fcw = self.fcw
longitudinalPlan.solverExecutionTime = self.mpc.solve_time
pm.send('longitudinalPlan', plan_send)