openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

122 lines
5.1 KiB

import numpy as np
from common.realtime import sec_since_boot, DT_MDL
from common.numpy_fast import interp
from system.swaglog import cloudlog
from selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import LateralMpc
from selfdrive.controls.lib.lateral_mpc_lib.lat_mpc import N as LAT_MPC_N
from selfdrive.controls.lib.drive_helpers import CONTROL_N
from selfdrive.controls.lib.desire_helper import DesireHelper
import cereal.messaging as messaging
from cereal import log
TRAJECTORY_SIZE = 33
CAMERA_OFFSET = 0.04
class LateralPlanner:
def __init__(self, CP):
self.DH = DesireHelper()
# Vehicle model parameters used to calculate lateral movement of car
self.factor1 = CP.wheelbase - CP.centerToFront
self.factor2 = (CP.centerToFront * CP.mass) / (CP.wheelbase * CP.tireStiffnessRear)
self.last_cloudlog_t = 0
self.solution_invalid_cnt = 0
self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3))
self.plan_yaw = np.zeros((TRAJECTORY_SIZE,))
self.plan_yaw_rate = np.zeros((TRAJECTORY_SIZE,))
self.t_idxs = np.arange(TRAJECTORY_SIZE)
self.y_pts = np.zeros(TRAJECTORY_SIZE)
self.lat_mpc = LateralMpc()
self.reset_mpc(np.zeros(4))
def reset_mpc(self, x0=np.zeros(4)):
self.x0 = x0
self.lat_mpc.reset(x0=self.x0)
def update(self, sm):
v_ego = sm['carState'].vEgo
measured_curvature = sm['controlsState'].curvature
# Parse model predictions
md = sm['modelV2']
if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE:
self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z])
self.t_idxs = np.array(md.position.t)
self.plan_yaw = np.array(md.orientation.z)
self.plan_yaw_rate = np.array(md.orientationRate.z)
# Lane change logic
desire_state = md.meta.desireState
if len(desire_state):
self.l_lane_change_prob = desire_state[log.LateralPlan.Desire.laneChangeLeft]
self.r_lane_change_prob = desire_state[log.LateralPlan.Desire.laneChangeRight]
lane_change_prob = self.l_lane_change_prob + self.r_lane_change_prob
self.DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob)
d_path_xyz = self.path_xyz
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp(v_ego, [5.0, 10.0], [1.0, 0.15])
self.lat_mpc.set_weights(1.0, heading_cost, 0.0, .075)
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1])
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw)
yaw_rate_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw_rate)
self.y_pts = y_pts
assert len(y_pts) == LAT_MPC_N + 1
assert len(heading_pts) == LAT_MPC_N + 1
assert len(yaw_rate_pts) == LAT_MPC_N + 1
lateral_factor = max(0, self.factor1 - (self.factor2 * v_ego**2))
p = np.array([v_ego, lateral_factor])
self.lat_mpc.run(self.x0,
p,
y_pts,
heading_pts,
yaw_rate_pts)
# init state for next iteration
# mpc.u_sol is the desired second derivative of psi given x0 curv state.
# with x0[3] = measured_yaw_rate, this would be the actual desired yaw rate.
# instead, interpolate x_sol so that x0[3] is the desired yaw rate for lat_control.
self.x0[3] = interp(DT_MDL, self.t_idxs[:LAT_MPC_N + 1], self.lat_mpc.x_sol[:, 3])
# Check for infeasible MPC solution
mpc_nans = np.isnan(self.lat_mpc.x_sol[:, 3]).any()
t = sec_since_boot()
if mpc_nans or self.lat_mpc.solution_status != 0:
self.reset_mpc()
self.x0[3] = measured_curvature
if t > self.last_cloudlog_t + 5.0:
self.last_cloudlog_t = t
cloudlog.warning("Lateral mpc - nan: True")
if self.lat_mpc.cost > 20000. or mpc_nans:
self.solution_invalid_cnt += 1
else:
self.solution_invalid_cnt = 0
def publish(self, sm, pm):
plan_solution_valid = self.solution_invalid_cnt < 2
plan_send = messaging.new_message('lateralPlan')
plan_send.valid = sm.all_checks(service_list=['carState', 'controlsState', 'modelV2'])
lateralPlan = plan_send.lateralPlan
lateralPlan.modelMonoTime = sm.logMonoTime['modelV2']
lateralPlan.dPathPoints = self.y_pts.tolist()
lateralPlan.psis = self.lat_mpc.x_sol[0:CONTROL_N, 2].tolist()
# clip speed for curv calculation at 1m/s, to prevent low speed extremes
clipped_speed = max(1.0, sm['carState'].vEgo)
lateralPlan.curvatures = (self.lat_mpc.x_sol[0:CONTROL_N, 3]/clipped_speed).tolist()
lateralPlan.curvatureRates = [float(x/clipped_speed) for x in self.lat_mpc.u_sol[0:CONTROL_N - 1]] + [0.0]
lateralPlan.mpcSolutionValid = bool(plan_solution_valid)
lateralPlan.solverExecutionTime = self.lat_mpc.solve_time
lateralPlan.desire = self.DH.desire
lateralPlan.useLaneLines = False
lateralPlan.laneChangeState = self.DH.lane_change_state
lateralPlan.laneChangeDirection = self.DH.lane_change_direction
pm.send('lateralPlan', plan_send)