Improve the tests

pull/34975/head
Kacper Rączy 3 weeks ago
parent a2c431df35
commit 65ad12a613
  1. 52
      selfdrive/locationd/test/test_lagd.py

@ -4,7 +4,7 @@ import time
import pytest import pytest
from cereal import messaging from cereal import messaging
from openpilot.selfdrive.locationd.lagd import LateralLagEstimator, retrieve_initial_lag, masked_normalized_cross_correlation from openpilot.selfdrive.locationd.lagd import LateralLagEstimator, retrieve_initial_lag, masked_normalized_cross_correlation, BLOCK_NUM_NEEDED, BLOCK_SIZE, MIN_OKAY_WINDOW_SEC
from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_carParams from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_carParams
from openpilot.selfdrive.locationd.test.test_locationd_scenarios import TEST_ROUTE from openpilot.selfdrive.locationd.test.test_locationd_scenarios import TEST_ROUTE
from openpilot.common.params import Params from openpilot.common.params import Params
@ -15,16 +15,15 @@ MAX_ERR_FRAMES = 1
DT = 0.05 DT = 0.05
def run_estimator_on_fake_data(estimator, dt, lag_frames, n_frames, mocker): def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0):
class ZeroMock(mocker.Mock): class ZeroMock(mocker.Mock):
def __getattr__(self, *args): def __getattr__(self, *args):
return 0 return 0
for i in range(n_frames): for i in range(n_frames):
t = i * dt t = i * estimator.dt
vego = 20.0
desired_cuvature = np.cos(t) / (vego ** 2) desired_cuvature = np.cos(t) / (vego ** 2)
actual_yr = np.cos(t - lag_frames * dt) / vego actual_yr = np.cos(t - lag_frames * estimator.dt) / vego
msgs = [ msgs = [
(t, "carControl", mocker.Mock(latActive=True)), (t, "carControl", mocker.Mock(latActive=True)),
(t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)), (t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)),
@ -77,30 +76,33 @@ class TestLagd:
corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20] corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20]
assert np.argmax(corr) in range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1) assert np.argmax(corr) in range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1)
# mask out 40% of the values, and make them noise
mask = np.random.choice([True, False], size=len(desired_sig), p=[0.6, 0.4])
desired_sig[~mask] = np.random.normal(0, 1, size=np.sum(~mask))
actual_sig[~mask] = np.random.normal(0, 1, size=np.sum(~mask))
corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20]
assert np.argmax(corr) in range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1)
def test_empty_estimator(self, mocker): def test_empty_estimator(self, mocker):
mocked_CP = mocker.Mock(steerActuatorDelay=0.8) mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
estimator = LateralLagEstimator(mocked_CP, DT) estimator = LateralLagEstimator(mocked_CP, DT)
msg = estimator.get_msg(True) msg = estimator.get_msg(True)
assert msg.liveDelay.status == 'unestimated' assert msg.liveDelay.status == 'unestimated'
assert np.allclose(msg.liveDelay.lateralDelay, 1.0) assert np.allclose(msg.liveDelay.lateralDelay, estimator.initial_lag)
assert np.allclose(msg.liveDelay.lateralDelayEstimate, estimator.initial_lag)
def test_estimator(self, mocker): assert msg.liveDelay.validBlocks == 0
iters = 100
lag_frames = random.randint(1, 19) def test_estimator_basics(self, mocker, subtests):
for lag_frames in range(5):
mocked_CP = mocker.Mock(steerActuatorDelay=1.0) with subtests.test(msg=f"lag_frames={lag_frames}"):
estimator = LateralLagEstimator(mocked_CP, DT, mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
block_count=10, min_valid_block_count=0, estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0)
block_size=1, okay_window_sec=iters * DT, process_messages(mocker, estimator, lag_frames, int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_NUM_NEEDED * BLOCK_SIZE)
min_recovery_buffer_sec=0, min_yr=0) msg = estimator.get_msg(True)
run_estimator_on_fake_data(estimator, DT, lag_frames, iters, mocker) assert msg.liveDelay.status == 'estimated'
assert np.allclose(msg.liveDelay.lateralDelay, lag_frames * DT, atol=0.01)
# expect one block filled, with lateralDelayEstimate equal to lateralDelay equal to lag_frames assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01)
output = estimator.get_msg(True) assert msg.liveDelay.validBlocks == BLOCK_NUM_NEEDED
assert output.liveDelay.status == 'estimated'
assert np.allclose(output.liveDelay.lateralDelay, lag_frames * DT, atol=0.01)
assert np.allclose(output.liveDelay.lateralDelayEstimate, output.liveDelay.lateralDelay, atol=0.01)
assert output.liveDelay.validBlocks == 1
@pytest.mark.skipif(PC, reason="only on device") @pytest.mark.skipif(PC, reason="only on device")
@pytest.mark.timeout(30) @pytest.mark.timeout(30)
@ -117,3 +119,5 @@ class TestLagd:
ds.append(d) ds.append(d)
assert np.mean(ds) < DT assert np.mean(ds) < DT
import sys
Loading…
Cancel
Save