Masked fft NCC implementation

online-lag
Kacper Rączy 1 month ago
parent 06a4eb453a
commit 23f5354ef6
  1. 71
      selfdrive/locationd/lagd.py

@ -1,8 +1,7 @@
#!/usr/bin/env python3
import numpy as np
from collections import deque
from skimage.registration._masked_phase_cross_correlation import cross_correlate_masked
from functools import partial
import cereal.messaging as messaging
from cereal import car, log
@ -155,22 +154,68 @@ class LagEstimator:
def correlation_lags(self, sig_len, dt):
return np.arange(0, sig_len) * dt
def actuator_delay(self, expected_sig, actual_sig, is_okay, dt, max_lag=1.):
# masked (gated) normalized cross-correlation
# normalized, can be used for anything, like comparsion
def actuator_delay(self, expected_sig, actual_sig, mask, dt, max_lag=1.):
"""
References:
D. Padfield. "Masked FFT registration". In Proc. Computer Vision and
Pattern Recognition, pp. 2918-2925 (2010).
:DOI:`10.1109/CVPR.2010.5540032`
"""
eps = np.finfo(np.float64).eps
expected_sig = np.asarray(expected_sig, dtype=np.float64)
actual_sig = np.asarray(actual_sig, dtype=np.float64)
expected_sig[~mask] = 0.0
actual_sig[~mask] = 0.0
rotated_expected_sig = expected_sig[::-1]
rotated_mask = mask[::-1]
n = len(expected_sig) + len(actual_sig) - 1
fft = partial(np.fft.fft, n=n)
actual_sig_fft = fft(actual_sig)
rotated_expected_sig_fft = fft(rotated_expected_sig)
actual_mask_fft = fft(mask.astype(np.float64))
rotated_mask_fft = fft(rotated_mask.astype(np.float64))
number_overlap_masked_samples = np.fft.ifft(rotated_mask_fft * actual_mask_fft).real
number_overlap_masked_samples[:] = np.round(number_overlap_masked_samples)
number_overlap_masked_samples[:] = np.fmax(number_overlap_masked_samples, eps)
masked_correlated_actual_fft = np.fft.ifft(rotated_mask_fft * actual_sig_fft).real
masked_correlated_expected_fft = np.fft.ifft(actual_mask_fft * rotated_expected_sig_fft).real
numerator = np.fft.ifft(rotated_expected_sig_fft * actual_sig_fft).real
numerator -= masked_correlated_actual_fft * masked_correlated_expected_fft / number_overlap_masked_samples
actual_squared_fft = fft(actual_sig ** 2)
actual_sig_denom = np.fft.ifft(rotated_mask_fft * actual_squared_fft).real
actual_sig_denom -= masked_correlated_actual_fft ** 2 / number_overlap_masked_samples
actual_sig_denom[:] = np.fmax(actual_sig_denom, 0.0)
rotated_expected_squared_fft = fft(rotated_expected_sig ** 2)
expected_sig_denom = np.fft.ifft(actual_mask_fft * rotated_expected_squared_fft).real
expected_sig_denom -= masked_correlated_expected_fft ** 2 / number_overlap_masked_samples
expected_sig_denom[:] = np.fmax(expected_sig_denom, 0.0)
denom = np.sqrt(actual_sig_denom * expected_sig_denom)
assert len(expected_sig) == len(actual_sig)
# zero-out samples with very small denominators
tol = 1e3 * eps * np.max(np.abs(denom), keepdims=True)
nonzero_indices = denom > tol
xcorr = cross_correlate_masked(actual_sig, expected_sig, is_okay, is_okay, axes=tuple(range(actual_sig.ndim)),)
lags = self.correlation_lags(len(expected_sig), dt)
ncc = np.zeros_like(denom, dtype=np.float64)
ncc[nonzero_indices] = numerator[nonzero_indices] / denom[nonzero_indices]
np.clip(ncc, -1, 1, out=ncc)
n_frames_max_delay = int(max_lag / dt)
xcorr = xcorr[len(expected_sig) - 1: len(expected_sig) - 1 + n_frames_max_delay]
lags = lags[:n_frames_max_delay]
max_lag_samples = int(max_lag / dt)
roi_ncc = ncc[len(expected_sig) - 1: len(expected_sig) - 1 + max_lag_samples]
max_corr_index = np.argmax(xcorr)
max_corr_index = np.argmax(roi_ncc)
corr = roi_ncc[max_corr_index]
lag = max_corr_index * dt
lag, corr = lags[max_corr_index], xcorr[max_corr_index]
return lag, corr

Loading…
Cancel
Save