diff --git a/selfdrive/locationd/estimators/lateral_lag.py b/selfdrive/locationd/estimators/lateral_lag.py index bde9bf79f1..e725eea7e2 100644 --- a/selfdrive/locationd/estimators/lateral_lag.py +++ b/selfdrive/locationd/estimators/lateral_lag.py @@ -1,9 +1,10 @@ import numpy as np +import capnp from collections import deque from functools import partial, cache import cereal.messaging as messaging -from cereal import log +from cereal import log, car from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose BLOCK_SIZE = 100 @@ -18,7 +19,7 @@ MIN_NCC = 0.95 @cache -def fft_next_good_size(n): +def fft_next_good_size(n: int) -> int: """ smallest composite of 2, 3, 5, 7, 11 that is >= n inspired by pocketfft @@ -54,7 +55,7 @@ def parabolic_peak_interp(R, max_index): return max_index + offset -def masked_normalized_cross_correlation(expected_sig, actual_sig, mask, n): +def masked_normalized_cross_correlation(expected_sig: np.ndarray, actual_sig: np.ndarray, mask: np.ndarray, n: int): """ References: D. Padfield. "Masked FFT registration". In Proc. Computer Vision and @@ -112,7 +113,7 @@ def masked_normalized_cross_correlation(expected_sig, actual_sig, mask, n): class Points: - def __init__(self, num_points): + def __init__(self, num_points: int): self.times = deque(maxlen=num_points) self.okay = deque(maxlen=num_points) self.desired = deque(maxlen=num_points) @@ -126,18 +127,18 @@ class Points: def num_okay(self): return np.count_nonzero(self.okay) - def update(self, t, desired, actual, okay): + def update(self, t: float, desired: float, actual: float, okay: bool): self.times.append(t) self.okay.append(okay) self.desired.append(desired) self.actual.append(actual) - def get(self): + def get(self) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: return np.array(self.times), np.array(self.desired), np.array(self.actual), np.array(self.okay) class BlockAverage: - def __init__(self, num_blocks, block_size, valid_blocks, initial_value): + def __init__(self, num_blocks: int, block_size: int, valid_blocks: int, initial_value: float): self.num_blocks = num_blocks self.block_size = block_size self.block_idx = valid_blocks % block_size @@ -146,14 +147,14 @@ class BlockAverage: self.values = np.tile(initial_value, (num_blocks, 1)) self.valid_blocks = valid_blocks - def update(self, value): + def update(self, value: float): self.values[self.block_idx] = (self.idx * self.values[self.block_idx] + (self.block_size - self.idx) * value) / self.block_size self.idx = (self.idx + 1) % self.block_size if self.idx == 0: self.block_idx = (self.block_idx + 1) % self.num_blocks self.valid_blocks = min(self.valid_blocks + 1, self.num_blocks) - def get(self): + def get(self) -> float | None: valid_block_idx = [i for i in range(self.valid_blocks) if i != self.block_idx] if not valid_block_idx: return None @@ -163,10 +164,10 @@ class BlockAverage: class LateralLagEstimator: inputs = {"carControl", "carState", "controlsState", "liveCalibration", "livePose"} - def __init__(self, CP, dt, - block_count=BLOCK_NUM, min_valid_block_count=BLOCK_NUM_NEEDED, block_size=BLOCK_SIZE, - window_sec=MOVING_WINDOW_SEC, okay_window_sec=MIN_OKAY_WINDOW_SEC, min_recovery_buffer_sec=MIN_RECOVERY_BUFFER_SEC, - min_vego=MIN_VEGO, min_yr=MIN_ABS_YAW_RATE, min_ncc=MIN_NCC): + def __init__(self, CP: car.CarParams, dt: float, + block_count: int = BLOCK_NUM, min_valid_block_count: int = BLOCK_NUM_NEEDED, block_size: int = BLOCK_SIZE, + window_sec: float = MOVING_WINDOW_SEC, okay_window_sec: float = MIN_OKAY_WINDOW_SEC, min_recovery_buffer_sec: float = MIN_RECOVERY_BUFFER_SEC, + min_vego: float = MIN_VEGO, min_yr: float = MIN_ABS_YAW_RATE, min_ncc: float = MIN_NCC): self.dt = dt self.window_sec = window_sec self.okay_window_sec = okay_window_sec @@ -196,12 +197,12 @@ class LateralLagEstimator: self.reset(self.initial_lag, 0) - def reset(self, initial_lag, valid_blocks): + def reset(self, initial_lag: float, valid_blocks: int): window_len = int(self.window_sec / self.dt) self.points = Points(window_len) self.block_avg = BlockAverage(self.block_count, self.block_size, valid_blocks, initial_lag) - def get_msg(self, valid, debug=False): + def get_msg(self, valid: bool, debug: bool = False) -> capnp._DynamicStructBuilder: msg = messaging.new_message('liveDelay') msg.valid = valid @@ -222,7 +223,7 @@ class LateralLagEstimator: return msg - def handle_log(self, t, which, msg): + def handle_log(self, t: float, which: str, msg: capnp._DynamicStructReader): if which == "carControl": self.lat_active = msg.latActive elif which == "carState": @@ -282,10 +283,7 @@ class LateralLagEstimator: self.block_avg.update(delay) self.last_estimate_t = self.t - def correlation_lags(self, sig_len, dt): - return np.arange(0, sig_len) * dt - - def actuator_delay(self, expected_sig, actual_sig, mask, dt, max_lag=1.): + def actuator_delay(self, expected_sig: np.ndarray, actual_sig: np.ndarray, mask: np.ndarray, dt: float, max_lag: float = 1.) -> tuple[float, float]: assert len(expected_sig) == len(actual_sig) max_lag_samples = int(max_lag / dt) padded_size = fft_next_good_size(len(expected_sig) + max_lag_samples)