openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

252 lines
8.7 KiB

# pylint: skip-file
import abc
import numpy as np
# The EKF class contains the framework for an Extended Kalman Filter, but must be subclassed to use.
# A subclass must implement:
# 1) calc_transfer_fun(); see bottom of file for more info.
# 2) __init__() to initialize self.state, self.covar, and self.process_noise appropriately
# Alternatively, the existing implementations of EKF can be used (e.g. EKF2D)
# Sensor classes are optionally used to pass measurement information into the EKF, to keep
# sensor parameters and processing methods for a each sensor together.
# Sensor classes have a read() method which takes raw sensor data and returns
# a SensorReading object, which can be passed to the EKF update() method.
# For usage, see run_ekf1d.py in selfdrive/new for a simple example.
# ekf.predict(dt) should be called between update cycles with the time since it was last called.
# Ideally, predict(dt) should be called at a relatively constant rate.
# update() should be called once per sensor, and can be called multiple times between predict steps.
# Access and set the state of the filter directly with ekf.state and ekf.covar.
class SensorReading:
# Given a perfect model and no noise, data = obs_model * state
def __init__(self, data, covar, obs_model):
self.data = data
self.obs_model = obs_model
self.covar = covar
def __repr__(self):
return "SensorReading(data={}, covar={}, obs_model={})".format(
repr(self.data), repr(self.covar), repr(self.obs_model))
# A generic sensor class that does no pre-processing of data
class SimpleSensor:
# obs_model can be
# a full observation model matrix, or
# an integer or tuple of indices into ekf.state, indicating which variables are being directly observed
# covar can be
# a full covariance matrix
# a float or tuple of individual covars for each component of the sensor reading
# dims is the number of states in the EKF
def __init__(self, obs_model, covar, dims):
# Allow for integer covar/obs_model
if not hasattr(obs_model, "__len__"):
obs_model = (obs_model, )
if not hasattr(covar, "__len__"):
covar = (covar, )
# Full observation model passed
if dims in np.array(obs_model).shape:
self.obs_model = np.asmatrix(obs_model)
self.covar = np.asmatrix(covar)
# Indices of unit observations passed
else:
self.obs_model = np.matlib.zeros((len(obs_model), dims))
self.obs_model[:, list(obs_model)] = np.identity(len(obs_model))
if np.asarray(covar).ndim == 2:
self.covar = np.asmatrix(covar)
elif len(covar) == len(obs_model):
self.covar = np.matlib.diag(covar)
else:
self.covar = np.matlib.identity(len(obs_model)) * covar
def read(self, data, covar=None):
if covar:
self.covar = covar
return SensorReading(data, self.covar, self.obs_model)
class EKF:
__metaclass__ = abc.ABCMeta
def __init__(self, debug=False):
self.DEBUG = debug
def __str__(self):
return "EKF(state={}, covar={})".format(self.state, self.covar)
# Measurement update
# Reading should be a SensorReading object with data, covar, and obs_model attributes
def update(self, reading):
# Potential improvements:
# deal with negative covars
# add noise to really low covars to ensure stability
# use mahalanobis distance to reject outliers
# wrap angles after state updates and innovation
# y = z - H*x
innovation = reading.data - reading.obs_model * self.state
if self.DEBUG:
print "reading:\n",reading.data
print "innovation:\n",innovation
# S = H*P*H' + R
innovation_covar = reading.obs_model * self.covar * reading.obs_model.T + reading.covar
# K = P*H'*S^-1
kalman_gain = self.covar * reading.obs_model.T * np.linalg.inv(
innovation_covar)
if self.DEBUG:
print "gain:\n", kalman_gain
print "innovation_covar:\n", innovation_covar
print "innovation: ", innovation
print "test: ", self.covar * reading.obs_model.T * (
reading.obs_model * self.covar * reading.obs_model.T + reading.covar *
0).I
# x = x + K*y
self.state += kalman_gain*innovation
# print "covar", np.diag(self.covar)
#self.state[(roll_vel, yaw_vel, pitch_vel),:] = reading.data
# Standard form: P = (I - K*H)*P
# self.covar = (self.identity - kalman_gain*reading.obs_model) * self.covar
# Use the Joseph form for numerical stability: P = (I-K*H)*P*(I - K*H)' + K*R*K'
aux_mtrx = (self.identity - kalman_gain * reading.obs_model)
self.covar = aux_mtrx * self.covar * aux_mtrx.T + kalman_gain * reading.covar * kalman_gain.T
if self.DEBUG:
print "After update"
print "state\n", self.state
print "covar:\n",self.covar
def update_scalar(self, reading):
# like update but knowing that measurement is a scalar
# this avoids matrix inversions and speeds up (surprisingly) drived.py a lot
# innovation = reading.data - np.matmul(reading.obs_model, self.state)
# innovation_covar = np.matmul(np.matmul(reading.obs_model, self.covar), reading.obs_model.T) + reading.covar
# kalman_gain = np.matmul(self.covar, reading.obs_model.T)/innovation_covar
# self.state += np.matmul(kalman_gain, innovation)
# aux_mtrx = self.identity - np.matmul(kalman_gain, reading.obs_model)
# self.covar = np.matmul(aux_mtrx, np.matmul(self.covar, aux_mtrx.T)) + np.matmul(kalman_gain, np.matmul(reading.covar, kalman_gain.T))
# written without np.matmul
es = np.einsum
ABC_T = "ij,jk,lk->il"
AB_T = "ij,kj->ik"
AB = "ij,jk->ik"
innovation = reading.data - es(AB, reading.obs_model, self.state)
innovation_covar = es(ABC_T, reading.obs_model, self.covar,
reading.obs_model) + reading.covar
kalman_gain = es(AB_T, self.covar, reading.obs_model) / innovation_covar
self.state += es(AB, kalman_gain, innovation)
aux_mtrx = self.identity - es(AB, kalman_gain, reading.obs_model)
self.covar = es(ABC_T, aux_mtrx, self.covar, aux_mtrx) + \
es(ABC_T, kalman_gain, reading.covar, kalman_gain)
# Prediction update
def predict(self, dt):
es = np.einsum
ABC_T = "ij,jk,lk->il"
AB = "ij,jk->ik"
# State update
transfer_fun, transfer_fun_jacobian = self.calc_transfer_fun(dt)
# self.state = np.matmul(transfer_fun, self.state)
# self.covar = np.matmul(np.matmul(transfer_fun_jacobian, self.covar), transfer_fun_jacobian.T) + self.process_noise * dt
# x = f(x, u), written in the form x = A(x, u)*x
self.state = es(AB, transfer_fun, self.state)
# P = J*P*J' + Q
self.covar = es(ABC_T, transfer_fun_jacobian, self.covar,
transfer_fun_jacobian) + self.process_noise * dt #!dt
#! Clip covariance to avoid explosions
self.covar = np.clip(self.covar,-1e10,1e10)
@abc.abstractmethod
def calc_transfer_fun(self, dt):
"""Return a tuple with the transfer function and transfer function jacobian
The transfer function and jacobian should both be a numpy matrix of size DIMSxDIMS
The transfer function matrix A should satisfy the state-update equation
x_(k+1) = A * x_k
The jacobian J is the direct jacobian A*x_k. For linear systems J=A.
Current implementations calculate A and J as functions of state. Control input
can be added trivially by adding a control parameter to predict() and calc_tranfer_update(),
and using it during calculation of A and J
"""
class FastEKF1D(EKF):
"""Fast version of EKF for 1D problems with scalar readings."""
def __init__(self, dt, var_init, Q):
super(FastEKF1D, self).__init__(False)
self.state = [0, 0]
self.covar = [var_init, var_init, 0]
# Process Noise
self.dtQ0 = dt * Q[0]
self.dtQ1 = dt * Q[1]
def update(self, reading):
raise NotImplementedError
def update_scalar(self, reading):
# TODO(mgraczyk): Delete this for speed.
# assert np.all(reading.obs_model == [1, 0])
rcov = reading.covar[0, 0]
x = self.state
S = self.covar
innovation = reading.data - x[0]
innovation_covar = S[0] + rcov
k0 = S[0] / innovation_covar
k1 = S[2] / innovation_covar
x[0] += k0 * innovation
x[1] += k1 * innovation
mk = 1 - k0
S[1] += k1 * (k1 * (S[0] + rcov) - 2 * S[2])
S[2] = mk * (S[2] - k1 * S[0]) + rcov * k0 * k1
S[0] = mk * mk * S[0] + rcov * k0 * k0
def predict(self, dt):
# State update
x = self.state
x[0] += dt * x[1]
# P = J*P*J' + Q
S = self.covar
S[0] += dt * (2 * S[2] + dt * S[1]) + self.dtQ0
S[2] += dt * S[1]
S[1] += self.dtQ1
# Clip covariance to avoid explosions
S = max(-1e10, min(S, 1e10))
def calc_transfer_fun(self, dt):
tf = np.identity(2)
tf[0, 1] = dt
tfj = tf
return tf, tfj