You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							231 lines
						
					
					
						
							9.2 KiB
						
					
					
				
			
		
		
	
	
							231 lines
						
					
					
						
							9.2 KiB
						
					
					
				| import os
 | |
| import math
 | |
| from common.realtime import sec_since_boot, DT_MDL
 | |
| from selfdrive.swaglog import cloudlog
 | |
| from selfdrive.controls.lib.lateral_mpc import libmpc_py
 | |
| from selfdrive.controls.lib.drive_helpers import MPC_COST_LAT
 | |
| from selfdrive.controls.lib.lane_planner import LanePlanner
 | |
| from selfdrive.config import Conversions as CV
 | |
| import cereal.messaging as messaging
 | |
| from cereal import log
 | |
| 
 | |
| LaneChangeState = log.PathPlan.LaneChangeState
 | |
| LaneChangeDirection = log.PathPlan.LaneChangeDirection
 | |
| 
 | |
| LOG_MPC = os.environ.get('LOG_MPC', False)
 | |
| 
 | |
| LANE_CHANGE_SPEED_MIN = 45 * CV.MPH_TO_MS
 | |
| LANE_CHANGE_TIME_MAX = 10.
 | |
| 
 | |
| DESIRES = {
 | |
|   LaneChangeDirection.none: {
 | |
|     LaneChangeState.off: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.preLaneChange: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.laneChangeStarting: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.laneChangeFinishing: log.PathPlan.Desire.none,
 | |
|   },
 | |
|   LaneChangeDirection.left: {
 | |
|     LaneChangeState.off: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.preLaneChange: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.laneChangeStarting: log.PathPlan.Desire.laneChangeLeft,
 | |
|     LaneChangeState.laneChangeFinishing: log.PathPlan.Desire.laneChangeLeft,
 | |
|   },
 | |
|   LaneChangeDirection.right: {
 | |
|     LaneChangeState.off: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.preLaneChange: log.PathPlan.Desire.none,
 | |
|     LaneChangeState.laneChangeStarting: log.PathPlan.Desire.laneChangeRight,
 | |
|     LaneChangeState.laneChangeFinishing: log.PathPlan.Desire.laneChangeRight,
 | |
|   },
 | |
| }
 | |
| 
 | |
| def calc_states_after_delay(states, v_ego, steer_angle, curvature_factor, steer_ratio, delay):
 | |
|   states[0].x = v_ego * delay
 | |
|   states[0].psi = v_ego * curvature_factor * math.radians(steer_angle) / steer_ratio * delay
 | |
|   return states
 | |
| 
 | |
| 
 | |
| class PathPlanner():
 | |
|   def __init__(self, CP):
 | |
|     self.LP = LanePlanner()
 | |
| 
 | |
|     self.last_cloudlog_t = 0
 | |
|     self.steer_rate_cost = CP.steerRateCost
 | |
| 
 | |
|     self.setup_mpc()
 | |
|     self.solution_invalid_cnt = 0
 | |
|     self.path_offset_i = 0.0
 | |
|     self.lane_change_state = LaneChangeState.off
 | |
|     self.lane_change_direction = LaneChangeDirection.none
 | |
|     self.lane_change_timer = 0.0
 | |
|     self.prev_one_blinker = False
 | |
| 
 | |
|   def setup_mpc(self):
 | |
|     self.libmpc = libmpc_py.libmpc
 | |
|     self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, self.steer_rate_cost)
 | |
| 
 | |
|     self.mpc_solution = libmpc_py.ffi.new("log_t *")
 | |
|     self.cur_state = libmpc_py.ffi.new("state_t *")
 | |
|     self.cur_state[0].x = 0.0
 | |
|     self.cur_state[0].y = 0.0
 | |
|     self.cur_state[0].psi = 0.0
 | |
|     self.cur_state[0].delta = 0.0
 | |
| 
 | |
|     self.angle_steers_des = 0.0
 | |
|     self.angle_steers_des_mpc = 0.0
 | |
|     self.angle_steers_des_prev = 0.0
 | |
|     self.angle_steers_des_time = 0.0
 | |
| 
 | |
|   def update(self, sm, pm, CP, VM):
 | |
|     v_ego = sm['carState'].vEgo
 | |
|     angle_steers = sm['carState'].steeringAngle
 | |
|     active = sm['controlsState'].active
 | |
| 
 | |
|     angle_offset = sm['liveParameters'].angleOffset
 | |
| 
 | |
|     # Run MPC
 | |
