diff --git a/selfdrive/locationd/test/test_lagd.py b/selfdrive/locationd/test/test_lagd.py index 579bf9567a..19340bd347 100644 --- a/selfdrive/locationd/test/test_lagd.py +++ b/selfdrive/locationd/test/test_lagd.py @@ -16,17 +16,25 @@ MAX_ERR_FRAMES = 1 DT = 0.05 -def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0): +def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejection_threshold=0.0): class ZeroMock(mocker.Mock): def __getattr__(self, *args): return 0 for i in range(n_frames): t = i * estimator.dt - desired_cuvature = np.cos(t) / (vego ** 2) - actual_yr = np.cos(t - lag_frames * estimator.dt) / vego + desired_la = np.cos(t) + actual_la = np.cos(t - lag_frames * estimator.dt) + + # if sample is masked out, set it to desired value (no lag) + rejected = random.uniform(0, 1) < rejection_threshold + if rejected: + actual_la = desired_la + + desired_cuvature = desired_la / (vego ** 2) + actual_yr = actual_la / vego msgs = [ - (t, "carControl", mocker.Mock(latActive=True)), + (t, "carControl", mocker.Mock(latActive=not rejected)), (t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)), (t, "controlsState", mocker.Mock(desiredCurvature=desired_cuvature, lateralControlState=mocker.Mock(which=mocker.Mock(return_value='debugControlState'), debugControlState=ZeroMock()))), @@ -105,6 +113,13 @@ class TestLagd: assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01) assert msg.liveDelay.validBlocks == BLOCK_NUM_NEEDED + def test_estimator_masking(self, mocker): + mocked_CP, lag_frames = mocker.Mock(steerActuatorDelay=0.8), random.randint(1, 19) + estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0, min_valid_block_count=1) + process_messages(mocker, estimator, lag_frames, (int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_SIZE) * 2, rejection_threshold=0.4) + msg = estimator.get_msg(True) + assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01) + @pytest.mark.skipif(PC, reason="only on device") @pytest.mark.timeout(30) def test_estimator_performance(self, mocker):