parent
							
								
									164880524a
								
							
						
					
					
						commit
						af97a96080
					
				
				 2 changed files with 106 additions and 1 deletions
			
			
		@ -0,0 +1,105 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
import sys | 
				
			||||
import numpy as np | 
				
			||||
import sympy as sp | 
				
			||||
 | 
				
			||||
from selfdrive.locationd.models.constants import ObservationKind | 
				
			||||
from rednose.helpers.ekf_sym import gen_code, EKF_sym | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class LaneKalman(): | 
				
			||||
  name = 'lane' | 
				
			||||
 | 
				
			||||
  @staticmethod | 
				
			||||
  def generate_code(generated_dir): | 
				
			||||
    # make functions and jacobians with sympy | 
				
			||||
    #  state variables | 
				
			||||
    dim = 6 | 
				
			||||
    state = sp.MatrixSymbol('state', dim, 1) | 
				
			||||
 | 
				
			||||
    dd = sp.Symbol('dd')  # WARNING: NOT TIME | 
				
			||||
 | 
				
			||||
    # Time derivative of the state as a function of state | 
				
			||||
    state_dot = sp.Matrix(np.zeros((dim, 1))) | 
				
			||||
    state_dot[:3,0] = sp.Matrix(state[3:6,0]) | 
				
			||||
 | 
				
			||||
    # Basic descretization, 1st order intergrator | 
				
			||||
    # Can be pretty bad if dt is big | 
				
			||||
    f_sym = sp.Matrix(state) + dd*state_dot | 
				
			||||
 | 
				
			||||
    # | 
				
			||||
    # Observation functions | 
				
			||||
    # | 
				
			||||
    h_lane_sym = sp.Matrix(state[:3,0]) | 
				
			||||
    obs_eqs = [[h_lane_sym, ObservationKind.LANE_PT, None]] | 
				
			||||
    gen_code(generated_dir, LaneKalman.name, f_sym, dd, state, obs_eqs, dim, dim) | 
				
			||||
 | 
				
			||||
  def __init__(self, generated_dir, pt_std=5): | 
				
			||||
    # state | 
				
			||||
    # left and right lane centers in ecef | 
				
			||||
    # WARNING: this is not a temporal model | 
				
			||||
    # the 'time' in this kalman filter is | 
				
			||||
    # the distance traveled by the vehicle, | 
				
			||||
    # which should approximately be the | 
				
			||||
    # distance along the lane path | 
				
			||||
    # a more logical parametrization | 
				
			||||
    # states 0-2 are ecef coordinates distance d | 
				
			||||
    # states 3-5  is the 3d "velocity" of the | 
				
			||||
    # lane in ecef (m/m). | 
				
			||||
    x_initial = np.array([0,0,0, | 
				
			||||
                          0,0,0]) | 
				
			||||
 | 
				
			||||
    # state covariance | 
				
			||||
    P_initial = np.diag([1e16, 1e16, 1e16, | 
				
			||||
                         1**2, 1**2, 1**2]) | 
				
			||||
 | 
				
			||||
    # process noise | 
				
			||||
    Q = np.diag([0.1**2, 0.1**2, 0.1**2, | 
				
			||||
                 0.1**2, 0.1**2, 0.1*2]) | 
				
			||||
 | 
				
			||||
    self.dim_state = len(x_initial) | 
				
			||||
 | 
				
			||||
    # init filter | 
				
			||||
    self.filter = EKF_sym(generated_dir, self.name, Q, x_initial, P_initial, x_initial.shape[0], P_initial.shape[0]) | 
				
			||||
    self.obs_noise = {ObservationKind.LANE_PT: np.diag([pt_std**2]*3)} | 
				
			||||
 | 
				
			||||
  @property | 
				
			||||
  def x(self): | 
				
			||||
    return self.filter.state() | 
				
			||||
 | 
				
			||||
  @property | 
				
			||||
  def P(self): | 
				
			||||
    return self.filter.covs() | 
				
			||||
 | 
				
			||||
  def predict(self, t): | 
				
			||||
    return self.filter.predict(t) | 
				
			||||
 | 
				
			||||
  def rts_smooth(self, estimates): | 
				
			||||
    return self.filter.rts_smooth(estimates, norm_quats=False) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  def init_state(self, state, covs_diag=None, covs=None, filter_time=None): | 
				
			||||
    if covs_diag is not None: | 
				
			||||
      P = np.diag(covs_diag) | 
				
			||||
    elif covs is not None: | 
				
			||||
      P = covs | 
				
			||||
    else: | 
				
			||||
      P = self.filter.covs() | 
				
			||||
    self.filter.init_state(state, P, filter_time) | 
				
			||||
 | 
				
			||||
  def predict_and_observe(self, t, kind, data): | 
				
			||||
    data = np.atleast_2d(data) | 
				
			||||
    return self.filter.predict_and_update_batch(t, kind, data, self.get_R(kind, len(data))) | 
				
			||||
 | 
				
			||||
  def get_R(self, kind, n): | 
				
			||||
    obs_noise = self.obs_noise[kind] | 
				
			||||
    dim = obs_noise.shape[0] | 
				
			||||
    R = np.zeros((n, dim, dim)) | 
				
			||||
    for i in range(n): | 
				
			||||
      R[i,:,:] = obs_noise | 
				
			||||
    return R | 
				
			||||
 | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  generated_dir = sys.argv[2] | 
				
			||||
  LaneKalman.generate_code(generated_dir) | 
				
			||||
					Loading…
					
					
				
		Reference in new issue