Kalman filter to identify vehicle parameters (#1123)
* full vehicle model simulator * Add vehicle model * Model compiles * Close enough * Simulation works * Add fast angle offset * Tune fast angle offset learner * Create live service for paramsd * Better clamping * Fix rotation matrix * Cleanup before merge * move debug script to debug/internal * revert plannerd change * switch vehicle model to corolla * fix valid flag * Bigger stiffness range * Lower process noise on steer ratio * Tuning * Decimation * No maha testspull/1163/head
parent
63b52b60cb
commit
c9ecab2139
7 changed files with 495 additions and 24 deletions
@ -0,0 +1,129 @@ |
||||
#!/usr/bin/env python3 |
||||
import numpy as np |
||||
import math |
||||
from tqdm import tqdm |
||||
|
||||
import seaborn as sns |
||||
import matplotlib.pyplot as plt |
||||
|
||||
|
||||
from selfdrive.car.honda.interface import CarInterface |
||||
from selfdrive.car.honda.values import CAR |
||||
from selfdrive.controls.lib.vehicle_model import VehicleModel, create_dyn_state_matrices |
||||
from selfdrive.locationd.kalman.models.car_kf import CarKalman, ObservationKind, States |
||||
|
||||
T_SIM = 5 * 60 # s |
||||
DT = 0.01 |
||||
|
||||
|
||||
CP = CarInterface.get_params(CAR.CIVIC) |
||||
VM = VehicleModel(CP) |
||||
|
||||
x, y = 0, 0 # m, m |
||||
psi = math.radians(0) # rad |
||||
|
||||
# The state is x = [v, r]^T |
||||
# with v lateral speed [m/s], and r rotational speed [rad/s] |
||||
state = np.array([[0.0], [0.0]]) |
||||
|
||||
|
||||
ts = np.arange(0, T_SIM, DT) |
||||
speeds = 10 * np.sin(2 * np.pi * ts / 200.) + 25 |
||||
|
||||
angle_offsets = math.radians(1.0) * np.ones_like(ts) |
||||
angle_offsets[ts > 60] = 0 |
||||
steering_angles = np.radians(5 * np.cos(2 * np.pi * ts / 100.)) |
||||
|
||||
xs = [] |
||||
ys = [] |
||||
psis = [] |
||||
yaw_rates = [] |
||||
speed_ys = [] |
||||
|
||||
|
||||
kf_states = [] |
||||
kf_ps = [] |
||||
|
||||
kf = CarKalman() |
||||
|
||||
for i, t in tqdm(list(enumerate(ts))): |
||||
u = speeds[i] |
||||
sa = steering_angles[i] |
||||
ao = angle_offsets[i] |
||||
|
||||
A, B = create_dyn_state_matrices(u, VM) |
||||
|
||||
state += DT * (A.dot(state) + B.dot(sa + ao)) |
||||
|
||||
x += u * math.cos(psi) * DT |
||||
y += (float(state[0]) * math.sin(psi) + u * math.sin(psi)) * DT |
||||
psi += float(state[1]) * DT |
||||
|
||||
kf.predict_and_observe(t, ObservationKind.CAL_DEVICE_FRAME_YAW_RATE, [float(state[1])]) |
||||
kf.predict_and_observe(t, ObservationKind.CAL_DEVICE_FRAME_XY_SPEED, [[u, float(state[0])]]) |
||||
kf.predict_and_observe(t, ObservationKind.STEER_ANGLE, [sa]) |
||||
kf.predict_and_observe(t, ObservationKind.ANGLE_OFFSET_FAST, [0]) |
||||
kf.predict(t) |
||||
|
||||
speed_ys.append(float(state[0])) |
||||
yaw_rates.append(float(state[1])) |
||||
kf_states.append(kf.x.copy()) |
||||
kf_ps.append(kf.P.copy()) |
||||
|
||||
xs.append(x) |
||||
ys.append(y) |
||||
psis.append(psi) |
||||
|
||||
|
||||
xs = np.asarray(xs) |
||||
ys = np.asarray(ys) |
||||
psis = np.asarray(psis) |
||||
speed_ys = np.asarray(speed_ys) |
||||
kf_states = np.asarray(kf_states) |
||||
kf_ps = np.asarray(kf_ps) |
||||
|
||||
|
||||
palette = sns.color_palette() |
||||
|
||||
def plot_with_bands(ts, state, label, ax, idx=1, converter=None): |
||||
mean = kf_states[:, state].flatten() |
||||
stds = np.sqrt(kf_ps[:, state, state].flatten()) |
||||
|
||||
if converter is not None: |
||||
mean = converter(mean) |
||||
stds = converter(stds) |
||||
|
||||
sns.lineplot(ts, mean, label=label, ax=ax) |
||||
ax.fill_between(ts, mean - stds, mean + stds, alpha=.2, color=palette[idx]) |
||||
|
||||
|
||||
print(kf.x) |
||||
|
||||
sns.set_context("paper") |
||||
f, axes = plt.subplots(6, 1) |
||||
|
||||
sns.lineplot(ts, np.degrees(steering_angles), label='Steering Angle [deg]', ax=axes[0]) |
||||
plot_with_bands(ts, States.STEER_ANGLE, 'Steering Angle kf [deg]', axes[0], converter=np.degrees) |
||||
|
||||
sns.lineplot(ts, np.degrees(yaw_rates), label='Yaw Rate [deg]', ax=axes[1]) |
||||
plot_with_bands(ts, States.YAW_RATE, 'Yaw Rate kf [deg]', axes[1], converter=np.degrees) |
||||
|
||||
sns.lineplot(ts, np.ones_like(ts) * VM.sR, label='Steer ratio [-]', ax=axes[2]) |
||||
plot_with_bands(ts, States.STEER_RATIO, 'Steer ratio kf [-]', axes[2]) |
||||
axes[2].set_ylim([10, 20]) |
||||
|
||||
|
||||
sns.lineplot(ts, np.ones_like(ts), label='Tire stiffness[-]', ax=axes[3]) |
||||
plot_with_bands(ts, States.STIFFNESS, 'Tire stiffness kf [-]', axes[3]) |
||||
axes[3].set_ylim([0.8, 1.2]) |
||||
|
||||
|
||||
sns.lineplot(ts, np.degrees(angle_offsets), label='Angle offset [deg]', ax=axes[4]) |
||||
plot_with_bands(ts, States.ANGLE_OFFSET, 'Angle offset kf deg', axes[4], converter=np.degrees) |
||||
plot_with_bands(ts, States.ANGLE_OFFSET_FAST, 'Fast Angle offset kf deg', axes[4], converter=np.degrees, idx=2) |
||||
|
||||
axes[4].set_ylim([-2, 2]) |
||||
|
||||
sns.lineplot(ts, speeds, ax=axes[5]) |
||||
|
||||
plt.show() |
@ -0,0 +1,198 @@ |
||||
#!/usr/bin/env python3 |
||||
|
||||
import math |
||||
import numpy as np |
||||
import sympy as sp |
||||
|
||||
from selfdrive.locationd.kalman.helpers import ObservationKind |
||||
from selfdrive.locationd.kalman.helpers.ekf_sym import EKF_sym, gen_code |
||||
|
||||
i = 0 |
||||
|
||||
|
||||
def _slice(n): |
||||
global i |
||||
s = slice(i, i + n) |
||||
i += n |
||||
|
||||
return s |
||||
|
||||
|
||||
class States(): |
||||
# Vehicle model params |
||||
STIFFNESS = _slice(1) # [-] |
||||
STEER_RATIO = _slice(1) # [-] |
||||
ANGLE_OFFSET = _slice(1) # [rad] |
||||
ANGLE_OFFSET_FAST = _slice(1) # [rad] |
||||
|
||||
VELOCITY = _slice(2) # (x, y) [m/s] |
||||
YAW_RATE = _slice(1) # [rad/s] |
||||
STEER_ANGLE = _slice(1) # [rad] |
||||
|
||||
|
||||
class CarKalman(): |
||||
name = 'car' |
||||
|
||||
x_initial = np.array([ |
||||
1.0, |
||||
15.0, |
||||
0.0, |
||||
0.0, |
||||
|
||||
10.0, 0.0, |
||||
0.0, |
||||
0.0, |
||||
]) |
||||
|
||||
# state covariance |
||||
P_initial = np.diag([ |
||||
.1**2, |
||||
.1**2, |
||||
math.radians(0.1)**2, |
||||
math.radians(0.1)**2, |
||||
|
||||
10**2, 10**2, |
||||
1**2, |
||||
1**2, |
||||
]) |
||||
|
||||
# process noise |
||||
Q = np.diag([ |
||||
(.05/10)**2, |
||||
.0001**2, |
||||
math.radians(0.01)**2, |
||||
math.radians(0.2)**2, |
||||
|
||||
.1**2, .1**2, |
||||
math.radians(0.1)**2, |
||||
math.radians(0.1)**2, |
||||
]) |
||||
|
||||
obs_noise = { |
||||
ObservationKind.CAL_DEVICE_FRAME_XY_SPEED: np.diag([0.1**2, 0.1**2]), |
||||
ObservationKind.CAL_DEVICE_FRAME_YAW_RATE: np.atleast_2d(math.radians(0.1)**2), |
||||
ObservationKind.STEER_ANGLE: np.atleast_2d(math.radians(0.1)**2), |
||||
ObservationKind.ANGLE_OFFSET_FAST: np.atleast_2d(math.radians(5.0)**2), |
||||
ObservationKind.STEER_RATIO: np.atleast_2d(50.0**2), |
||||
ObservationKind.STIFFNESS: np.atleast_2d(50.0**2), |
||||
} |
||||
|
||||
maha_test_kinds = [] # [ObservationKind.CAL_DEVICE_FRAME_YAW_RATE, ObservationKind.CAL_DEVICE_FRAME_XY_SPEED] |
||||
|
||||
@staticmethod |
||||
def generate_code(): |
||||
dim_state = CarKalman.x_initial.shape[0] |
||||
name = CarKalman.name |
||||
maha_test_kinds = CarKalman.maha_test_kinds |
||||
|
||||
# make functions and jacobians with sympy |
||||
# state variables |
||||
state_sym = sp.MatrixSymbol('state', dim_state, 1) |
||||
state = sp.Matrix(state_sym) |
||||
|
||||
# Vehicle model constants |
||||
# TODO: Read from car params at runtime |
||||
from selfdrive.controls.lib.vehicle_model import VehicleModel |
||||
from selfdrive.car.toyota.interface import CarInterface |
||||
from selfdrive.car.toyota.values import CAR |
||||
|
||||
CP = CarInterface.get_params(CAR.COROLLA_TSS2) |
||||
VM = VehicleModel(CP) |
||||
|
||||
m = VM.m |
||||
j = VM.j |
||||
aF = VM.aF |
||||
aR = VM.aR |
||||
|
||||
x = state[States.STIFFNESS, :][0, 0] |
||||
|
||||
cF, cR = x * VM.cF, x * VM.cR |
||||
angle_offset = state[States.ANGLE_OFFSET, :][0, 0] |
||||
angle_offset_fast = state[States.ANGLE_OFFSET_FAST, :][0, 0] |
||||
sa = state[States.STEER_ANGLE, :][0, 0] |
||||
|
||||
sR = state[States.STEER_RATIO, :][0, 0] |
||||
u, v = state[States.VELOCITY, :] |
||||
r = state[States.YAW_RATE, :][0, 0] |
||||
|
||||
A = sp.Matrix(np.zeros((2, 2))) |
||||
A[0, 0] = -(cF + cR) / (m * u) |
||||
A[0, 1] = -(cF * aF - cR * aR) / (m * u) - u |
||||
A[1, 0] = -(cF * aF - cR * aR) / (j * u) |
||||
A[1, 1] = -(cF * aF**2 + cR * aR**2) / (j * u) |
||||
|
||||
B = sp.Matrix(np.zeros((2, 1))) |
||||
B[0, 0] = cF / m / sR |
||||
B[1, 0] = (cF * aF) / j / sR |
||||
|
||||
x = sp.Matrix([v, r]) # lateral velocity, yaw rate |
||||
x_dot = A * x + B * (sa - angle_offset - angle_offset_fast) |
||||
|
||||
dt = sp.Symbol('dt') |
||||
state_dot = sp.Matrix(np.zeros((dim_state, 1))) |
||||
state_dot[States.VELOCITY.start + 1, 0] = x_dot[0] |
||||
state_dot[States.YAW_RATE.start, 0] = x_dot[1] |
||||
|
||||
# Basic descretization, 1st order integrator |
||||
# Can be pretty bad if dt is big |
||||
f_sym = state + dt * state_dot |
||||
|
||||
# |
||||
# Observation functions |
||||
# |
||||
obs_eqs = [ |
||||
[sp.Matrix([r]), ObservationKind.CAL_DEVICE_FRAME_YAW_RATE, None], |
||||
[sp.Matrix([u, v]), ObservationKind.CAL_DEVICE_FRAME_XY_SPEED, None], |
||||
[sp.Matrix([sa]), ObservationKind.STEER_ANGLE, None], |
||||
[sp.Matrix([angle_offset_fast]), ObservationKind.ANGLE_OFFSET_FAST, None], |
||||
[sp.Matrix([sR]), ObservationKind.STEER_RATIO, None], |
||||
[sp.Matrix([x]), ObservationKind.STIFFNESS, None], |
||||
] |
||||
|
||||
gen_code(name, f_sym, dt, state_sym, obs_eqs, dim_state, dim_state, maha_test_kinds=maha_test_kinds) |
||||
|
||||
def __init__(self): |
||||
self.dim_state = self.x_initial.shape[0] |
||||
|
||||
# init filter |
||||
self.filter = EKF_sym(self.name, self.Q, self.x_initial, self.P_initial, self.dim_state, self.dim_state, maha_test_kinds=self.maha_test_kinds) |
||||
|
||||
@property |
||||
def x(self): |
||||
return self.filter.state() |
||||
|
||||
@property |
||||
def P(self): |
||||
return self.filter.covs() |
||||
|
||||
def predict(self, t): |
||||
return self.filter.predict(t) |
||||
|
||||
def rts_smooth(self, estimates): |
||||
return self.filter.rts_smooth(estimates, norm_quats=False) |
||||
|
||||
def get_R(self, kind, n): |
||||
obs_noise = self.obs_noise[kind] |
||||
dim = obs_noise.shape[0] |
||||
R = np.zeros((n, dim, dim)) |
||||
for i in range(n): |
||||
R[i, :, :] = obs_noise |
||||
return R |
||||
|
||||
def init_state(self, state, covs_diag=None, covs=None, filter_time=None): |
||||
if covs_diag is not None: |
||||
P = np.diag(covs_diag) |
||||
elif covs is not None: |
||||
P = covs |
||||
else: |
||||
P = self.filter.covs() |
||||
self.filter.init_state(state, P, filter_time) |
||||
|
||||
def predict_and_observe(self, t, kind, data): |
||||
if len(data) > 0: |
||||
data = np.atleast_2d(data) |
||||
self.filter.predict_and_update_batch(t, kind, data, self.get_R(kind, len(data))) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
CarKalman.generate_code() |
@ -0,0 +1,109 @@ |
||||
#!/usr/bin/env python3 |
||||
import math |
||||
|
||||
import cereal.messaging as messaging |
||||
import common.transformations.orientation as orient |
||||
from selfdrive.locationd.kalman.models.car_kf import CarKalman, ObservationKind, States |
||||
|
||||
CARSTATE_DECIMATION = 5 |
||||
|
||||
|
||||
class ParamsLearner: |
||||
def __init__(self): |
||||
self.kf = CarKalman() |
||||
self.active = False |
||||
|
||||
self.speed = 0 |
||||
self.steering_pressed = False |
||||
self.steering_angle = 0 |
||||
self.carstate_counter = 0 |
||||
|
||||
def update_active(self): |
||||
self.active = (abs(self.steering_angle) < 45 or not self.steering_pressed) and self.speed > 5 |
||||
|
||||
def handle_log(self, t, which, msg): |
||||
if which == 'liveLocation': |
||||
roll, pitch, yaw = math.radians(msg.roll), math.radians(msg.pitch), math.radians(msg.heading) |
||||
v_device = orient.rot_from_euler([roll, pitch, yaw]).T.dot(msg.vNED) |
||||
self.speed = v_device[0] |
||||
|
||||
self.update_active() |
||||
if self.active: |
||||
self.kf.predict_and_observe(t, ObservationKind.CAL_DEVICE_FRAME_YAW_RATE, [-msg.gyro[2]]) |
||||
self.kf.predict_and_observe(t, ObservationKind.CAL_DEVICE_FRAME_XY_SPEED, [[v_device[0], -v_device[1]]]) |
||||
|
||||
# Clamp values |
||||
x = self.kf.x |
||||
if not (10 < x[States.STEER_RATIO] < 25): |
||||
self.kf.predict_and_observe(t, ObservationKind.STEER_RATIO, [15.0]) |
||||
|
||||
if not (0.5 < x[States.STIFFNESS] < 3.0): |
||||
self.kf.predict_and_observe(t, ObservationKind.STIFFNESS, [1.0]) |
||||
|
||||
else: |
||||
self.kf.filter.filter_time = t - 0.1 |
||||
|
||||
elif which == 'carState': |
||||
self.carstate_counter += 1 |
||||
if self.carstate_counter % CARSTATE_DECIMATION == 0: |
||||
self.steering_angle = msg.steeringAngle |
||||
self.steering_pressed = msg.steeringPressed |
||||
|
||||
self.update_active() |
||||
if self.active: |
||||
self.kf.predict_and_observe(t, ObservationKind.STEER_ANGLE, [math.radians(msg.steeringAngle)]) |
||||
self.kf.predict_and_observe(t, ObservationKind.ANGLE_OFFSET_FAST, [0]) |
||||
else: |
||||
self.kf.filter.filter_time = t - 0.1 |
||||
|
||||
|
||||
def main(sm=None, pm=None): |
||||
if sm is None: |
||||
sm = messaging.SubMaster(['liveLocation', 'carState']) |
||||
if pm is None: |
||||
pm = messaging.PubMaster(['liveParameters']) |
||||
|
||||
learner = ParamsLearner() |
||||
|
||||
while True: |
||||
sm.update() |
||||
|
||||
for which, updated in sm.updated.items(): |
||||
if not updated: |
||||
continue |
||||
t = sm.logMonoTime[which] * 1e-9 |
||||
learner.handle_log(t, which, sm[which]) |
||||
|
||||
# TODO: set valid to false when locationd stops sending |
||||
# TODO: make sure controlsd knows when there is no gyro |
||||
# TODO: move posenetValid somewhere else to show the model uncertainty alert |
||||
# TODO: Save and resume values from param |
||||
# TODO: Change KF to allow mass, etc to be inputs in predict step |
||||
|
||||
if sm.updated['carState']: |
||||
msg = messaging.new_message() |
||||
msg.logMonoTime = sm.logMonoTime['carState'] |
||||
|
||||
msg.init('liveParameters') |
||||
msg.liveParameters.valid = True # TODO: Check if learned values are sane |
||||
msg.liveParameters.posenetValid = True |
||||
msg.liveParameters.sensorValid = True |
||||
|
||||
x = learner.kf.x |
||||
msg.liveParameters.steerRatio = float(x[States.STEER_RATIO]) |
||||
msg.liveParameters.stiffnessFactor = float(x[States.STIFFNESS]) |
||||
msg.liveParameters.angleOffsetAverage = math.degrees(x[States.ANGLE_OFFSET]) |
||||
msg.liveParameters.angleOffset = math.degrees(x[States.ANGLE_OFFSET_FAST]) |
||||
|
||||
# P = learner.kf.P |
||||
# print() |
||||
# print("sR", float(x[States.STEER_RATIO]), float(P[States.STEER_RATIO, States.STEER_RATIO])**0.5) |
||||
# print("x ", float(x[States.STIFFNESS]), float(P[States.STIFFNESS, States.STIFFNESS])**0.5) |
||||
# print("ao avg ", math.degrees(x[States.ANGLE_OFFSET]), math.degrees(P[States.ANGLE_OFFSET, States.ANGLE_OFFSET])**0.5) |
||||
# print("ao ", math.degrees(x[States.ANGLE_OFFSET_FAST]), math.degrees(P[States.ANGLE_OFFSET_FAST, States.ANGLE_OFFSET_FAST])**0.5) |
||||
|
||||
pm.send('liveParameters', msg) |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
main() |
Loading…
Reference in new issue