import numpy as np
from common . realtime import sec_since_boot , DT_MDL
from common . numpy_fast import interp
from system . swaglog import cloudlog
from selfdrive . controls . lib . lateral_mpc_lib . lat_mpc import LateralMpc
from selfdrive . controls . lib . lateral_mpc_lib . lat_mpc import N as LAT_MPC_N
from selfdrive . controls . lib . drive_helpers import CONTROL_N
from selfdrive . controls . lib . desire_helper import DesireHelper
import cereal . messaging as messaging
from cereal import log
TRAJECTORY_SIZE = 33
CAMERA_OFFSET = 0.04
class LateralPlanner :
def __init__ ( self , CP ) :
self . DH = DesireHelper ( )
# Vehicle model parameters used to calculate lateral movement of car
self . factor1 = CP . wheelbase - CP . centerToFront
self . factor2 = ( CP . centerToFront * CP . mass ) / ( CP . wheelbase * CP . tireStiffnessRear )
self . last_cloudlog_t = 0
self . solution_invalid_cnt = 0
self . path_xyz = np . zeros ( ( TRAJECTORY_SIZE , 3 ) )
self . plan_yaw = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . plan_yaw_rate = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . t_idxs = np . arange ( TRAJECTORY_SIZE )
self . y_pts = np . zeros ( TRAJECTORY_SIZE )
self . lat_mpc = LateralMpc ( )
self . reset_mpc ( np . zeros ( 4 ) )
def reset_mpc ( self , x0 = np . zeros ( 4 ) ) :
self . x0 = x0
self . lat_mpc . reset ( x0 = self . x0 )
def update ( self , sm ) :
v_ego = sm [ ' carState ' ] . vEgo
measured_curvature = sm [ ' controlsState ' ] . curvature
# Parse model predictions
md = sm [ ' modelV2 ' ]
if len ( md . position . x ) == TRAJECTORY_SIZE and len ( md . orientation . x ) == TRAJECTORY_SIZE :
self . path_xyz = np . column_stack ( [ md . position . x , md . position . y , md . position . z ] )
self . t_idxs = np . array ( md . position . t )
self . plan_yaw = np . array ( md . orientation . z )
self . plan_yaw_rate = np . array ( md . orientationRate . z )
# Lane change logic
desire_state = md . meta . desireState
if len ( desire_state ) :
self . l_lane_change_prob = desire_state [ log . LateralPlan . Desire . laneChangeLeft ]
self . r_lane_change_prob = desire_state [ log . LateralPlan . Desire . laneChangeRight ]
lane_change_prob = self . l_lane_change_prob + self . r_lane_change_prob
self . DH . update ( sm [ ' carState ' ] , sm [ ' carControl ' ] . latActive , lane_change_prob )
d_path_xyz = self . path_xyz
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp ( v_ego , [ 5.0 , 10.0 ] , [ 1.0 , 0.15 ] )
self . lat_mpc . set_weights ( 1.0 , heading_cost , 0.0 , .075 )
y_pts = np . interp ( v_ego * self . t_idxs [ : LAT_MPC_N + 1 ] , np . linalg . norm ( d_path_xyz , axis = 1 ) , d_path_xyz [ : , 1 ] )
heading_pts = np . interp ( v_ego * self . t_idxs [ : LAT_MPC_N + 1 ] , np . linalg . norm ( self . path_xyz , axis = 1 ) , self . plan_yaw )
yaw_rate_pts = np . interp ( v_ego * self . t_idxs [ : LAT_MPC_N + 1 ] , np . linalg . norm ( self . path_xyz , axis = 1 ) , self . plan_yaw_rate )
self . y_pts = y_pts
assert len ( y_pts ) == LAT_MPC_N + 1
assert len ( heading_pts ) == LAT_MPC_N + 1
assert len ( yaw_rate_pts ) == LAT_MPC_N + 1
lateral_factor = max ( 0 , self . factor1 - ( self . factor2 * v_ego * * 2 ) )
p = np . array ( [ v_ego , lateral_factor ] )
self . lat_mpc . run ( self . x0 ,
p ,
y_pts ,
heading_pts ,
yaw_rate_pts )
# init state for next
# mpc.u_sol is the desired curvature rate given x0 curv state.
# with x0[3] = measured_curvature, this would be the actual desired rate.
# instead, interpolate x_sol so that x0[3] is the desired curvature for lat_control.
self . x0 [ 3 ] = interp ( DT_MDL , self . t_idxs [ : LAT_MPC_N + 1 ] , self . lat_mpc . x_sol [ : , 3 ] )
# Check for infeasible MPC solution
mpc_nans = np . isnan ( self . lat_mpc . x_sol [ : , 3 ] ) . any ( )
t = sec_since_boot ( )
if mpc_nans or self . lat_mpc . solution_status != 0 :
self . reset_mpc ( )
self . x0 [ 3 ] = measured_curvature
if t > self . last_cloudlog_t + 5.0 :
self . last_cloudlog_t = t
cloudlog . warning ( " Lateral mpc - nan: True " )
if self . lat_mpc . cost > 20000. or mpc_nans :
self . solution_invalid_cnt + = 1
else :
self . solution_invalid_cnt = 0
def publish ( self , sm , pm ) :
plan_solution_valid = self . solution_invalid_cnt < 2
plan_send = messaging . new_message ( ' lateralPlan ' )
plan_send . valid = sm . all_checks ( service_list = [ ' carState ' , ' controlsState ' , ' modelV2 ' ] )
lateralPlan = plan_send . lateralPlan
lateralPlan . modelMonoTime = sm . logMonoTime [ ' modelV2 ' ]
lateralPlan . dPathPoints = self . y_pts . tolist ( )
lateralPlan . psis = self . lat_mpc . x_sol [ 0 : CONTROL_N , 2 ] . tolist ( )
lateralPlan . curvatures = ( self . lat_mpc . x_sol [ 0 : CONTROL_N , 3 ] / sm [ ' carState ' ] . vEgo ) . tolist ( )
lateralPlan . curvatureRates = [ float ( x ) for x in self . lat_mpc . u_sol [ 0 : CONTROL_N - 1 ] ] + [ 0.0 ]
lateralPlan . mpcSolutionValid = bool ( plan_solution_valid )
lateralPlan . solverExecutionTime = self . lat_mpc . solve_time
lateralPlan . desire = self . DH . desire
lateralPlan . useLaneLines = False
lateralPlan . laneChangeState = self . DH . lane_change_state
lateralPlan . laneChangeDirection = self . DH . lane_change_direction
pm . send ( ' lateralPlan ' , plan_send )