|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | import math
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from selfdrive.swaglog import cloudlog
 | 
					
						
							|  |  |  | from common.realtime import sec_since_boot
 | 
					
						
							|  |  |  | from selfdrive.controls.lib.longitudinal_mpc_model import libmpc_py
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class LongitudinalMpcModel():
 | 
					
						
							|  |  |  |   def __init__(self):
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.setup_mpc()
 | 
					
						
							|  |  |  |     self.v_mpc = 0.0
 | 
					
						
							|  |  |  |     self.v_mpc_future = 0.0
 | 
					
						
							|  |  |  |     self.a_mpc = 0.0
 | 
					
						
							|  |  |  |     self.last_cloudlog_t = 0.0
 | 
					
						
							|  |  |  |     self.ts = list(range(10))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.valid = False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def setup_mpc(self, v_ego=0.0):
 | 
					
						
							|  |  |  |     self.libmpc = libmpc_py.libmpc
 | 
					
						
							|  |  |  |     self.libmpc.init(1.0, 1.0, 1.0, 1.0, 1.0)
 | 
					
						
							|  |  |  |     self.libmpc.init_with_simulation(v_ego)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.mpc_solution = libmpc_py.ffi.new("log_t *")
 | 
					
						
							|  |  |  |     self.cur_state = libmpc_py.ffi.new("state_t *")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.cur_state[0].x_ego = 0
 | 
					
						
							|  |  |  |     self.cur_state[0].v_ego = 0
 | 
					
						
							|  |  |  |     self.cur_state[0].a_ego = 0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def set_cur_state(self, v, a):
 | 
					
						
							|  |  |  |     self.cur_state[0].x_ego = 0.0
 | 
					
						
							|  |  |  |     self.cur_state[0].v_ego = v
 | 
					
						
							|  |  |  |     self.cur_state[0].a_ego = a
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update(self, v_ego, a_ego, poss, speeds, accels):
 | 
					
						
							|  |  |  |     if len(poss) == 0:
 | 
					
						
							|  |  |  |       self.valid = False
 | 
					
						
							|  |  |  |       return
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     x_poly = list(map(float, np.polyfit(self.ts, poss, 3)))
 | 
					
						
							|  |  |  |     v_poly = list(map(float, np.polyfit(self.ts, speeds, 3)))
 | 
					
						
							|  |  |  |     a_poly = list(map(float, np.polyfit(self.ts, accels, 3)))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Calculate mpc
 | 
					
						
							|  |  |  |     self.libmpc.run_mpc(self.cur_state, self.mpc_solution, x_poly, v_poly, a_poly)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Get solution. MPC timestep is 0.2 s, so interpolation to 0.05 s is needed
 | 
					
						
							|  |  |  |     self.v_mpc = self.mpc_solution[0].v_ego[1]
 | 
					
						
							|  |  |  |     self.a_mpc = self.mpc_solution[0].a_ego[1]
 | 
					
						
							|  |  |  |     self.v_mpc_future = self.mpc_solution[0].v_ego[10]
 | 
					
						
							|  |  |  |     self.valid = True
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Reset if NaN or goes through lead car
 | 
					
						
							|  |  |  |     nans = any(math.isnan(x) for x in self.mpc_solution[0].v_ego)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     t = sec_since_boot()
 | 
					
						
							|  |  |  |     if nans:
 | 
					
						
							|  |  |  |       if t > self.last_cloudlog_t + 5.0:
 | 
					
						
							|  |  |  |         self.last_cloudlog_t = t
 | 
					
						
							|  |  |  |         cloudlog.warning("Longitudinal model mpc reset - backwards")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       self.libmpc.init(1.0, 1.0, 1.0, 1.0, 1.0)
 | 
					
						
							|  |  |  |       self.libmpc.init_with_simulation(v_ego)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       self.cur_state[0].v_ego = v_ego
 | 
					
						
							|  |  |  |       self.cur_state[0].a_ego = 0.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       self.v_mpc = v_ego
 | 
					
						
							|  |  |  |       self.a_mpc = a_ego
 | 
					
						
							|  |  |  |       self.valid = False
 |