|     self.angle_steers_des_prev = self.angle_steers_des_mpc
 | |
|     VM.update_params(sm['liveParameters'].stiffnessFactor, sm['liveParameters'].steerRatio)
 | |
|     curvature_factor = VM.curvature_factor(v_ego)
 | |
| 
 | |
|     self.LP.parse_model(sm['model'])
 | |
| 
 | |
|     # Lane change logic
 | |
|     one_blinker = sm['carState'].leftBlinker != sm['carState'].rightBlinker
 | |
|     below_lane_change_speed = v_ego < LANE_CHANGE_SPEED_MIN
 | |
| 
 | |
|     if sm['carState'].leftBlinker:
 | |
|       self.lane_change_direction = LaneChangeDirection.left
 | |
|     elif sm['carState'].rightBlinker:
 | |
|       self.lane_change_direction = LaneChangeDirection.right
 | |
| 
 | |
|     if not active or self.lane_change_timer > LANE_CHANGE_TIME_MAX:
 | |
|       self.lane_change_state = LaneChangeState.off
 | |
|       self.lane_change_direction = LaneChangeDirection.none
 | |
|     else:
 | |
|       torque_applied = sm['carState'].steeringPressed and \
 | |
|                        ((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or \
 | |
|                         (sm['carState'].steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right))
 | |
| 
 | |
|       lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob
 | |
| 
 | |
|       # State transitions
 | |
|       # off
 | |
|       if self.lane_change_state == LaneChangeState.off and one_blinker and not self.prev_one_blinker and not below_lane_change_speed:
 | |
|         self.lane_change_state = LaneChangeState.preLaneChange
 | |
| 
 | |
|       # pre
 | |
|       elif self.lane_change_state == LaneChangeState.preLaneChange:
 | |
|         if not one_blinker or below_lane_change_speed:
 | |
|           self.lane_change_state = LaneChangeState.off
 | |
|         elif torque_applied:
 | |
|           self.lane_change_state = LaneChangeState.laneChangeStarting
 | |
| 
 | |
|       # starting
 | |
|       elif self.lane_change_state == LaneChangeState.laneChangeStarting and lane_change_prob > 0.5:
 | |
|         self.lane_change_state = LaneChangeState.laneChangeFinishing
 | |
| 
 | |
|       # finishing
 | |
|       elif self.lane_change_state == LaneChangeState.laneChangeFinishing and lane_change_prob < 0.2:
 | |
|         if one_blinker:
 | |
|           self.lane_change_state = LaneChangeState.preLaneChange
 | |
|         else:
 | |
|           self.lane_change_state = LaneChangeState.off
 | |
| 
 | |
|     if self.lane_change_state in [LaneChangeState.off, LaneChangeState.preLaneChange]:
 | |
|       self.lane_change_timer = 0.0
 | |
|     else:
 | |
|       self.lane_change_timer += DT_MDL
 | |
| 
 | |
|     self.prev_one_blinker = one_blinker
 | |
| 
 | |
|     desire = DESIRES[self.lane_change_direction][self.lane_change_state]
 | |
| 
 | |
|     # Turn off lanes during lane change
 | |
|     if desire == log.PathPlan.Desire.laneChangeRight or desire == log.PathPlan.Desire.laneChangeLeft:
 | |
|       self.LP.l_prob = 0.
 | |
|       self.LP.r_prob = 0.
 | |
|       self.libmpc.init_weights(MPC_COST_LAT.PATH / 10.0, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, self.steer_rate_cost)
 | |
|     else:
 | |
|       self.libmpc.init_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, self.steer_rate_cost)
 | |
| 
 | |
|     self.LP.update_d_poly(v_ego)
 | |
| 
 | |
| 
 | |
|     # TODO: Check for active, override, and saturation
 | |
|     # if active:
 | |
|     #   self.path_offset_i += self.LP.d_poly[3] / (60.0 * 20.0)
 | |
|     #   self.path_offset_i = clip(self.path_offset_i, -0.5,  0.5)
 | |
|     #   self.LP.d_poly[3] += self.path_offset_i
 | |
|     # else:
 | |
|     #   self.path_offset_i = 0.0
 | |
| 
 | |
|     # account for actuation delay
 | |
|     self.cur_state = calc_states_after_delay(self.cur_state, v_ego, angle_steers - angle_offset, curvature_factor, VM.sR, CP.steerActuatorDelay)
 | |
| 
 | |
|     v_ego_mpc = max(v_ego, 5.0)  # avoid mpc roughness due to low speed
 | |
