|
|
|
@ -1,9 +1,10 @@ |
|
|
|
|
import numpy as np |
|
|
|
|
import capnp |
|
|
|
|
from collections import deque |
|
|
|
|
from functools import partial, cache |
|
|
|
|
|
|
|
|
|
import cereal.messaging as messaging |
|
|
|
|
from cereal import log |
|
|
|
|
from cereal import log, car |
|
|
|
|
from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose |
|
|
|
|
|
|
|
|
|
BLOCK_SIZE = 100 |
|
|
|
@ -18,7 +19,7 @@ MIN_NCC = 0.95 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cache |
|
|
|
|
def fft_next_good_size(n): |
|
|
|
|
def fft_next_good_size(n: int) -> int: |
|
|
|
|
""" |
|
|
|
|
smallest composite of 2, 3, 5, 7, 11 that is >= n |
|
|
|
|
inspired by pocketfft |
|
|
|
@ -54,7 +55,7 @@ def parabolic_peak_interp(R, max_index): |
|
|
|
|
return max_index + offset |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def masked_normalized_cross_correlation(expected_sig, actual_sig, mask, n): |
|
|
|
|
def masked_normalized_cross_correlation(expected_sig: np.ndarray, actual_sig: np.ndarray, mask: np.ndarray, n: int): |
|
|
|
|
""" |
|
|
|
|
References: |
|
|
|
|
D. Padfield. "Masked FFT registration". In Proc. Computer Vision and |
|
|
|
@ -112,7 +113,7 @@ def masked_normalized_cross_correlation(expected_sig, actual_sig, mask, n): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Points: |
|
|
|
|
def __init__(self, num_points): |
|
|
|
|
def __init__(self, num_points: int): |
|
|
|
|
self.times = deque(maxlen=num_points) |
|
|
|
|
self.okay = deque(maxlen=num_points) |
|
|
|
|
self.desired = deque(maxlen=num_points) |
|
|
|
@ -126,18 +127,18 @@ class Points: |
|
|
|
|
def num_okay(self): |
|
|
|
|
return np.count_nonzero(self.okay) |
|
|
|
|
|
|
|
|
|
def update(self, t, desired, actual, okay): |
|
|
|
|
def update(self, t: float, desired: float, actual: float, okay: bool): |
|
|
|
|
self.times.append(t) |
|
|
|
|
self.okay.append(okay) |
|
|
|
|
self.desired.append(desired) |
|
|
|
|
self.actual.append(actual) |
|
|
|
|
|
|
|
|
|
def get(self): |
|
|
|
|
def get(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: |
|
|
|
|
return np.array(self.times), np.array(self.desired), np.array(self.actual), np.array(self.okay) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BlockAverage: |
|
|
|
|
def __init__(self, num_blocks, block_size, valid_blocks, initial_value): |
|
|
|
|
def __init__(self, num_blocks: int, block_size: int, valid_blocks: int, initial_value: float): |
|
|
|
|
self.num_blocks = num_blocks |
|
|
|
|
self.block_size = block_size |
|
|
|
|
self.block_idx = valid_blocks % block_size |
|
|
|
@ -146,14 +147,14 @@ class BlockAverage: |
|
|
|
|
self.values = np.tile(initial_value, (num_blocks, 1)) |
|
|
|
|
self.valid_blocks = valid_blocks |
|
|
|
|
|
|
|
|
|
def update(self, value): |
|
|
|
|
def update(self, value: float): |
|
|
|
|
self.values[self.block_idx] = (self.idx * self.values[self.block_idx] + (self.block_size - self.idx) * value) / self.block_size |
|
|
|
|
self.idx = (self.idx + 1) % self.block_size |
|
|
|
|
if self.idx == 0: |
|
|
|
|
self.block_idx = (self.block_idx + 1) % self.num_blocks |
|
|
|
|
self.valid_blocks = min(self.valid_blocks + 1, self.num_blocks) |
|
|
|
|
|
|
|
|
|
def get(self): |
|
|
|
|
def get(self) -> float | None: |
|
|
|
|
valid_block_idx = [i for i in range(self.valid_blocks) if i != self.block_idx] |
|
|
|
|
if not valid_block_idx: |
|
|
|
|
return None |
|
|
|
@ -163,10 +164,10 @@ class BlockAverage: |
|
|
|
|
class LateralLagEstimator: |
|
|
|
|
inputs = {"carControl", "carState", "controlsState", "liveCalibration", "livePose"} |
|
|
|
|
|
|
|
|
|
def __init__(self, CP, dt, |
|
|
|
|
block_count=BLOCK_NUM, min_valid_block_count=BLOCK_NUM_NEEDED, block_size=BLOCK_SIZE, |
|
|
|
|
window_sec=MOVING_WINDOW_SEC, okay_window_sec=MIN_OKAY_WINDOW_SEC, min_recovery_buffer_sec=MIN_RECOVERY_BUFFER_SEC, |
|
|
|
|
min_vego=MIN_VEGO, min_yr=MIN_ABS_YAW_RATE, min_ncc=MIN_NCC): |
|
|
|
|
def __init__(self, CP: car.CarParams, dt: float, |
|
|
|
|
block_count: int = BLOCK_NUM, min_valid_block_count: int = BLOCK_NUM_NEEDED, block_size: int = BLOCK_SIZE, |
|
|
|
|
window_sec: float = MOVING_WINDOW_SEC, okay_window_sec: float = MIN_OKAY_WINDOW_SEC, min_recovery_buffer_sec: float = MIN_RECOVERY_BUFFER_SEC, |
|
|
|
|
min_vego: float = MIN_VEGO, min_yr: float = MIN_ABS_YAW_RATE, min_ncc: float = MIN_NCC): |
|
|
|
|
self.dt = dt |
|
|
|
|
self.window_sec = window_sec |
|
|
|
|
self.okay_window_sec = okay_window_sec |
|
|
|
@ -196,12 +197,12 @@ class LateralLagEstimator: |
|
|
|
|
|
|
|
|
|
self.reset(self.initial_lag, 0) |
|
|
|
|
|
|
|
|
|
def reset(self, initial_lag, valid_blocks): |
|
|
|
|
def reset(self, initial_lag: float, valid_blocks: int): |
|
|
|
|
window_len = int(self.window_sec / self.dt) |
|
|
|
|
self.points = Points(window_len) |
|
|
|
|
self.block_avg = BlockAverage(self.block_count, self.block_size, valid_blocks, initial_lag) |
|
|
|
|
|
|
|
|
|
def get_msg(self, valid, debug=False): |
|
|
|
|
def get_msg(self, valid: bool, debug: bool = False) -> capnp._DynamicStructBuilder: |
|
|
|
|
msg = messaging.new_message('liveDelay') |
|
|
|
|
|
|
|
|
|
msg.valid = valid |
|
|
|
@ -222,7 +223,7 @@ class LateralLagEstimator: |
|
|
|
|
|
|
|
|
|
return msg |
|
|
|
|
|
|
|
|
|
def handle_log(self, t, which, msg): |
|
|
|
|
def handle_log(self, t: float, which: str, msg: capnp._DynamicStructReader): |
|
|
|
|
if which == "carControl": |
|
|
|
|
self.lat_active = msg.latActive |
|
|
|
|
elif which == "carState": |
|
|
|
@ -282,10 +283,7 @@ class LateralLagEstimator: |
|
|
|
|
self.block_avg.update(delay) |
|
|
|
|
self.last_estimate_t = self.t |
|
|
|
|
|
|
|
|
|
def correlation_lags(self, sig_len, dt): |
|
|
|
|
return np.arange(0, sig_len) * dt |
|
|
|
|
|
|
|
|
|
def actuator_delay(self, expected_sig, actual_sig, mask, dt, max_lag=1.): |
|
|
|
|
def actuator_delay(self, expected_sig: np.ndarray, actual_sig: np.ndarray, mask: np.ndarray, dt: float, max_lag: float = 1.) -> tuple[float, float]: |
|
|
|
|
assert len(expected_sig) == len(actual_sig) |
|
|
|
|
max_lag_samples = int(max_lag / dt) |
|
|
|
|
padded_size = fft_next_good_size(len(expected_sig) + max_lag_samples) |
|
|
|
|