|
|
@ -71,35 +71,32 @@ class PathPlanner(): |
|
|
|
self.cur_state[0].x = 0.0 |
|
|
|
self.cur_state[0].x = 0.0 |
|
|
|
self.cur_state[0].y = 0.0 |
|
|
|
self.cur_state[0].y = 0.0 |
|
|
|
self.cur_state[0].psi = 0.0 |
|
|
|
self.cur_state[0].psi = 0.0 |
|
|
|
self.cur_state[0].delta = 0.0 |
|
|
|
self.cur_state[0].tire_angle = 0.0 |
|
|
|
|
|
|
|
|
|
|
|
self.angle_steers_des = 0.0 |
|
|
|
self.angle_steers_des = 0.0 |
|
|
|
self.angle_steers_des_mpc = 0.0 |
|
|
|
self.angle_steers_des_mpc = 0.0 |
|
|
|
self.angle_steers_des_prev = 0.0 |
|
|
|
|
|
|
|
self.angle_steers_des_time = 0.0 |
|
|
|
self.angle_steers_des_time = 0.0 |
|
|
|
|
|
|
|
|
|
|
|
def update(self, sm, pm, CP, VM): |
|
|
|
def update(self, sm, pm, CP, VM): |
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
v_ego = sm['carState'].vEgo |
|
|
|
angle_steers = sm['carState'].steeringAngle |
|
|
|
|
|
|
|
active = sm['controlsState'].active |
|
|
|
active = sm['controlsState'].active |
|
|
|
|
|
|
|
steering_wheel_angle_offset_deg = sm['liveParameters'].angleOffset |
|
|
|
|
|
|
|
steering_wheel_angle_deg = sm['carState'].steeringAngle |
|
|
|
|
|
|
|
measured_tire_angle = -math.radians(steering_wheel_angle_deg - steering_wheel_angle_offset_deg) / VM.sR |
|
|
|
|
|
|
|
|
|
|
|
angle_offset = sm['liveParameters'].angleOffset |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run MPC |
|
|
|
|
|
|
|
self.angle_steers_des_prev = self.angle_steers_des_mpc |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Update vehicle model |
|
|
|
# Update vehicle model |
|
|
|
x = max(sm['liveParameters'].stiffnessFactor, 0.1) |
|
|
|
x = max(sm['liveParameters'].stiffnessFactor, 0.1) |
|
|
|
sr = max(sm['liveParameters'].steerRatio, 0.1) |
|
|
|
sr = max(sm['liveParameters'].steerRatio, 0.1) |
|
|
|
VM.update_params(x, sr) |
|
|
|
VM.update_params(x, sr) |
|
|
|
|
|
|
|
|
|
|
|
curvature_factor = VM.curvature_factor(v_ego) |
|
|
|
curvature_factor = VM.curvature_factor(v_ego) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
md = sm['modelV2'] |
|
|
|
md = sm['modelV2'] |
|
|
|
self.LP.parse_model(sm['modelV2']) |
|
|
|
self.LP.parse_model(sm['modelV2']) |
|
|
|
if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE: |
|
|
|
if len(md.position.x) == TRAJECTORY_SIZE and len(md.orientation.x) == TRAJECTORY_SIZE: |
|
|
|
self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z]) |
|
|
|
self.path_xyz = np.column_stack([md.position.x, md.position.y, md.position.z]) |
|
|
|
self.t_idxs = list(md.position.t) |
|
|
|
self.t_idxs = np.array(md.position.t) |
|
|
|
self.plan_yaw = list(md.orientation.z) |
|
|
|
self.plan_yaw = list(md.orientation.z) |
|
|
|
|
|
|
|
|
|
|
|
# Lane change logic |
|
|
|
# Lane change logic |
|
|
@ -168,16 +165,14 @@ class PathPlanner(): |
|
|
|
self.LP.lll_prob *= self.lane_change_ll_prob |
|
|
|
self.LP.lll_prob *= self.lane_change_ll_prob |
|
|
|
self.LP.rll_prob *= self.lane_change_ll_prob |
|
|
|
self.LP.rll_prob *= self.lane_change_ll_prob |
|
|
|
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz) |
|
|
|
d_path_xyz = self.LP.get_d_path(v_ego, self.t_idxs, self.path_xyz) |
|
|
|
y_pts = np.interp(self.t_idxs[:MPC_N+1], np.linalg.norm(d_path_xyz, axis=1)/v_ego, d_path_xyz[:,1]) |
|
|
|
y_pts = np.interp(v_ego * self.t_idxs[:MPC_N+1], np.linalg.norm(d_path_xyz, axis=1), d_path_xyz[:,1]) |
|
|
|
heading_pts = np.interp(self.t_idxs[:MPC_N+1], np.linalg.norm(self.path_xyz, axis=1)/v_ego, self.plan_yaw) |
|
|
|
heading_pts = np.interp(v_ego * self.t_idxs[:MPC_N+1], np.linalg.norm(self.path_xyz, axis=1), self.plan_yaw) |
|
|
|
|
|
|
|
|
|
|
|
v_ego_mpc = max(v_ego, 5.0) # avoid mpc roughness due to low speed |
|
|
|
v_ego_mpc = max(v_ego, 5.0) # avoid mpc roughness due to low speed |
|
|
|
v_poly = [0.0, 0.0, 0.0, v_ego_mpc] |
|
|
|
|
|
|
|
assert len(v_poly) == 4 |
|
|
|
|
|
|
|
assert len(y_pts) == MPC_N + 1 |
|
|
|
assert len(y_pts) == MPC_N + 1 |
|
|
|
assert len(heading_pts) == MPC_N + 1 |
|
|
|
assert len(heading_pts) == MPC_N + 1 |
|
|
|
self.libmpc.run_mpc(self.cur_state, self.mpc_solution, |
|
|
|
self.libmpc.run_mpc(self.cur_state, self.mpc_solution, |
|
|
|
v_poly, |
|
|
|
float(v_ego_mpc), |
|
|
|
curvature_factor, |
|
|
|
curvature_factor, |
|
|
|
CAR_ROTATION_RADIUS, |
|
|
|
CAR_ROTATION_RADIUS, |
|
|
|
list(y_pts), |
|
|
|
list(y_pts), |
|
|
@ -186,30 +181,31 @@ class PathPlanner(): |
|
|
|
self.cur_state.x = 0.0 |
|
|
|
self.cur_state.x = 0.0 |
|
|
|
self.cur_state.y = 0.0 |
|
|
|
self.cur_state.y = 0.0 |
|
|
|
self.cur_state.psi = 0.0 |
|
|
|
self.cur_state.psi = 0.0 |
|
|
|
self.cur_state.delta = interp(DT_MDL, self.t_idxs[:MPC_N+1], self.mpc_solution.delta) |
|
|
|
self.cur_state.tire_angle = interp(DT_MDL, self.t_idxs[:MPC_N+1], self.mpc_solution.tire_angle) |
|
|
|
|
|
|
|
|
|
|
|
# TODO this needs more thought, use .2s extra for now to estimate other delays |
|
|
|
# TODO this needs more thought, use .2s extra for now to estimate other delays |
|
|
|
delay = CP.steerActuatorDelay + .2 |
|
|
|
delay = CP.steerActuatorDelay + .2 |
|
|
|
# TODO negative sign, still run mpc in ENU, make NED |
|
|
|
next_tire_angle = interp(DT_MDL + delay, self.t_idxs[:MPC_N+1], self.mpc_solution.tire_angle) |
|
|
|
next_delta = -interp(DT_MDL + delay, self.t_idxs[:MPC_N+1], self.mpc_solution.delta) |
|
|
|
next_tire_angle_rate = interp(delay, self.t_idxs[:MPC_N], self.mpc_solution.tire_angle_rate) |
|
|
|
next_rate = -interp(delay, self.t_idxs[:MPC_N], self.mpc_solution.rate) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# reset to current steer angle if not active or overriding |
|
|
|
# reset to current steer angle if not active or overriding |
|
|
|
if active: |
|
|
|
if active: |
|
|
|
delta_desired = next_delta |
|
|
|
tire_angle_desired = next_tire_angle |
|
|
|
rate_desired = math.degrees(next_rate * VM.sR) |
|
|
|
desired_tire_angle_rate = next_tire_angle_rate |
|
|
|
else: |
|
|
|
else: |
|
|
|
delta_desired = math.radians(angle_steers - angle_offset) / VM.sR |
|
|
|
tire_angle_desired = measured_tire_angle |
|
|
|
rate_desired = 0.0 |
|
|
|
desired_tire_angle_rate = 0.0 |
|
|
|
|
|
|
|
|
|
|
|
self.angle_steers_des_mpc = float(math.degrees(delta_desired * VM.sR) + angle_offset) |
|
|
|
# negative sign, controls uses different convention |
|
|
|
|
|
|
|
self.desired_steering_wheel_angle_deg = -float(math.degrees(tire_angle_desired * VM.sR)) + steering_wheel_angle_offset_deg |
|
|
|
|
|
|
|
self.desired_steering_wheel_angle_rate_deg = -float(math.degrees(desired_tire_angle_rate * VM.sR)) |
|
|
|
|
|
|
|
|
|
|
|
# Check for infeasable MPC solution |
|
|
|
# Check for infeasable MPC solution |
|
|
|
mpc_nans = any(math.isnan(x) for x in self.mpc_solution.delta) |
|
|
|
mpc_nans = any(math.isnan(x) for x in self.mpc_solution.tire_angle) |
|
|
|
t = sec_since_boot() |
|
|
|
t = sec_since_boot() |
|
|
|
if mpc_nans: |
|
|
|
if mpc_nans: |
|
|
|
self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, CP.steerRateCost) |
|
|
|
self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.HEADING, CP.steerRateCost) |
|
|
|
self.cur_state[0].delta = math.radians(angle_steers - angle_offset) / VM.sR |
|
|
|
self.cur_state.tire_angle = measured_tire_angle |
|
|
|
|
|
|
|
|
|
|
|
if t > self.last_cloudlog_t + 5.0: |
|
|
|
if t > self.last_cloudlog_t + 5.0: |
|
|
|
self.last_cloudlog_t = t |
|
|
|
self.last_cloudlog_t = t |
|
|
@ -223,14 +219,13 @@ class PathPlanner(): |
|
|
|
plan_send = messaging.new_message('pathPlan') |
|
|
|
plan_send = messaging.new_message('pathPlan') |
|
|
|
plan_send.valid = sm.all_alive_and_valid(service_list=['carState', 'controlsState', 'liveParameters', 'modelV2']) |
|
|
|
plan_send.valid = sm.all_alive_and_valid(service_list=['carState', 'controlsState', 'liveParameters', 'modelV2']) |
|
|
|
plan_send.pathPlan.laneWidth = float(self.LP.lane_width) |
|
|
|
plan_send.pathPlan.laneWidth = float(self.LP.lane_width) |
|
|
|
plan_send.pathPlan.dPoly = [0,0,0,0] |
|
|
|
plan_send.pathPlan.dPathPoints = [float(x) for x in y_pts] |
|
|
|
plan_send.pathPlan.lPoly = [0,0,0,0] |
|
|
|
|
|
|
|
plan_send.pathPlan.rPoly = [0,0,0,0] |
|
|
|
|
|
|
|
plan_send.pathPlan.lProb = float(self.LP.lll_prob) |
|
|
|
plan_send.pathPlan.lProb = float(self.LP.lll_prob) |
|
|
|
plan_send.pathPlan.rProb = float(self.LP.rll_prob) |
|
|
|
plan_send.pathPlan.rProb = float(self.LP.rll_prob) |
|
|
|
|
|
|
|
plan_send.pathPlan.dProb = float(self.LP.d_prob) |
|
|
|
|
|
|
|
|
|
|
|
plan_send.pathPlan.angleSteers = float(self.angle_steers_des_mpc) |
|
|
|
plan_send.pathPlan.angleSteers = float(self.desired_steering_wheel_angle_deg) |
|
|
|
plan_send.pathPlan.rateSteers = float(rate_desired) |
|
|
|
plan_send.pathPlan.rateSteers = float(self.desired_steering_wheel_angle_rate_deg) |
|
|
|
plan_send.pathPlan.angleOffset = float(sm['liveParameters'].angleOffsetAverage) |
|
|
|
plan_send.pathPlan.angleOffset = float(sm['liveParameters'].angleOffsetAverage) |
|
|
|
plan_send.pathPlan.mpcSolutionValid = bool(plan_solution_valid) |
|
|
|
plan_send.pathPlan.mpcSolutionValid = bool(plan_solution_valid) |
|
|
|
plan_send.pathPlan.paramsValid = bool(sm['liveParameters'].valid) |
|
|
|
plan_send.pathPlan.paramsValid = bool(sm['liveParameters'].valid) |
|
|
@ -246,6 +241,6 @@ class PathPlanner(): |
|
|
|
dat.liveMpc.x = list(self.mpc_solution[0].x) |
|
|
|
dat.liveMpc.x = list(self.mpc_solution[0].x) |
|
|
|
dat.liveMpc.y = list(self.mpc_solution[0].y) |
|
|
|
dat.liveMpc.y = list(self.mpc_solution[0].y) |
|
|
|
dat.liveMpc.psi = list(self.mpc_solution[0].psi) |
|
|
|
dat.liveMpc.psi = list(self.mpc_solution[0].psi) |
|
|
|
dat.liveMpc.delta = list(self.mpc_solution[0].delta) |
|
|
|
dat.liveMpc.tire_angle = list(self.mpc_solution[0].tire_angle) |
|
|
|
dat.liveMpc.cost = self.mpc_solution[0].cost |
|
|
|
dat.liveMpc.cost = self.mpc_solution[0].cost |
|
|
|
pm.send('liveMpc', dat) |
|
|
|
pm.send('liveMpc', dat) |
|
|
|