|     self.libmpc.run_mpc(self.cur_state, self.mpc_solution,
 | |
|                         list(self.LP.l_poly), list(self.LP.r_poly), list(self.LP.d_poly),
 | |
|                         self.LP.l_prob, self.LP.r_prob, curvature_factor, v_ego_mpc, self.LP.lane_width)
 | |
| 
 | |
|     # reset to current steer angle if not active or overriding
 | |
|     if active:
 | |
|       delta_desired = self.mpc_solution[0].delta[1]
 | |
|       rate_desired = math.degrees(self.mpc_solution[0].rate[0] * VM.sR)
 | |
|     else:
 | |
|       delta_desired = math.radians(angle_steers - angle_offset) / VM.sR
 | |
|       rate_desired = 0.0
 | |
| 
 | |
|     self.cur_state[0].delta = delta_desired
 | |
| 
 | |
|     self.angle_steers_des_mpc = float(math.degrees(delta_desired * VM.sR) + angle_offset)
 | |
| 
 | |
|     #  Check for infeasable MPC solution
 | |
|     mpc_nans = any(math.isnan(x) for x in self.mpc_solution[0].delta)
 | |
|     t = sec_since_boot()
 | |
|     if mpc_nans:
 | |
|       self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, CP.steerRateCost)
 | |
|       self.cur_state[0].delta = math.radians(angle_steers - angle_offset) / VM.sR
 | |
| 
 | |
|       if t > self.last_cloudlog_t + 5.0:
 | |
|         self.last_cloudlog_t = t
 | |
|         cloudlog.warning("Lateral mpc - nan: True")
 | |
| 
 | |
|     if self.mpc_solution[0].cost > 20000. or mpc_nans:   # TODO: find a better way to detect when MPC did not converge
 | |
|       self.solution_invalid_cnt += 1
 | |
|     else:
 | |
|       self.solution_invalid_cnt = 0
 | |
|     plan_solution_valid = self.solution_invalid_cnt < 2
 | |
| 
 | |
|     plan_send = messaging.new_message()
 | |
|     plan_send.init('pathPlan')
 | |
|     plan_send.valid = sm.all_alive_and_valid(service_list=['carState', 'controlsState', 'liveParameters', 'model'])
 | |
|     plan_send.pathPlan.laneWidth = float(self.LP.lane_width)
 | |
|     plan_send.pathPlan.dPoly = [float(x) for x in self.LP.d_poly]
 | |
|     plan_send.pathPlan.lPoly = [float(x) for x in self.LP.l_poly]
 | |
|     plan_send.pathPlan.lProb = float(self.LP.l_prob)
 | |
|     plan_send.pathPlan.rPoly = [float(x) for x in self.LP.r_poly]
 | |
|     plan_send.pathPlan.rProb = float(self.LP.r_prob)
 | |
| 
 | |
|     plan_send.pathPlan.angleSteers = float(self.angle_steers_des_mpc)
 | |
|     plan_send.pathPlan.rateSteers = float(rate_desired)
 | |
|     plan_send.pathPlan.angleOffset = float(sm['liveParameters'].angleOffsetAverage)
 | |
|     plan_send.pathPlan.mpcSolutionValid = bool(plan_solution_valid)
 | |
|     plan_send.pathPlan.paramsValid = bool(sm['liveParameters'].valid)
 | |
|     plan_send.pathPlan.sensorValid = bool(sm['liveParameters'].sensorValid)
 | |
|     plan_send.pathPlan.posenetValid = bool(sm['liveParameters'].posenetValid)
 | |
| 
 | |
|     plan_send.pathPlan.desire = desire
 | |
|     plan_send.pathPlan.laneChangeState = self.lane_change_state
 | |
|     plan_send.pathPlan.laneChangeDirection = self.lane_change_direction
 | |
| 
 | |
|     pm.send('pathPlan', plan_send)
 | |
| 
 | |
|     if LOG_MPC:
 | |
|       dat = messaging.new_message()
 | |
|       dat.init('liveMpc')
 | |
|       dat.liveMpc.x = list(self.mpc_solution[0].x)
 | |
|       dat.liveMpc.y = list(self.mpc_solution[0].y)
 | |
|       dat.liveMpc.psi = list(self.mpc_solution[0].psi)
 | |
|       dat.liveMpc.delta = list(self.mpc_solution[0].delta)
 | |
|       dat.liveMpc.cost = self.mpc_solution[0].cost
 | |
|       pm.send('liveMpc', dat)
 | |
| 
 |