@ -1,10 +1,4 @@
import time
import numpy as np
from openpilot . common . realtime import DT_MDL
from openpilot . common . numpy_fast import interp
from openpilot . system . swaglog import cloudlog
from openpilot . selfdrive . controls . lib . lateral_mpc_lib . lat_mpc import LateralMpc
from openpilot . selfdrive . controls . lib . lateral_mpc_lib . lat_mpc import N as LAT_MPC_N
from openpilot . selfdrive . controls . lib . drive_helpers import CONTROL_N , MIN_SPEED , get_speed_error
from openpilot . selfdrive . controls . lib . desire_helper import DesireHelper
import cereal . messaging as messaging
@ -13,18 +7,6 @@ from cereal import log
TRAJECTORY_SIZE = 33
CAMERA_OFFSET = 0.04
PATH_COST = 1.0
LATERAL_MOTION_COST = 0.11
LATERAL_ACCEL_COST = 0.0
LATERAL_JERK_COST = 0.04
# Extreme steering rate is unpleasant, even
# when it does not cause bad jerk.
# TODO this cost should be lowered when low
# speed lateral control is stable on all cars
STEERING_RATE_COST = 700.0
class LateralPlanner :
def __init__ ( self , CP , debug = False ) :
self . DH = DesireHelper ( )
@ -37,42 +19,26 @@ class LateralPlanner:
self . path_xyz = np . zeros ( ( TRAJECTORY_SIZE , 3 ) )
self . velocity_xyz = np . zeros ( ( TRAJECTORY_SIZE , 3 ) )
self . plan_yaw = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . plan_yaw_rate = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . t_idxs = np . arange ( TRAJECTORY_SIZE )
self . y_pts = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . v_plan = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . v_ego = 0.0
self . x_sol = np . zeros ( ( TRAJECTORY_SIZE , 4 ) , dtype = np . float32 )
self . v_ego = MIN_SPEED
self . l_lane_change_prob = 0.0
self . r_lane_change_prob = 0.0
self . debug_mode = debug
self . lat_mpc = LateralMpc ( )
self . reset_mpc ( np . zeros ( 4 ) )
def reset_mpc ( self , x0 = None ) :
if x0 is None :
x0 = np . zeros ( 4 )
self . x0 = x0
self . lat_mpc . reset ( x0 = self . x0 )
def update ( self , sm ) :
# clip speed , lateral planning is not possible at 0 speed
measured_curvature = sm [ ' controlsState ' ] . curvature
v_ego_car = sm [ ' carState ' ] . vEgo
# Parse model predictions
md = sm [ ' modelV2 ' ]
if len ( md . position . x ) == TRAJECTORY_SIZE and len ( md . orienta tion. x ) == TRAJECTORY_SIZE :
if len ( md . position . x ) == TRAJECTORY_SIZE and len ( md . velocity . x ) == TRAJECTORY_SIZE and len ( md . lateralPlannerSolution . x ) == TRAJECTORY_SIZE :
self . path_xyz = np . column_stack ( [ md . position . x , md . position . y , md . position . z ] )
self . t_idxs = np . array ( md . position . t )
self . plan_yaw = np . array ( md . orientation . z )
self . plan_yaw_rate = np . array ( md . orientationRate . z )
self . velocity_xyz = np . column_stack ( [ md . velocity . x , md . velocity . y , md . velocity . z ] )
car_speed = np . linalg . norm ( self . velocity_xyz , axis = 1 ) - get_speed_error ( md , v_ego_car )
self . v_plan = np . clip ( car_speed , MIN_SPEED , np . inf )
self . v_ego = self . v_plan [ 0 ]
self . x_sol = np . column_stack ( [ md . lateralPlannerSolution . x , md . lateralPlannerSolution . y , md . lateralPlannerSolution . yaw , md . lateralPlannerSolution . yawRate ] )
# Lane change logic
desire_state = md . meta . desireState
@ -82,66 +48,23 @@ class LateralPlanner:
lane_change_prob = self . l_lane_change_prob + self . r_lane_change_prob
self . DH . update ( sm [ ' carState ' ] , sm [ ' carControl ' ] . latActive , lane_change_prob )
self . lat_mpc . set_weights ( PATH_COST , LATERAL_MOTION_COST ,
LATERAL_ACCEL_COST , LATERAL_JERK_COST ,
STEERING_RATE_COST )
y_pts = self . path_xyz [ : LAT_MPC_N + 1 , 1 ]
heading_pts = self . plan_yaw [ : LAT_MPC_N + 1 ]
yaw_rate_pts = self . plan_yaw_rate [ : LAT_MPC_N + 1 ]
self . y_pts = y_pts
assert len ( y_pts ) == LAT_MPC_N + 1
assert len ( heading_pts ) == LAT_MPC_N + 1
assert len ( yaw_rate_pts ) == LAT_MPC_N + 1
lateral_factor = np . clip ( self . factor1 - ( self . factor2 * self . v_plan * * 2 ) , 0.0 , np . inf )
p = np . column_stack ( [ self . v_plan , lateral_factor ] )
self . lat_mpc . run ( self . x0 ,
p ,
y_pts ,
heading_pts ,
yaw_rate_pts )
# init state for next iteration
# mpc.u_sol is the desired second derivative of psi given x0 curv state.
# with x0[3] = measured_yaw_rate, this would be the actual desired yaw rate.
# instead, interpolate x_sol so that x0[3] is the desired yaw rate for lat_control.
self . x0 [ 3 ] = interp ( DT_MDL , self . t_idxs [ : LAT_MPC_N + 1 ] , self . lat_mpc . x_sol [ : , 3 ] )
# Check for infeasible MPC solution
mpc_nans = np . isnan ( self . lat_mpc . x_sol [ : , 3 ] ) . any ( )
t = time . monotonic ( )
if mpc_nans or self . lat_mpc . solution_status != 0 :
self . reset_mpc ( )
self . x0 [ 3 ] = measured_curvature * self . v_ego
if t > self . last_cloudlog_t + 5.0 :
self . last_cloudlog_t = t
cloudlog . warning ( " Lateral mpc - nan: True " )
if self . lat_mpc . cost > 1e6 or mpc_nans :
self . solution_invalid_cnt + = 1
else :
self . solution_invalid_cnt = 0
def publish ( self , sm , pm ) :
plan_solution_valid = self . solution_invalid_cnt < 2
plan_send = messaging . new_message ( ' lateralPlan ' )
plan_send . valid = sm . all_checks ( service_list = [ ' carState ' , ' controlsState ' , ' modelV2 ' ] )
lateralPlan = plan_send . lateralPlan
lateralPlan . modelMonoTime = sm . logMonoTime [ ' modelV2 ' ]
lateralPlan . dPathPoints = self . y_ pts . tolist ( )
lateralPlan . psis = self . lat_mpc . x_sol [ 0 : CONTROL_N , 2 ] . tolist ( )
lateralPlan . dPathPoints = self . path_xyz [ : , 1 ] . tolist ( )
lateralPlan . psis = self . x_sol [ 0 : CONTROL_N , 2 ] . tolist ( )
lateralPlan . curvatures = ( self . lat_mpc . x_sol [ 0 : CONTROL_N , 3 ] / self . v_ego ) . tolist ( )
lateralPlan . curvatureRates = [ float ( x . item ( ) / self . v_ego ) for x in self . lat_mpc . u_sol [ 0 : CONTROL_N - 1 ] ] + [ 0.0 ]
lateralPlan . curvatures = ( self . x_sol [ 0 : CONTROL_N , 3 ] / self . v_ego ) . tolist ( )
lateralPlan . curvatureRates = [ float ( 0 ) for _ in range ( CONTROL_N - 1 ) ] # TODO: unused
lateralPlan . mpcSolutionValid = bool ( plan_solution_valid )
lateralPlan . solverExecutionTime = self . lat_mpc . solve_time
lateralPlan . mpcSolutionValid = bool ( 1 )
lateralPlan . solverExecutionTime = 0.0
if self . debug_mode :
lateralPlan . solverCost = self . lat_mpc . cost
lateralPlan . solverState = log . LateralPlan . SolverState . new_message ( )
lateralPlan . solverState . x = self . lat_mpc . x_sol . tolist ( )
lateralPlan . solverState . u = self . lat_mpc . u_sol . flatten ( ) . tolist ( )
lateralPlan . solverState . x = self . x_sol . tolist ( )
lateralPlan . desire = self . DH . desire
lateralPlan . useLaneLines = False