From 22b2715530179f7ab2f5420f3981648c63d6a33e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kacper=20R=C4=85czy?= Date: Wed, 26 Mar 2025 19:11:58 -0700 Subject: [PATCH] Move ncc code to separate function --- selfdrive/locationd/lagd.py | 112 +++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 53 deletions(-) diff --git a/selfdrive/locationd/lagd.py b/selfdrive/locationd/lagd.py index dc81dbcdcf..ee92db7b84 100755 --- a/selfdrive/locationd/lagd.py +++ b/selfdrive/locationd/lagd.py @@ -28,6 +28,64 @@ def parabolic_peak_interp(R, max_index): return max_index + offset +def masked_normalized_cross_correlation(expected_sig, actual_sig, mask): + """ + References: + D. Padfield. "Masked FFT registration". In Proc. Computer Vision and + Pattern Recognition, pp. 2918-2925 (2010). + :DOI:`10.1109/CVPR.2010.5540032` + """ + + eps = np.finfo(np.float64).eps + expected_sig = np.asarray(expected_sig, dtype=np.float64) + actual_sig = np.asarray(actual_sig, dtype=np.float64) + + expected_sig[~mask] = 0.0 + actual_sig[~mask] = 0.0 + + rotated_expected_sig = expected_sig[::-1] + rotated_mask = mask[::-1] + + n = len(expected_sig) + len(actual_sig) - 1 + fft = partial(np.fft.fft, n=n) + + actual_sig_fft = fft(actual_sig) + rotated_expected_sig_fft = fft(rotated_expected_sig) + actual_mask_fft = fft(mask.astype(np.float64)) + rotated_mask_fft = fft(rotated_mask.astype(np.float64)) + + number_overlap_masked_samples = np.fft.ifft(rotated_mask_fft * actual_mask_fft).real + number_overlap_masked_samples[:] = np.round(number_overlap_masked_samples) + number_overlap_masked_samples[:] = np.fmax(number_overlap_masked_samples, eps) + masked_correlated_actual_fft = np.fft.ifft(rotated_mask_fft * actual_sig_fft).real + masked_correlated_expected_fft = np.fft.ifft(actual_mask_fft * rotated_expected_sig_fft).real + + numerator = np.fft.ifft(rotated_expected_sig_fft * actual_sig_fft).real + numerator -= masked_correlated_actual_fft * masked_correlated_expected_fft / number_overlap_masked_samples + + actual_squared_fft = fft(actual_sig ** 2) + actual_sig_denom = np.fft.ifft(rotated_mask_fft * actual_squared_fft).real + actual_sig_denom -= masked_correlated_actual_fft ** 2 / number_overlap_masked_samples + actual_sig_denom[:] = np.fmax(actual_sig_denom, 0.0) + + rotated_expected_squared_fft = fft(rotated_expected_sig ** 2) + expected_sig_denom = np.fft.ifft(actual_mask_fft * rotated_expected_squared_fft).real + expected_sig_denom -= masked_correlated_expected_fft ** 2 / number_overlap_masked_samples + expected_sig_denom[:] = np.fmax(expected_sig_denom, 0.0) + + denom = np.sqrt(actual_sig_denom * expected_sig_denom) + + # zero-out samples with very small denominators + tol = 1e3 * eps * np.max(np.abs(denom), keepdims=True) + nonzero_indices = denom > tol + + ncc = np.zeros_like(denom, dtype=np.float64) + ncc[nonzero_indices] = numerator[nonzero_indices] / denom[nonzero_indices] + np.clip(ncc, -1, 1, out=ncc) + + return ncc + + class Points: def __init__(self, num_points): self.times = deque(maxlen=num_points) @@ -161,59 +219,7 @@ class LagEstimator: return np.arange(0, sig_len) * dt def actuator_delay(self, expected_sig, actual_sig, mask, dt, max_lag=1.): - """ - References: - D. Padfield. "Masked FFT registration". In Proc. Computer Vision and - Pattern Recognition, pp. 2918-2925 (2010). - :DOI:`10.1109/CVPR.2010.5540032` - """ - - eps = np.finfo(np.float64).eps - expected_sig = np.asarray(expected_sig, dtype=np.float64) - actual_sig = np.asarray(actual_sig, dtype=np.float64) - - expected_sig[~mask] = 0.0 - actual_sig[~mask] = 0.0 - - rotated_expected_sig = expected_sig[::-1] - rotated_mask = mask[::-1] - - n = len(expected_sig) + len(actual_sig) - 1 - fft = partial(np.fft.fft, n=n) - - actual_sig_fft = fft(actual_sig) - rotated_expected_sig_fft = fft(rotated_expected_sig) - actual_mask_fft = fft(mask.astype(np.float64)) - rotated_mask_fft = fft(rotated_mask.astype(np.float64)) - - number_overlap_masked_samples = np.fft.ifft(rotated_mask_fft * actual_mask_fft).real - number_overlap_masked_samples[:] = np.round(number_overlap_masked_samples) - number_overlap_masked_samples[:] = np.fmax(number_overlap_masked_samples, eps) - masked_correlated_actual_fft = np.fft.ifft(rotated_mask_fft * actual_sig_fft).real - masked_correlated_expected_fft = np.fft.ifft(actual_mask_fft * rotated_expected_sig_fft).real - - numerator = np.fft.ifft(rotated_expected_sig_fft * actual_sig_fft).real - numerator -= masked_correlated_actual_fft * masked_correlated_expected_fft / number_overlap_masked_samples - - actual_squared_fft = fft(actual_sig ** 2) - actual_sig_denom = np.fft.ifft(rotated_mask_fft * actual_squared_fft).real - actual_sig_denom -= masked_correlated_actual_fft ** 2 / number_overlap_masked_samples - actual_sig_denom[:] = np.fmax(actual_sig_denom, 0.0) - - rotated_expected_squared_fft = fft(rotated_expected_sig ** 2) - expected_sig_denom = np.fft.ifft(actual_mask_fft * rotated_expected_squared_fft).real - expected_sig_denom -= masked_correlated_expected_fft ** 2 / number_overlap_masked_samples - expected_sig_denom[:] = np.fmax(expected_sig_denom, 0.0) - - denom = np.sqrt(actual_sig_denom * expected_sig_denom) - - # zero-out samples with very small denominators - tol = 1e3 * eps * np.max(np.abs(denom), keepdims=True) - nonzero_indices = denom > tol - - ncc = np.zeros_like(denom, dtype=np.float64) - ncc[nonzero_indices] = numerator[nonzero_indices] / denom[nonzero_indices] - np.clip(ncc, -1, 1, out=ncc) + ncc = masked_normalized_cross_correlation(expected_sig, actual_sig, mask) # only consider lags from 0 to max_lag max_lag_samples = int(max_lag / dt)