#!/usr/bin/env python3
import sys
import numpy as np
import sympy as sp
from openpilot . selfdrive . locationd . models . constants import ObservationKind
from rednose . helpers . ekf_sym import gen_code , EKF_sym
class LaneKalman ( ) :
name = ' lane '
@staticmethod
def generate_code ( generated_dir ) :
# make functions and jacobians with sympy
# state variables
dim = 6
state = sp . MatrixSymbol ( ' state ' , dim , 1 )
dd = sp . Symbol ( ' dd ' ) # WARNING: NOT TIME
# Time derivative of the state as a function of state
state_dot = sp . Matrix ( np . zeros ( ( dim , 1 ) ) )
state_dot [ : 3 , 0 ] = sp . Matrix ( state [ 3 : 6 , 0 ] )
# Basic descretization, 1st order intergrator
# Can be pretty bad if dt is big
f_sym = sp . Matrix ( state ) + dd * state_dot
#
# Observation functions
#
h_lane_sym = sp . Matrix ( state [ : 3 , 0 ] )
obs_eqs = [ [ h_lane_sym , ObservationKind . LANE_PT , None ] ]
gen_code ( generated_dir , LaneKalman . name , f_sym , dd , state , obs_eqs , dim , dim )
def __init__ ( self , generated_dir , pt_std = 5 ) :
# state
# left and right lane centers in ecef
# WARNING: this is not a temporal model
# the 'time' in this kalman filter is
# the distance traveled by the vehicle,
# which should approximately be the
# distance along the lane path
# a more logical parametrization
# states 0-2 are ecef coordinates distance d
# states 3-5 is the 3d "velocity" of the
# lane in ecef (m/m).
x_initial = np . array ( [ 0 , 0 , 0 ,
0 , 0 , 0 ] )
# state covariance
P_initial = np . diag ( [ 1e16 , 1e16 , 1e16 ,
1 * * 2 , 1 * * 2 , 1 * * 2 ] )
# process noise
Q = np . diag ( [ 0.1 * * 2 , 0.1 * * 2 , 0.1 * * 2 ,
0.1 * * 2 , 0.1 * * 2 , 0.1 * 2 ] )
self . dim_state = len ( x_initial )
# init filter
self . filter = EKF_sym ( generated_dir , self . name , Q , x_initial , P_initial , x_initial . shape [ 0 ] , P_initial . shape [ 0 ] )
self . obs_noise = { ObservationKind . LANE_PT : np . diag ( [ pt_std * * 2 ] * 3 ) }
@property
def x ( self ) :
return self . filter . state ( )
@property
def P ( self ) :
return self . filter . covs ( )
def predict ( self , t ) :
return self . filter . predict ( t )
def rts_smooth ( self , estimates ) :
return self . filter . rts_smooth ( estimates , norm_quats = False )
def init_state ( self , state , covs_diag = None , covs = None , filter_time = None ) :
if covs_diag is not None :
P = np . diag ( covs_diag )
elif covs is not None :
P = covs
else :
P = self . filter . covs ( )
self . filter . init_state ( state , P , filter_time )
def predict_and_observe ( self , t , kind , data ) :
data = np . atleast_2d ( data )
return self . filter . predict_and_update_batch ( t , kind , data , self . get_R ( kind , len ( data ) ) )
def get_R ( self , kind , n ) :
obs_noise = self . obs_noise [ kind ]
dim = obs_noise . shape [ 0 ]
R = np . zeros ( ( n , dim , dim ) )
for i in range ( n ) :
R [ i , : , : ] = obs_noise
return R
if __name__ == " __main__ " :
generated_dir = sys . argv [ 2 ]
LaneKalman . generate_code ( generated_dir )