import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . selfdrive . controls . lib . drive_helpers  import  CONTROL_N ,  MIN_SPEED ,  get_speed_error 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . controls . lib . desire_helper  import  DesireHelper 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								TRAJECTORY_SIZE  =  33 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CAMERA_OFFSET  =  0.04 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  LateralPlanner : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  __init__ ( self ,  CP ,  debug = False ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . DH  =  DesireHelper ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Vehicle model parameters used to calculate lateral movement of car 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . factor1  =  CP . wheelbase  -  CP . centerToFront 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . factor2  =  ( CP . centerToFront  *  CP . mass )  /  ( CP . wheelbase  *  CP . tireStiffnessRear ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . last_cloudlog_t  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . solution_invalid_cnt  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . path_xyz  =  np . zeros ( ( TRAJECTORY_SIZE ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . velocity_xyz  =  np . zeros ( ( TRAJECTORY_SIZE ,  3 ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . v_plan  =  np . zeros ( ( TRAJECTORY_SIZE , ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . x_sol  =  np . zeros ( ( TRAJECTORY_SIZE ,  4 ) ,  dtype = np . float32 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_ego  =  MIN_SPEED 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . l_lane_change_prob  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . r_lane_change_prob  =  0.0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . debug_mode  =  debug 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  update ( self ,  sm ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    v_ego_car  =  sm [ ' carState ' ] . vEgo 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Parse model predictions 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    md  =  sm [ ' modelV2 ' ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  len ( md . position . x )  ==  TRAJECTORY_SIZE  and  len ( md . velocity . x )  ==  TRAJECTORY_SIZE  and  len ( md . lateralPlannerSolution . x )  ==  TRAJECTORY_SIZE : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . path_xyz  =  np . column_stack ( [ md . position . x ,  md . position . y ,  md . position . z ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . velocity_xyz  =  np . column_stack ( [ md . velocity . x ,  md . velocity . y ,  md . velocity . z ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      car_speed  =  np . linalg . norm ( self . velocity_xyz ,  axis = 1 )  -  get_speed_error ( md ,  v_ego_car ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . v_plan  =  np . clip ( car_speed ,  MIN_SPEED ,  np . inf ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_ego  =  self . v_plan [ 0 ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . x_sol  =  np . column_stack ( [ md . lateralPlannerSolution . x ,  md . lateralPlannerSolution . y ,  md . lateralPlannerSolution . yaw ,  md . lateralPlannerSolution . yawRate ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # Lane change logic 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    desire_state  =  md . meta . desireState 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  len ( desire_state ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . l_lane_change_prob  =  desire_state [ log . LateralPlan . Desire . laneChangeLeft ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . r_lane_change_prob  =  desire_state [ log . LateralPlan . Desire . laneChangeRight ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lane_change_prob  =  self . l_lane_change_prob  +  self . r_lane_change_prob 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . DH . update ( sm [ ' carState ' ] ,  sm [ ' carControl ' ] . latActive ,  lane_change_prob ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  publish ( self ,  sm ,  pm ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    plan_send  =  messaging . new_message ( ' lateralPlan ' ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    plan_send . valid  =  sm . all_checks ( service_list = [ ' carState ' ,  ' controlsState ' ,  ' modelV2 ' ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan  =  plan_send . lateralPlan 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . modelMonoTime  =  sm . logMonoTime [ ' modelV2 ' ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . dPathPoints  =  self . path_xyz [ : , 1 ] . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . psis  =  self . x_sol [ 0 : CONTROL_N ,  2 ] . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . curvatures  =  ( self . x_sol [ 0 : CONTROL_N ,  3 ] / self . v_ego ) . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . curvatureRates  =  [ float ( 0 )  for  _  in  range ( CONTROL_N - 1 ) ]  # TODO: unused 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . mpcSolutionValid  =  bool ( 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . solverExecutionTime  =  0.0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  self . debug_mode : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      lateralPlan . solverState  =  log . LateralPlan . SolverState . new_message ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      lateralPlan . solverState . x  =  self . x_sol . tolist ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . desire  =  self . DH . desire 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . useLaneLines  =  False 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lateralPlan . laneChangeState  =  self . DH . lane_change_state 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    lateralPlan . laneChangeDirection  =  self . DH . lane_change_direction 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    pm . send ( ' lateralPlan ' ,  plan_send )