|
|
@ -12,6 +12,15 @@ from cereal import log |
|
|
|
TRAJECTORY_SIZE = 33 |
|
|
|
TRAJECTORY_SIZE = 33 |
|
|
|
CAMERA_OFFSET = 0.04 |
|
|
|
CAMERA_OFFSET = 0.04 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
PATH_COST = 1.0 |
|
|
|
|
|
|
|
LATERAL_MOTION_COST = 0.11 |
|
|
|
|
|
|
|
LATERAL_ACCEL_COST = 0.0 |
|
|
|
|
|
|
|
LATERAL_JERK_COST = 0.05 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MIN_SPEED = 1.5 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LateralPlanner: |
|
|
|
class LateralPlanner: |
|
|
|
def __init__(self, CP): |
|
|
|
def __init__(self, CP): |
|
|
|
self.DH = DesireHelper() |
|
|
|
self.DH = DesireHelper() |
|
|
@ -36,7 +45,8 @@ class LateralPlanner: |
|
|
|
self.lat_mpc.reset(x0=self.x0) |
|
|
|
self.lat_mpc.reset(x0=self.x0) |
|
|
|
|
|
|
|
|
|
|
|
def update(self, sm): |
|
|
|
def update(self, sm): |
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
# clip speed , lateral planning is not possible at 0 speed |
|
|
|
|
|
|
|
self.v_ego = max(MIN_SPEED, sm['carState'].vEgo) |
|
|
|
measured_curvature = sm['controlsState'].curvature |
|
|
|
measured_curvature = sm['controlsState'].curvature |
|
|
|
|
|
|
|
|
|
|
|
# Parse model predictions |
|
|
|
# Parse model predictions |
|
|
@ -56,20 +66,19 @@ class LateralPlanner: |
|
|
|
self.DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob) |
|
|
|
self.DH.update(sm['carState'], sm['carControl'].latActive, lane_change_prob) |
|
|
|
|
|
|
|
|
|
|
|
d_path_xyz = self.path_xyz |
|
|
|
d_path_xyz = self.path_xyz |
|
|
|
# Heading cost is useful at low speed, otherwise end of plan can be off-heading |
|
|
|
self.lat_mpc.set_weights(PATH_COST, LATERAL_MOTION_COST, |
|
|
|
heading_cost = interp(v_ego, [5.0, 10.0], [1.0, 0.15]) |
|
|
|
LATERAL_ACCEL_COST, LATERAL_JERK_COST) |
|
|
|
self.lat_mpc.set_weights(1.0, heading_cost, 0.0, .075) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
y_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1]) |
|
|
|
y_pts = np.interp(self.v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:, 1]) |
|
|
|
heading_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw) |
|
|
|
heading_pts = np.interp(self.v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw) |
|
|
|
yaw_rate_pts = np.interp(v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw_rate) |
|
|
|
yaw_rate_pts = np.interp(self.v_ego * self.t_idxs[:LAT_MPC_N + 1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw_rate) |
|
|
|
self.y_pts = y_pts |
|
|
|
self.y_pts = y_pts |
|
|
|
|
|
|
|
|
|
|
|
assert len(y_pts) == LAT_MPC_N + 1 |
|
|
|
assert len(y_pts) == LAT_MPC_N + 1 |
|
|
|
assert len(heading_pts) == LAT_MPC_N + 1 |
|
|
|
assert len(heading_pts) == LAT_MPC_N + 1 |
|
|
|
assert len(yaw_rate_pts) == LAT_MPC_N + 1 |
|
|
|
assert len(yaw_rate_pts) == LAT_MPC_N + 1 |
|
|
|
lateral_factor = max(0, self.factor1 - (self.factor2 * v_ego**2)) |
|
|
|
lateral_factor = max(0, self.factor1 - (self.factor2 * self.v_ego**2)) |
|
|
|
p = np.array([v_ego, lateral_factor]) |
|
|
|
p = np.array([self.v_ego, lateral_factor]) |
|
|
|
self.lat_mpc.run(self.x0, |
|
|
|
self.lat_mpc.run(self.x0, |
|
|
|
p, |
|
|
|
p, |
|
|
|
y_pts, |
|
|
|
y_pts, |
|
|
@ -86,7 +95,7 @@ class LateralPlanner: |
|
|
|
t = sec_since_boot() |
|
|
|
t = sec_since_boot() |
|
|
|
if mpc_nans or self.lat_mpc.solution_status != 0: |
|
|
|
if mpc_nans or self.lat_mpc.solution_status != 0: |
|
|
|
self.reset_mpc() |
|
|
|
self.reset_mpc() |
|
|
|
self.x0[3] = measured_curvature |
|
|
|
self.x0[3] = measured_curvature * self.v_ego |
|
|
|
if t > self.last_cloudlog_t + 5.0: |
|
|
|
if t > self.last_cloudlog_t + 5.0: |
|
|
|
self.last_cloudlog_t = t |
|
|
|
self.last_cloudlog_t = t |
|
|
|
cloudlog.warning("Lateral mpc - nan: True") |
|
|
|
cloudlog.warning("Lateral mpc - nan: True") |
|
|
@ -106,10 +115,8 @@ class LateralPlanner: |
|
|
|
lateralPlan.dPathPoints = self.y_pts.tolist() |
|
|
|
lateralPlan.dPathPoints = self.y_pts.tolist() |
|
|
|
lateralPlan.psis = self.lat_mpc.x_sol[0:CONTROL_N, 2].tolist() |
|
|
|
lateralPlan.psis = self.lat_mpc.x_sol[0:CONTROL_N, 2].tolist() |
|
|
|
|
|
|
|
|
|
|
|
# clip speed for curv calculation at 1m/s, to prevent low speed extremes |
|
|
|
lateralPlan.curvatures = (self.lat_mpc.x_sol[0:CONTROL_N, 3]/self.v_ego).tolist() |
|
|
|
clipped_speed = max(1.0, sm['carState'].vEgo) |
|
|
|
lateralPlan.curvatureRates = [float(x/self.v_ego) for x in self.lat_mpc.u_sol[0:CONTROL_N - 1]] + [0.0] |
|
|
|
lateralPlan.curvatures = (self.lat_mpc.x_sol[0:CONTROL_N, 3]/clipped_speed).tolist() |
|
|
|
|
|
|
|
lateralPlan.curvatureRates = [float(x/clipped_speed) for x in self.lat_mpc.u_sol[0:CONTROL_N - 1]] + [0.0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
lateralPlan.mpcSolutionValid = bool(plan_solution_valid) |
|
|
|
lateralPlan.mpcSolutionValid = bool(plan_solution_valid) |
|
|
|
lateralPlan.solverExecutionTime = self.lat_mpc.solve_time |
|
|
|
lateralPlan.solverExecutionTime = self.lat_mpc.solve_time |
|
|
|