You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
73 lines
2.1 KiB
73 lines
2.1 KiB
import numpy as np
|
|
import math
|
|
|
|
from selfdrive.swaglog import cloudlog
|
|
from common.realtime import sec_since_boot
|
|
from selfdrive.controls.lib.longitudinal_mpc_model import libmpc_py
|
|
|
|
|
|
class LongitudinalMpcModel():
|
|
def __init__(self):
|
|
|
|
self.setup_mpc()
|
|
self.v_mpc = 0.0
|
|
self.v_mpc_future = 0.0
|
|
self.a_mpc = 0.0
|
|
self.last_cloudlog_t = 0.0
|
|
self.ts = list(range(10))
|
|
|
|
self.valid = False
|
|
|
|
def setup_mpc(self, v_ego=0.0):
|
|
self.libmpc = libmpc_py.libmpc
|
|
self.libmpc.init(1.0, 1.0, 1.0, 1.0, 1.0)
|
|
self.libmpc.init_with_simulation(v_ego)
|
|
|
|
self.mpc_solution = libmpc_py.ffi.new("log_t *")
|
|
self.cur_state = libmpc_py.ffi.new("state_t *")
|
|
|
|
self.cur_state[0].x_ego = 0
|
|
self.cur_state[0].v_ego = 0
|
|
self.cur_state[0].a_ego = 0
|
|
|
|
def set_cur_state(self, v, a):
|
|
self.cur_state[0].x_ego = 0.0
|
|
self.cur_state[0].v_ego = v
|
|
self.cur_state[0].a_ego = a
|
|
|
|
def update(self, v_ego, a_ego, poss, speeds, accels):
|
|
if len(poss) == 0:
|
|
self.valid = False
|
|
return
|
|
|
|
x_poly = list(map(float, np.polyfit(self.ts, poss, 3)))
|
|
v_poly = list(map(float, np.polyfit(self.ts, speeds, 3)))
|
|
a_poly = list(map(float, np.polyfit(self.ts, accels, 3)))
|
|
|
|
# Calculate mpc
|
|
self.libmpc.run_mpc(self.cur_state, self.mpc_solution, x_poly, v_poly, a_poly)
|
|
|
|
# Get solution. MPC timestep is 0.2 s, so interpolation to 0.05 s is needed
|
|
self.v_mpc = self.mpc_solution[0].v_ego[1]
|
|
self.a_mpc = self.mpc_solution[0].a_ego[1]
|
|
self.v_mpc_future = self.mpc_solution[0].v_ego[10]
|
|
self.valid = True
|
|
|
|
# Reset if NaN or goes through lead car
|
|
nans = any(math.isnan(x) for x in self.mpc_solution[0].v_ego)
|
|
|
|
t = sec_since_boot()
|
|
if nans:
|
|
if t > self.last_cloudlog_t + 5.0:
|
|
self.last_cloudlog_t = t
|
|
cloudlog.warning("Longitudinal model mpc reset - backwards")
|
|
|
|
self.libmpc.init(1.0, 1.0, 1.0, 1.0, 1.0)
|
|
self.libmpc.init_with_simulation(v_ego)
|
|
|
|
self.cur_state[0].v_ego = v_ego
|
|
self.cur_state[0].a_ego = 0.0
|
|
|
|
self.v_mpc = v_ego
|
|
self.a_mpc = a_ego
|
|
self.valid = False
|
|
|