import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  common . realtime  import  sec_since_boot ,  DT_MDL 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  common . numpy_fast  import  interp 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  system . swaglog  import  cloudlog 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . controls . lib . lateral_mpc_lib . lat_mpc  import  LateralMpc 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . controls . lib . lateral_mpc_lib . lat_mpc  import  N  as  LAT_MPC_N 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . controls . lib . drive_helpers  import  CONTROL_N ,  MIN_SPEED ,  get_speed_error 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  selfdrive . controls . lib . desire_helper  import  DesireHelper 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								TRAJECTORY_SIZE  =  33 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CAMERA_OFFSET  =  0.04 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								PATH_COST  =  1.0 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LATERAL_MOTION_COST  =  0.11 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								LATERAL_ACCEL_COST  =  0.0 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								LATERAL_JERK_COST  =  0.04 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# Extreme steering rate is unpleasant, even 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# when it does not cause bad jerk. 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# TODO this cost should be lowered when low 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# speed lateral control is stable on all cars 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								STEERING_RATE_COST  =  700.0 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  LateralPlanner : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  CP ,  debug = False ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . DH  =  DesireHelper ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Vehicle model parameters used to calculate lateral movement of car 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . factor1  =  CP . wheelbase  -  CP . centerToFront 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . factor2  =  ( CP . centerToFront  *  CP . mass )  /  ( CP . wheelbase  *  CP . tireStiffnessRear ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . last_cloudlog_t  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . solution_invalid_cnt  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . path_xyz  =  np . zeros ( ( TRAJECTORY_SIZE ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . velocity_xyz  =  np . zeros ( ( TRAJECTORY_SIZE ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . plan_yaw  =  np . zeros ( ( TRAJECTORY_SIZE , ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . plan_yaw_rate  =  np . zeros ( ( TRAJECTORY_SIZE , ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . t_idxs  =  np . arange ( TRAJECTORY_SIZE ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . y_pts  =  np . zeros ( ( TRAJECTORY_SIZE , ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_plan  =  np . zeros ( ( TRAJECTORY_SIZE , ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_ego  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . l_lane_change_prob  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . r_lane_change_prob  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . debug_mode  =  debug 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . lat_mpc  =  LateralMpc ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . reset_mpc ( np . zeros ( 4 ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  reset_mpc ( self ,  x0 = np . zeros ( 4 ) ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . x0  =  x0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . lat_mpc . reset ( x0 = self . x0 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  update ( self ,  sm ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # clip speed , lateral planning is not possible at 0 speed 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    measured_curvature  =  sm [ ' controlsState ' ] . curvature 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    v_ego_car  =  sm [ ' carState ' ] . vEgo 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Parse model predictions 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    md  =  sm [ ' modelV2 ' ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( md . position . x )  ==  TRAJECTORY_SIZE  and  len ( md . orientation . x )  ==  TRAJECTORY_SIZE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . path_xyz  =  np . column_stack ( [ md . position . x ,  md . position . y ,  md . position . z ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . t_idxs  =  np . array ( md . position . t ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . plan_yaw  =  np . array ( md . orientation . z ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . plan_yaw_rate  =  np . array ( md . orientationRate . z ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . velocity_xyz  =  np . column_stack ( [ md . velocity . x ,  md . velocity . y ,  md . velocity . z ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      car_speed  =  np . linalg . norm ( self . velocity_xyz ,  axis = 1 )  -  get_speed_error ( md ,  v_ego_car ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . v_plan  =  np . clip ( car_speed ,  MIN_SPEED ,  np . inf ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_ego  =  self . v_plan [ 0 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Lane change logic 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    desire_state  =  md . meta . desireState 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( desire_state ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . l_lane_change_prob  =  desire_state [ log . LateralPlan . Desire . laneChangeLeft ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . r_lane_change_prob  =  desire_state [ log . LateralPlan . Desire . laneChangeRight ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lane_change_prob  =  self . l_lane_change_prob  +  self . r_lane_change_prob 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . DH . update ( sm [ ' carState ' ] ,  sm [ ' carControl ' ] . latActive ,  lane_change_prob ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . lat_mpc . set_weights ( PATH_COST ,  LATERAL_MOTION_COST , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                             LATERAL_ACCEL_COST ,  LATERAL_JERK_COST , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                             STEERING_RATE_COST ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    y_pts  =  self . path_xyz [ : LAT_MPC_N + 1 ,  1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    heading_pts  =  self . plan_yaw [ : LAT_MPC_N + 1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    yaw_rate_pts  =  self . plan_yaw_rate [ : LAT_MPC_N + 1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . y_pts  =  y_pts 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  len ( y_pts )  ==  LAT_MPC_N  +  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    assert  len ( heading_pts )  ==  LAT_MPC_N  +  1 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  len ( yaw_rate_pts )  ==  LAT_MPC_N  +  1 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateral_factor  =  np . clip ( self . factor1  -  ( self . factor2  *  self . v_plan * * 2 ) ,  0.0 ,  np . inf ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    p  =  np . column_stack ( [ self . v_plan ,  lateral_factor ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . lat_mpc . run ( self . x0 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                     p , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                     y_pts , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                     heading_pts , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								                     yaw_rate_pts ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # init state for next iteration 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # mpc.u_sol is the desired second derivative of psi given x0 curv state. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # with x0[3] = measured_yaw_rate, this would be the actual desired yaw rate. 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # instead, interpolate x_sol so that x0[3] is the desired yaw rate for lat_control. 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . x0 [ 3 ]  =  interp ( DT_MDL ,  self . t_idxs [ : LAT_MPC_N  +  1 ] ,  self . lat_mpc . x_sol [ : ,  3 ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    #  Check for infeasible MPC solution 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    mpc_nans  =  np . isnan ( self . lat_mpc . x_sol [ : ,  3 ] ) . any ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    t  =  sec_since_boot ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  mpc_nans  or  self . lat_mpc . solution_status  !=  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . reset_mpc ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . x0 [ 3 ]  =  measured_curvature  *  self . v_ego 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  t  >  self . last_cloudlog_t  +  5.0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . last_cloudlog_t  =  t 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        cloudlog . warning ( " Lateral mpc - nan: True " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  self . lat_mpc . cost  >  1e6  or  mpc_nans : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . solution_invalid_cnt  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . solution_invalid_cnt  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  publish ( self ,  sm ,  pm ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    plan_solution_valid  =  self . solution_invalid_cnt  <  2 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    plan_send  =  messaging . new_message ( ' lateralPlan ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    plan_send . valid  =  sm . all_checks ( service_list = [ ' carState ' ,  ' controlsState ' ,  ' modelV2 ' ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan  =  plan_send . lateralPlan 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . modelMonoTime  =  sm . logMonoTime [ ' modelV2 ' ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . dPathPoints  =  self . y_pts . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . psis  =  self . lat_mpc . x_sol [ 0 : CONTROL_N ,  2 ] . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . curvatures  =  ( self . lat_mpc . x_sol [ 0 : CONTROL_N ,  3 ] / self . v_ego ) . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . curvatureRates  =  [ float ( x / self . v_ego )  for  x  in  self . lat_mpc . u_sol [ 0 : CONTROL_N  -  1 ] ]  +  [ 0.0 ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . mpcSolutionValid  =  bool ( plan_solution_valid ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . solverExecutionTime  =  self . lat_mpc . solve_time 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  self . debug_mode : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      lateralPlan . solverCost  =  self . lat_mpc . cost 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      lateralPlan . solverState  =  log . LateralPlan . SolverState . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      lateralPlan . solverState . x  =  self . lat_mpc . x_sol . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      lateralPlan . solverState . u  =  self . lat_mpc . u_sol . flatten ( ) . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . desire  =  self . DH . desire 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . useLaneLines  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . laneChangeState  =  self . DH . lane_change_state 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . laneChangeDirection  =  self . DH . lane_change_direction 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    pm . send ( ' lateralPlan ' ,  plan_send )