openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

122 lines
4.5 KiB

import zmq
import math
import numpy as np
from common.realtime import sec_since_boot
from selfdrive.services import service_list
from selfdrive.swaglog import cloudlog
from selfdrive.controls.lib.lateral_mpc import libmpc_py
from selfdrive.controls.lib.drive_helpers import MPC_COST_LAT
from selfdrive.controls.lib.model_parser import ModelParser
import selfdrive.messaging as messaging
def calc_states_after_delay(states, v_ego, steer_angle, curvature_factor, steer_ratio, delay):
states[0].x = v_ego * delay
states[0].psi = v_ego * curvature_factor * math.radians(steer_angle) / steer_ratio * delay
return states
class PathPlanner(object):
def __init__(self, CP):
self.MP = ModelParser()
self.last_cloudlog_t = 0
context = zmq.Context()
self.plan = messaging.pub_sock(context, service_list['pathPlan'].port)
self.livempc = messaging.pub_sock(context, service_list['liveMpc'].port)
self.setup_mpc(CP.steerRateCost)
self.invalid_counter = 0
def setup_mpc(self, steer_rate_cost):
self.libmpc = libmpc_py.libmpc
self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, steer_rate_cost)
self.mpc_solution = libmpc_py.ffi.new("log_t *")
self.cur_state = libmpc_py.ffi.new("state_t *")
self.cur_state[0].x = 0.0
self.cur_state[0].y = 0.0
self.cur_state[0].psi = 0.0
self.cur_state[0].delta = 0.0
self.angle_steers_des = 0.0
self.angle_steers_des_mpc = 0.0
self.angle_steers_des_prev = 0.0
self.angle_steers_des_time = 0.0
def update(self, CP, VM, CS, md, live100):
v_ego = CS.carState.vEgo
angle_steers = CS.carState.steeringAngle
active = live100.live100.active
angle_offset = live100.live100.angleOffset
self.MP.update(v_ego, md)
# Run MPC
self.angle_steers_des_prev = self.angle_steers_des_mpc
curvature_factor = VM.curvature_factor(v_ego)
l_poly = libmpc_py.ffi.new("double[4]", list(self.MP.l_poly))
r_poly = libmpc_py.ffi.new("double[4]", list(self.MP.r_poly))
p_poly = libmpc_py.ffi.new("double[4]", list(self.MP.p_poly))
# account for actuation delay
self.cur_state = calc_states_after_delay(self.cur_state, v_ego, angle_steers, curvature_factor, CP.steerRatio, CP.steerActuatorDelay)
v_ego_mpc = max(v_ego, 5.0) # avoid mpc roughness due to low speed
self.libmpc.run_mpc(self.cur_state, self.mpc_solution,
l_poly, r_poly, p_poly,
self.MP.l_prob, self.MP.r_prob, self.MP.p_prob, curvature_factor, v_ego_mpc, self.MP.lane_width)
# reset to current steer angle if not active or overriding
if active:
delta_desired = self.mpc_solution[0].delta[1]
else:
delta_desired = math.radians(angle_steers - angle_offset) / CP.steerRatio
self.cur_state[0].delta = delta_desired
self.angle_steers_des_mpc = float(math.degrees(delta_desired * CP.steerRatio) + angle_offset)
# Check for infeasable MPC solution
mpc_nans = np.any(np.isnan(list(self.mpc_solution[0].delta)))
t = sec_since_boot()
if mpc_nans:
self.libmpc.init(MPC_COST_LAT.PATH, MPC_COST_LAT.LANE, MPC_COST_LAT.HEADING, CP.steerRateCost)
self.cur_state[0].delta = math.radians(angle_steers) / CP.steerRatio
if t > self.last_cloudlog_t + 5.0:
self.last_cloudlog_t = t
cloudlog.warning("Lateral mpc - nan: True")
if self.mpc_solution[0].cost > 20000. or mpc_nans: # TODO: find a better way to detect when MPC did not converge
self.invalid_counter += 1
else:
self.invalid_counter = 0
plan_valid = self.invalid_counter < 2
plan_send = messaging.new_message()
plan_send.init('pathPlan')
plan_send.pathPlan.laneWidth = float(self.MP.lane_width)
plan_send.pathPlan.dPoly = map(float, self.MP.d_poly)
plan_send.pathPlan.cPoly = map(float, self.MP.c_poly)
plan_send.pathPlan.cProb = float(self.MP.c_prob)
plan_send.pathPlan.lPoly = map(float, l_poly)
plan_send.pathPlan.lProb = float(self.MP.l_prob)
plan_send.pathPlan.rPoly = map(float, r_poly)
plan_send.pathPlan.rProb = float(self.MP.r_prob)
plan_send.pathPlan.angleSteers = float(self.angle_steers_des_mpc)
plan_send.pathPlan.valid = bool(plan_valid)
self.plan.send(plan_send.to_bytes())
dat = messaging.new_message()
dat.init('liveMpc')
dat.liveMpc.x = list(self.mpc_solution[0].x)
dat.liveMpc.y = list(self.mpc_solution[0].y)
dat.liveMpc.psi = list(self.mpc_solution[0].psi)
dat.liveMpc.delta = list(self.mpc_solution[0].delta)
dat.liveMpc.cost = self.mpc_solution[0].cost
self.livempc.send(dat.to_bytes())