openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

332 lines
12 KiB

4 months ago
#!/usr/bin/env python3
2 months ago
import os
4 months ago
import numpy as np
from collections import deque
from functools import partial
4 months ago
import cereal.messaging as messaging
2 months ago
from cereal import car, log
2 months ago
from cereal.services import SERVICE_LIST
4 months ago
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
4 months ago
BLOCK_SIZE = 100
2 months ago
BLOCK_NUM = 50
BLOCK_NUM_NEEDED = 5
2 months ago
MOVING_WINDOW_SEC = 300.0
MIN_OKAY_WINDOW_SEC = 30.0
MIN_RECOVERY_BUFFER_SEC = 2.0
2 months ago
MIN_VEGO = 15.0
MIN_ABS_YAW_RATE = np.radians(1.0)
MIN_NCC = 0.95
def parabolic_peak_interp(R, max_index):
if max_index == 0 or max_index == len(R) - 1:
return max_index
y_m1, y_0, y_p1 = R[max_index - 1], R[max_index], R[max_index + 1]
offset = 0.5 * (y_p1 - y_m1) / (2 * y_0 - y_p1 - y_m1)
return max_index + offset
def masked_normalized_cross_correlation(expected_sig, actual_sig, mask):
"""
References:
D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
Pattern Recognition, pp. 2918-2925 (2010).
:DOI:`10.1109/CVPR.2010.5540032`
"""
eps = np.finfo(np.float64).eps
expected_sig = np.asarray(expected_sig, dtype=np.float64)
actual_sig = np.asarray(actual_sig, dtype=np.float64)
expected_sig[~mask] = 0.0
actual_sig[~mask] = 0.0
rotated_expected_sig = expected_sig[::-1]
rotated_mask = mask[::-1]
n = len(expected_sig) + len(actual_sig) - 1
fft = partial(np.fft.fft, n=n)
actual_sig_fft = fft(actual_sig)
rotated_expected_sig_fft = fft(rotated_expected_sig)
actual_mask_fft = fft(mask.astype(np.float64))
rotated_mask_fft = fft(rotated_mask.astype(np.float64))
number_overlap_masked_samples = np.fft.ifft(rotated_mask_fft * actual_mask_fft).real
number_overlap_masked_samples[:] = np.round(number_overlap_masked_samples)
number_overlap_masked_samples[:] = np.fmax(number_overlap_masked_samples, eps)
masked_correlated_actual_fft = np.fft.ifft(rotated_mask_fft * actual_sig_fft).real
masked_correlated_expected_fft = np.fft.ifft(actual_mask_fft * rotated_expected_sig_fft).real
numerator = np.fft.ifft(rotated_expected_sig_fft * actual_sig_fft).real
numerator -= masked_correlated_actual_fft * masked_correlated_expected_fft / number_overlap_masked_samples
actual_squared_fft = fft(actual_sig ** 2)
actual_sig_denom = np.fft.ifft(rotated_mask_fft * actual_squared_fft).real
actual_sig_denom -= masked_correlated_actual_fft ** 2 / number_overlap_masked_samples
actual_sig_denom[:] = np.fmax(actual_sig_denom, 0.0)
rotated_expected_squared_fft = fft(rotated_expected_sig ** 2)
expected_sig_denom = np.fft.ifft(actual_mask_fft * rotated_expected_squared_fft).real
expected_sig_denom -= masked_correlated_expected_fft ** 2 / number_overlap_masked_samples
expected_sig_denom[:] = np.fmax(expected_sig_denom, 0.0)
denom = np.sqrt(actual_sig_denom * expected_sig_denom)
# zero-out samples with very small denominators
tol = 1e3 * eps * np.max(np.abs(denom), keepdims=True)
nonzero_indices = denom > tol
ncc = np.zeros_like(denom, dtype=np.float64)
ncc[nonzero_indices] = numerator[nonzero_indices] / denom[nonzero_indices]
np.clip(ncc, -1, 1, out=ncc)
return ncc
2 months ago
class Points:
def __init__(self, num_points):
self.times = deque(maxlen=num_points)
self.okay = deque(maxlen=num_points)
self.desired = deque(maxlen=num_points)
self.actual = deque(maxlen=num_points)
2 months ago
@property
def num_points(self):
return len(self.desired)
4 months ago
2 months ago
@property
def num_okay(self):
return np.count_nonzero(self.okay)
4 months ago
2 months ago
def update(self, t, desired, actual, okay):
self.times.append(t)
self.okay.append(okay)
self.desired.append(desired)
self.actual.append(actual)
def get(self):
return np.array(self.times), np.array(self.desired), np.array(self.actual), np.array(self.okay)
class BlockAverage:
2 months ago
def __init__(self, num_blocks, block_size, valid_blocks, initial_value):
2 months ago
self.num_blocks = num_blocks
self.block_size = block_size
self.block_idx = 0
self.idx = 0
self.values = np.tile(initial_value, (num_blocks, 1))
2 months ago
self.valid_blocks = valid_blocks
2 months ago
def update(self, value):
self.values[self.block_idx] = (self.idx * self.values[self.block_idx] + (self.block_size - self.idx) * value) / self.block_size
self.idx = (self.idx + 1) % self.block_size
if self.idx == 0:
self.block_idx = (self.block_idx + 1) % self.num_blocks
self.valid_blocks = min(self.valid_blocks + 1, self.num_blocks)
def get(self):
valid_block_idx = [i for i in range(self.valid_blocks) if i != self.block_idx]
if not valid_block_idx:
return None
return float(np.mean(self.values[valid_block_idx], axis=0).item())
2 months ago
class LagEstimator:
2 months ago
def __init__(self, CP, dt,
block_count=BLOCK_NUM, min_valid_block_count=BLOCK_NUM_NEEDED, block_size=BLOCK_SIZE,
window_sec=MOVING_WINDOW_SEC, okay_window_sec=MIN_OKAY_WINDOW_SEC, min_recovery_buffer_sec=MIN_RECOVERY_BUFFER_SEC,
2 months ago
min_vego=MIN_VEGO, min_yr=MIN_ABS_YAW_RATE, min_ncc=MIN_NCC):
4 months ago
self.dt = dt
2 months ago
self.window_sec = window_sec
self.okay_window_sec = okay_window_sec
self.min_recovery_buffer_sec = min_recovery_buffer_sec
2 months ago
self.initial_lag = CP.steerActuatorDelay + 0.2
self.block_size = block_size
self.block_count = block_count
self.min_valid_block_count = min_valid_block_count
2 months ago
self.min_vego = min_vego
self.min_yr = min_yr
self.min_ncc = min_ncc
self.t = 0
self.lat_active = False
self.steering_pressed = False
self.steering_saturated = False
2 months ago
self.desired_curvature = 0
self.v_ego = 0
self.yaw_rate = 0
4 months ago
self.last_lat_inactive_t = 0
self.last_steering_pressed_t = 0
self.last_steering_saturated_t = 0
self.last_estimate_t = 0
self.calibrator = PoseCalibrator()
2 months ago
self.reset(self.initial_lag, 0)
def reset(self, initial_lag, valid_blocks):
window_len = int(self.window_sec / self.dt)
self.points = Points(window_len)
self.block_avg = BlockAverage(self.block_count, self.block_size, valid_blocks, initial_lag)
2 months ago
def get_msg(self, valid, debug=False):
msg = messaging.new_message('liveDelay')
4 months ago
2 months ago
msg.valid = valid
4 months ago
liveDelay = msg.liveDelay
estimated_lag = self.block_avg.get()
liveDelay.lateralDelayEstimate = estimated_lag or self.initial_lag
if self.block_avg.valid_blocks >= self.min_valid_block_count and estimated_lag is not None:
liveDelay.status = log.LiveDelayData.Status.estimated
liveDelay.lateralDelay = estimated_lag
else:
liveDelay.status = log.LiveDelayData.Status.initial
liveDelay.lateralDelay = self.initial_lag
2 months ago
liveDelay.validBlocks = self.block_avg.valid_blocks
# TODO only in debug
2 months ago
# if debug:
liveDelay.points = self.block_avg.values.flatten().tolist()
2 months ago
return msg
def handle_log(self, t, which, msg):
4 months ago
if which == "carControl":
2 months ago
self.lat_active = msg.latActive
4 months ago
elif which == "carState":
2 months ago
self.steering_pressed = msg.steeringPressed
self.v_ego = msg.vEgo
4 months ago
elif which == "controlsState":
self.steering_saturated = getattr(msg.lateralControlState, msg.lateralControlState.which()).saturated
self.desired_curvature = msg.desiredCurvature
2 months ago
elif which == "liveCalibration":
self.calibrator.feed_live_calib(msg)
elif which == "livePose":
device_pose = Pose.from_live_pose(msg)
calibrated_pose = self.calibrator.build_calibrated_pose(device_pose)
2 months ago
self.yaw_rate = calibrated_pose.angular_velocity.z
self.t = t
2 months ago
def points_valid(self):
2 months ago
return self.points.num_okay >= int(self.okay_window_sec / self.dt)
4 months ago
2 months ago
def update_points(self):
if not self.lat_active:
self.last_lat_inactive_t = self.t
if self.steering_pressed:
self.last_steering_pressed_t = self.t
if self.steering_saturated:
self.last_steering_saturated_t = self.t
2 months ago
la_desired = self.desired_curvature * self.v_ego * self.v_ego
la_actual_pose = self.yaw_rate * self.v_ego
4 months ago
fast = self.v_ego > self.min_vego
turning = np.abs(self.yaw_rate) >= self.min_yr
2 months ago
has_recovered = all( # wait for recovery after !lat_active, steering_pressed, steering_saturated
self.t - last_t >= self.min_recovery_buffer_sec
for last_t in [self.last_lat_inactive_t, self.last_steering_pressed_t, self.last_steering_saturated_t]
2 months ago
)
okay = self.lat_active and not self.steering_pressed and not self.steering_saturated and fast and turning and has_recovered
2 months ago
self.points.update(self.t, la_desired, la_actual_pose, okay)
def update_estimate(self):
# check if the points are valid overall
if not self.points_valid():
2 months ago
return
4 months ago
times, desired, actual, okay = self.points.get()
# check if there are any new valid data points since the last update
2 months ago
if self.last_estimate_t != 0:
new_values_start_idx = next(-i for i, t in enumerate(reversed(times)) if t <= self.last_estimate_t)
if (new_values_start_idx == 0 or not np.any(okay[new_values_start_idx:])):
return
2 months ago
delay, corr = self.actuator_delay(desired, actual, okay, self.dt)
2 months ago
if corr < self.min_ncc:
return
2 months ago
self.block_avg.update(delay)
self.last_estimate_t = self.t
4 months ago
def correlation_lags(self, sig_len, dt):
return np.arange(0, sig_len) * dt
def actuator_delay(self, expected_sig, actual_sig, mask, dt, max_lag=1.):
ncc = masked_normalized_cross_correlation(expected_sig, actual_sig, mask)
# only consider lags from 0 to max_lag
max_lag_samples = int(max_lag / dt)
roi_ncc = ncc[len(expected_sig) - 1: len(expected_sig) - 1 + max_lag_samples]
max_corr_index = np.argmax(roi_ncc)
corr = roi_ncc[max_corr_index]
lag = parabolic_peak_interp(roi_ncc, max_corr_index) * dt
return lag, corr
4 months ago
def main():
config_realtime_process([0, 1, 2, 3], 5)
2 months ago
DEBUG = bool(int(os.getenv("DEBUG", "0")))
pm = messaging.PubMaster(['liveDelay', 'alertDebug'])
2 months ago
sm = messaging.SubMaster(['livePose', 'liveCalibration', 'carControl', 'carState', 'controlsState'], poll='livePose')
4 months ago
params = Params()
CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
2 months ago
estimator = LagEstimator(CP, 1. / SERVICE_LIST['livePose'].frequency)
4 months ago
2 months ago
lag_params = params.get("LiveLag")
2 months ago
if lag_params:
try:
with log.Event.from_bytes(lag_params) as msg:
lag_init = msg.liveDelay.lateralDelayEstimate
valid_blocks = msg.liveDelay.validBlocks
2 months ago
estimator.reset(lag_init, valid_blocks)
except Exception:
print("Error reading cached LagParams")
4 months ago
while True:
sm.update()
if sm.all_checks():
3 months ago
for which in sorted(sm.updated.keys(), key=lambda x: sm.logMonoTime[x]):
4 months ago
if sm.updated[which]:
t = sm.logMonoTime[which] * 1e-9
estimator.handle_log(t, which, sm[which])
2 months ago
estimator.update_points()
4 months ago
# 4Hz driven by livePose
if sm.frame % 5 == 0:
estimator.update_estimate()
2 months ago
msg = estimator.get_msg(sm.all_checks(), DEBUG)
# TODO remove
4 months ago
alert_msg = messaging.new_message('alertDebug')
4 months ago
alert_msg.alertDebug.alertText1 = f"Lag estimate (fixed: {CP.steerActuatorDelay:.2f} s)"
progress = int(min((estimator.block_avg.block_idx * estimator.block_avg.block_size + estimator.block_avg.idx) / (estimator.min_valid_block_count * estimator.block_avg.block_size), 1.0) * 100)
alert_msg.alertDebug.alertText2 = f"{msg.liveDelay.lateralDelayEstimate:.2f} s ({msg.liveDelay.status == 'estimated'}, blocks: {msg.liveDelay.validBlocks}, progress: {progress}%)"
4 months ago
pm.send('alertDebug', alert_msg)
msg_dat = msg.to_bytes()
# cache every 60 seconds
if sm.frame % 240 == 0:
params.put_nonblocking("LiveLag", msg_dat)
pm.send('liveDelay', msg_dat)
2 months ago
4 months ago
if __name__ == "__main__":
main()