import math
import numpy as np
from common . realtime import sec_since_boot , DT_MDL
from common . numpy_fast import interp
from selfdrive . swaglog import cloudlog
from selfdrive . controls . lib . lateral_mpc_lib . lat_mpc import LateralMpc
from selfdrive . controls . lib . drive_helpers import CONTROL_N , MPC_COST_LAT , LAT_MPC_N , CAR_ROTATION_RADIUS
from selfdrive . controls . lib . lane_planner import LanePlanner , TRAJECTORY_SIZE
from selfdrive . config import Conversions as CV
import cereal . messaging as messaging
from cereal import log
LaneChangeState = log . LateralPlan . LaneChangeState
LaneChangeDirection = log . LateralPlan . LaneChangeDirection
LANE_CHANGE_SPEED_MIN = 30 * CV . MPH_TO_MS
LANE_CHANGE_TIME_MAX = 10.
DESIRES = {
LaneChangeDirection . none : {
LaneChangeState . off : log . LateralPlan . Desire . none ,
LaneChangeState . preLaneChange : log . LateralPlan . Desire . none ,
LaneChangeState . laneChangeStarting : log . LateralPlan . Desire . none ,
LaneChangeState . laneChangeFinishing : log . LateralPlan . Desire . none ,
} ,
LaneChangeDirection . left : {
LaneChangeState . off : log . LateralPlan . Desire . none ,
LaneChangeState . preLaneChange : log . LateralPlan . Desire . none ,
LaneChangeState . laneChangeStarting : log . LateralPlan . Desire . laneChangeLeft ,
LaneChangeState . laneChangeFinishing : log . LateralPlan . Desire . laneChangeLeft ,
} ,
LaneChangeDirection . right : {
LaneChangeState . off : log . LateralPlan . Desire . none ,
LaneChangeState . preLaneChange : log . LateralPlan . Desire . none ,
LaneChangeState . laneChangeStarting : log . LateralPlan . Desire . laneChangeRight ,
LaneChangeState . laneChangeFinishing : log . LateralPlan . Desire . laneChangeRight ,
} ,
}
class LateralPlanner :
def __init__ ( self , CP , use_lanelines = True , wide_camera = False ) :
self . use_lanelines = use_lanelines
self . LP = LanePlanner ( wide_camera )
self . last_cloudlog_t = 0
self . steer_rate_cost = CP . steerRateCost
self . solution_invalid_cnt = 0
self . lane_change_state = LaneChangeState . off
self . lane_change_direction = LaneChangeDirection . none
self . lane_change_timer = 0.0
self . lane_change_ll_prob = 1.0
self . keep_pulse_timer = 0.0
self . prev_one_blinker = False
self . desire = log . LateralPlan . Desire . none
self . path_xyz = np . zeros ( ( TRAJECTORY_SIZE , 3 ) )
self . path_xyz_stds = np . ones ( ( TRAJECTORY_SIZE , 3 ) )
self . plan_yaw = np . zeros ( ( TRAJECTORY_SIZE , ) )
self . t_idxs = np . arange ( TRAJECTORY_SIZE )
self . y_pts = np . zeros ( TRAJECTORY_SIZE )
self . lat_mpc = LateralMpc ( )
self . reset_mpc ( np . zeros ( 6 ) )
def reset_mpc ( self , x0 = np . zeros ( 6 ) ) :
self . x0 = x0
self . lat_mpc . reset ( x0 = self . x0 )
def update ( self , sm ) :
v_ego = sm [ ' carState ' ] . vEgo
active = sm [ ' controlsState ' ] . active
measured_curvature = sm [ ' controlsState ' ] . curvature
md = sm [ ' modelV2 ' ]
self . LP . parse_model ( sm [ ' modelV2 ' ] )
if len ( md . position . x ) == TRAJECTORY_SIZE and len ( md . orientation . x ) == TRAJECTORY_SIZE :
self . path_xyz = np . column_stack ( [ md . position . x , md . position . y , md . position . z ] )
self . t_idxs = np . array ( md . position . t )
self . plan_yaw = list ( md . orientation . z )
if len ( md . position . xStd ) == TRAJECTORY_SIZE :
self . path_xyz_stds = np . column_stack ( [ md . position . xStd , md . position . yStd , md . position . zStd ] )
# Lane change logic
one_blinker = sm [ ' carState ' ] . leftBlinker != sm [ ' carState ' ] . rightBlinker
below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
if ( not active ) or ( self . lane_change_timer > LANE_CHANGE_TIME_MAX ) :
self . lane_change_state = LaneChangeState . off
self . lane_change_direction = LaneChangeDirection . none
else :
# LaneChangeState.off
if self . lane_change_state == LaneChangeState . off and one_blinker and not self . prev_one_blinker and not below_lane_change_speed :
self . lane_change_state = LaneChangeState . preLaneChange
self . lane_change_ll_prob = 1.0
# LaneChangeState.preLaneChange
elif self . lane_change_state == LaneChangeState . preLaneChange :
# Set lane change direction
if sm [ ' carState ' ] . leftBlinker :
self . lane_change_direction = LaneChangeDirection . left
elif sm [ ' carState ' ] . rightBlinker :
self . lane_change_direction = LaneChangeDirection . right
else : # If there are no blinkers we will go back to LaneChangeState.off
self . lane_change_direction = LaneChangeDirection . none
torque_applied = sm [ ' carState ' ] . steeringPressed and \
( ( sm [ ' carState ' ] . steeringTorque > 0 and self . lane_change_direction == LaneChangeDirection . left ) or
( sm [ ' carState ' ] . steeringTorque < 0 and self . lane_change_direction == LaneChangeDirection . right ) )
blindspot_detected = ( ( sm [ ' carState ' ] . leftBlindspot and self . lane_change_direction == LaneChangeDirection . left ) or
( sm [ ' carState ' ] . rightBlindspot and self . lane_change_direction == LaneChangeDirection . right ) )
if not one_blinker or below_lane_change_speed :
self . lane_change_state = LaneChangeState . off
elif torque_applied and not blindspot_detected :
self . lane_change_state = LaneChangeState . laneChangeStarting
# LaneChangeState.laneChangeStarting
elif self . lane_change_state == LaneChangeState . laneChangeStarting :
# fade out over .5s
self . lane_change_ll_prob = max ( self . lane_change_ll_prob - 2 * DT_MDL , 0.0 )
# 98% certainty
lane_change_prob = self . LP . l_lane_change_prob + self . LP . r_lane_change_prob
if lane_change_prob < 0.02 and self . lane_change_ll_prob < 0.01 :
self . lane_change_state = LaneChangeState . laneChangeFinishing
# LaneChangeState.laneChangeFinishing
elif self . lane_change_state == LaneChangeState . laneChangeFinishing :
# fade in laneline over 1s
self . lane_change_ll_prob = min ( self . lane_change_ll_prob + DT_MDL , 1.0 )
if self . lane_change_ll_prob > 0.99 :
self . lane_change_direction = LaneChangeDirection . none
if one_blinker :
self . lane_change_state = LaneChangeState . preLaneChange
else :
self . lane_change_state = LaneChangeState . off
if self . lane_change_state in ( LaneChangeState . off , LaneChangeState . preLaneChange ) :
self . lane_change_timer = 0.0
else :
self . lane_change_timer + = DT_MDL
self . prev_one_blinker = one_blinker
self . desire = DESIRES [ self . lane_change_direction ] [ self . lane_change_state ]
# Send keep pulse once per second during LaneChangeStart.preLaneChange
if self . lane_change_state in ( LaneChangeState . off , LaneChangeState . laneChangeStarting ) :
self . keep_pulse_timer = 0.0
elif self . lane_change_state == LaneChangeState . preLaneChange :
self . keep_pulse_timer + = DT_MDL
if self . keep_pulse_timer > 1.0 :
self . keep_pulse_timer = 0.0
elif self . desire in ( log . LateralPlan . Desire . keepLeft , log . LateralPlan . Desire . keepRight ) :
self . desire = log . LateralPlan . Desire . none
# Turn off lanes during lane change
if self . desire == log . LateralPlan . Desire . laneChangeRight or self . desire == log . LateralPlan . Desire . laneChangeLeft :
self . LP . lll_prob * = self . lane_change_ll_prob
self . LP . rll_prob * = self . lane_change_ll_prob
if self . use_lanelines :
d_path_xyz = self . LP . get_d_path ( v_ego , self . t_idxs , self . path_xyz )
self . lat_mpc . set_weights ( MPC_COST_LAT . PATH , MPC_COST_LAT . HEADING , self . steer_rate_cost )
else :
d_path_xyz = self . path_xyz
path_cost = np . clip ( abs ( self . path_xyz [ 0 , 1 ] / self . path_xyz_stds [ 0 , 1 ] ) , 0.5 , 1.5 ) * MPC_COST_LAT . PATH
# Heading cost is useful at low speed, otherwise end of plan can be off-heading
heading_cost = interp ( v_ego , [ 5.0 , 10.0 ] , [ MPC_COST_LAT . HEADING , 0.0 ] )
self . lat_mpc . set_weights ( path_cost , heading_cost , self . steer_rate_cost )
y_pts = np . interp ( v_ego * self . t_idxs [ : LAT_MPC_N + 1 ] , np . linalg . norm ( d_path_xyz , axis = 1 ) , d_path_xyz [ : , 1 ] )
heading_pts = np . interp ( v_ego * self . t_idxs [ : LAT_MPC_N + 1 ] , np . linalg . norm ( self . path_xyz , axis = 1 ) , self . plan_yaw )
self . y_pts = y_pts
assert len ( y_pts ) == LAT_MPC_N + 1
assert len ( heading_pts ) == LAT_MPC_N + 1
self . x0 [ 4 ] = v_ego
self . lat_mpc . run ( self . x0 ,
v_ego ,
CAR_ROTATION_RADIUS ,
y_pts ,
heading_pts )
# init state for next
self . x0 [ 3 ] = interp ( DT_MDL , self . t_idxs [ : LAT_MPC_N + 1 ] , self . lat_mpc . x_sol [ : , 3 ] )
# Check for infeasible MPC solution
mpc_nans = any ( math . isnan ( x ) for x in self . lat_mpc . x_sol [ : , 3 ] )
t = sec_since_boot ( )
if mpc_nans or self . lat_mpc . solution_status != 0 :
self . reset_mpc ( )
self . x0 [ 3 ] = measured_curvature
if t > self . last_cloudlog_t + 5.0 :
self . last_cloudlog_t = t
cloudlog . warning ( " Lateral mpc - nan: True " )
if self . lat_mpc . cost > 20000. or mpc_nans :
self . solution_invalid_cnt + = 1
else :
self . solution_invalid_cnt = 0
def publish ( self , sm , pm ) :
plan_solution_valid = self . solution_invalid_cnt < 2
plan_send = messaging . new_message ( ' lateralPlan ' )
plan_send . valid = sm . all_alive_and_valid ( service_list = [ ' carState ' , ' controlsState ' , ' modelV2 ' ] )
lateralPlan = plan_send . lateralPlan
lateralPlan . laneWidth = float ( self . LP . lane_width )
lateralPlan . dPathPoints = self . y_pts . tolist ( )
lateralPlan . psis = self . lat_mpc . x_sol [ 0 : CONTROL_N , 2 ] . tolist ( )
lateralPlan . curvatures = self . lat_mpc . x_sol [ 0 : CONTROL_N , 3 ] . tolist ( )
lateralPlan . curvatureRates = [ float ( x ) for x in self . lat_mpc . u_sol [ 0 : CONTROL_N - 1 ] ] + [ 0.0 ]
lateralPlan . lProb = float ( self . LP . lll_prob )
lateralPlan . rProb = float ( self . LP . rll_prob )
lateralPlan . dProb = float ( self . LP . d_prob )
lateralPlan . mpcSolutionValid = bool ( plan_solution_valid )
lateralPlan . desire = self . desire
lateralPlan . useLaneLines = self . use_lanelines
lateralPlan . laneChangeState = self . lane_change_state
lateralPlan . laneChangeDirection = self . lane_change_direction
pm . send ( ' lateralPlan ' , plan_send )