|
|
|
@ -38,7 +38,7 @@ DESIRES = { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LateralPlanner(): |
|
|
|
|
class LateralPlanner: |
|
|
|
|
def __init__(self, CP, use_lanelines=True, wide_camera=False): |
|
|
|
|
self.use_lanelines = use_lanelines |
|
|
|
|
self.LP = LanePlanner(wide_camera) |
|
|
|
@ -55,8 +55,8 @@ class LateralPlanner(): |
|
|
|
|
self.prev_one_blinker = False |
|
|
|
|
self.desire = log.LateralPlan.Desire.none |
|
|
|
|
|
|
|
|
|
self.path_xyz = np.zeros((TRAJECTORY_SIZE,3)) |
|
|
|
|
self.path_xyz_stds = np.ones((TRAJECTORY_SIZE,3)) |
|
|
|
|
self.path_xyz = np.zeros((TRAJECTORY_SIZE, 3)) |
|
|
|
|
self.path_xyz_stds = np.ones((TRAJECTORY_SIZE, 3)) |
|
|
|
|
self.plan_yaw = np.zeros((TRAJECTORY_SIZE,)) |
|
|
|
|
self.t_idxs = np.arange(TRAJECTORY_SIZE) |
|
|
|
|
self.y_pts = np.zeros(TRAJECTORY_SIZE) |
|
|
|
@ -67,12 +67,8 @@ class LateralPlanner(): |
|
|
|
|
def reset_mpc(self, x0=np.zeros(6)): |
|
|
|
|
self.x0 = x0 |
|
|
|
|
self.lat_mpc.reset(x0=self.x0) |
|
|
|
|
self.desired_curvature = 0.0 |
|
|
|
|
self.safe_desired_curvature = 0.0 |
|
|
|
|
self.desired_curvature_rate = 0.0 |
|
|
|
|
self.safe_desired_curvature_rate = 0.0 |
|
|
|
|
|
|
|
|
|
def update(self, sm, CP): |
|
|
|
|
def update(self, sm): |
|
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
|
active = sm['controlsState'].active |
|
|
|
|
measured_curvature = sm['controlsState'].curvature |
|
|
|
@ -110,7 +106,7 @@ class LateralPlanner(): |
|
|
|
|
self.lane_change_direction = LaneChangeDirection.none |
|
|
|
|
|
|
|
|
|
torque_applied = sm['carState'].steeringPressed and \ |
|
|
|
|
((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or |
|
|
|
|
((sm['carState'].steeringTorque > 0 and self.lane_change_direction == LaneChangeDirection.left) or |
|
|
|
|
(sm['carState'].steeringTorque < 0 and self.lane_change_direction == LaneChangeDirection.right)) |
|
|
|
|
|
|
|
|
|
blindspot_detected = ((sm['carState'].leftBlindspot and self.lane_change_direction == LaneChangeDirection.left) or |
|
|
|
@ -124,7 +120,7 @@ class LateralPlanner(): |
|
|
|
|
# LaneChangeState.laneChangeStarting |
|
|
|
|
elif self.lane_change_state == LaneChangeState.laneChangeStarting: |
|
|
|
|
# fade out over .5s |
|
|
|
|
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2*DT_MDL, 0.0) |
|
|
|
|
self.lane_change_ll_prob = max(self.lane_change_ll_prob - 2 * DT_MDL, 0.0) |
|
|
|
|
|
|
|
|
|
# 98% certainty |
|
|
|
|
lane_change_prob = self.LP.l_lane_change_prob + self.LP.r_lane_change_prob |
|
|
|
@ -167,14 +163,14 @@ class LateralPlanner(): |
|
|
|
|
self.LP.rll_prob *= self.lane_change_ll_prob |
|
|
|
|
if self.use_lanelines: |
|
|
|
|
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz) |
|
|
|
|
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, CP.steerRateCost) |
|
|
|
|
self.lat_mpc.set_weights(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, self.steer_rate_cost) |
|
|
|
|
else: |
|
|
|
|
d_path_xyz = self.path_xyz |
|
|
|
|
path_cost = np.clip(abs(self.path_xyz[0,1]/self.path_xyz_stds[0,1]), 0.5, 1.5) * MPC_COST_LAT.PATH |
|
|
|
|
path_cost = np.clip(abs(self.path_xyz[0, 1] / self.path_xyz_stds[0, 1]), 0.5, 1.5) * MPC_COST_LAT.PATH |
|
|
|
|
# Heading cost is useful at low speed, otherwise end of plan can be off-heading |
|
|
|
|
heading_cost = interp(v_ego, [5.0, 10.0], [MPC_COST_LAT.HEADING, 0.0]) |
|
|
|
|
self.lat_mpc.set_weights(path_cost, heading_cost, CP.steerRateCost) |
|
|
|
|
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:,1]) |
|
|
|
|
self.lat_mpc.set_weights(path_cost, heading_cost, self.steer_rate_cost) |
|
|
|
|
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1]) |
|
|
|
|
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw) |
|
|
|
|
self.y_pts = y_pts |
|
|
|
|
|
|
|
|
@ -187,11 +183,10 @@ class LateralPlanner(): |
|
|
|
|
y_pts, |
|
|
|
|
heading_pts) |
|
|
|
|
# init state for next |
|
|
|
|
self.x0[3] = interp(DT_MDL, self.t_idxs[:LAT_MPC_N + 1], self.lat_mpc.x_sol[:,3]) |
|
|
|
|
self.x0[3] = interp(DT_MDL, self.t_idxs[:LAT_MPC_N + 1], self.lat_mpc.x_sol[:, 3]) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Check for infeasable MPC solution |
|
|
|
|
mpc_nans = any(math.isnan(x) for x in self.lat_mpc.x_sol[:,3]) |
|
|
|
|
# Check for infeasible MPC solution |
|
|
|
|
mpc_nans = any(math.isnan(x) for x in self.lat_mpc.x_sol[:, 3]) |
|
|
|
|
t = sec_since_boot() |
|
|
|
|
if mpc_nans or self.lat_mpc.solution_status != 0: |
|
|
|
|
self.reset_mpc() |
|
|
|
@ -212,8 +207,8 @@ class LateralPlanner(): |
|
|
|
|
plan_send.lateralPlan.laneWidth = float(self.LP.lane_width) |
|
|
|
|
plan_send.lateralPlan.dPathPoints = [float(x) for x in self.y_pts] |
|
|
|
|
plan_send.lateralPlan.psis = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N, 2]] |
|
|
|
|
plan_send.lateralPlan.curvatures = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N,3]] |
|
|
|
|
plan_send.lateralPlan.curvatureRates = [float(x) for x in self.lat_mpc.u_sol[0:CONTROL_N-1]] +[0.0] |
|
|
|
|
plan_send.lateralPlan.curvatures = [float(x) for x in self.lat_mpc.x_sol[0:CONTROL_N, 3]] |
|
|
|
|
plan_send.lateralPlan.curvatureRates = [float(x) for x in self.lat_mpc.u_sol[0:CONTROL_N - 1]] + [0.0] |
|
|
|
|
plan_send.lateralPlan.lProb = float(self.LP.lll_prob) |
|
|
|
|
plan_send.lateralPlan.rProb = float(self.LP.rll_prob) |
|
|
|
|
plan_send.lateralPlan.dProb = float(self.LP.d_prob) |
|
|
|
|