diff --git a/selfdrive/locationd/test/test_lagd.py b/selfdrive/locationd/test/test_lagd.py index b805f1759d..aad8cff229 100644 --- a/selfdrive/locationd/test/test_lagd.py +++ b/selfdrive/locationd/test/test_lagd.py @@ -3,7 +3,7 @@ import numpy as np import time import pytest -from cereal import messaging, log +from cereal import messaging, log, car from openpilot.selfdrive.locationd.lagd import LateralLagEstimator, retrieve_initial_lag, masked_normalized_cross_correlation, \ BLOCK_NUM_NEEDED, BLOCK_SIZE, MIN_OKAY_WINDOW_SEC from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_carParams @@ -16,11 +16,7 @@ MAX_ERR_FRAMES = 1 DT = 0.05 -def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejection_threshold=0.0): - class ZeroMock(mocker.Mock): - def __getattr__(self, *args): - return 0 - +def process_messages(estimator, lag_frames, n_frames, vego=20.0, rejection_threshold=0.0): for i in range(n_frames): t = i * estimator.dt desired_la = np.cos(10 * t) * 0.1 @@ -31,19 +27,15 @@ def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejecti if rejected: actual_la = desired_la - desired_cuvature = desired_la / (vego ** 2) - actual_yr = actual_la / vego + desired_cuvature = float(desired_la / (vego ** 2)) + actual_yr = float(actual_la / vego) msgs = [ - (t, "carControl", mocker.Mock(latActive=not rejected)), - (t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)), - (t, "controlsState", mocker.Mock(desiredCurvature=desired_cuvature, - lateralControlState=mocker.Mock(which=mocker.Mock(return_value='debugControlState'), debugControlState=ZeroMock()))), - (t, "livePose", mocker.Mock(orientationNED=ZeroMock(), - velocityDevice=ZeroMock(), - accelerationDevice=ZeroMock(), - angularVelocityDevice=ZeroMock(z=actual_yr, valid=True), - posenetOK=True, inputsOK=True)), - (t, "liveCalibration", mocker.Mock(rpyCalib=[0, 0, 0], calStatus=log.LiveCalibrationData.Status.calibrated)), + (t, "carControl", car.CarControl(latActive=not rejected)), + (t, "carState", car.CarState(vEgo=vego, steeringPressed=False)), + (t, "controlsState", log.ControlsState(desiredCurvature=desired_cuvature)), + (t, "livePose", log.LivePose(angularVelocityDevice=log.LivePose.XYZMeasurement(z=actual_yr, valid=True), + posenetOK=True, inputsOK=True)), + (t, "liveCalibration", log.LiveCalibrationData(rpyCalib=[0, 0, 0], calStatus=log.LiveCalibrationData.Status.calibrated)), ] for t, w, m in msgs: estimator.handle_log(t, w, m) @@ -94,8 +86,8 @@ class TestLagd: corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20] assert np.argmax(corr) in range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1) - def test_empty_estimator(self, mocker): - mocked_CP = mocker.Mock(steerActuatorDelay=0.8) + def test_empty_estimator(self): + mocked_CP = car.CarParams(steerActuatorDelay=0.8) estimator = LateralLagEstimator(mocked_CP, DT) msg = estimator.get_msg(True) assert msg.liveDelay.status == 'unestimated' @@ -103,12 +95,12 @@ class TestLagd: assert np.allclose(msg.liveDelay.lateralDelayEstimate, estimator.initial_lag) assert msg.liveDelay.validBlocks == 0 - def test_estimator_basics(self, mocker, subtests): + def test_estimator_basics(self, subtests): for lag_frames in range(5): with subtests.test(msg=f"lag_frames={lag_frames}"): - mocked_CP = mocker.Mock(steerActuatorDelay=0.8) + mocked_CP = car.CarParams(steerActuatorDelay=0.8) estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0) - process_messages(mocker, estimator, lag_frames, int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_NUM_NEEDED * BLOCK_SIZE) + process_messages(estimator, lag_frames, int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_NUM_NEEDED * BLOCK_SIZE) msg = estimator.get_msg(True) assert msg.liveDelay.status == 'estimated' assert np.allclose(msg.liveDelay.lateralDelay, lag_frames * DT, atol=0.01) @@ -116,18 +108,18 @@ class TestLagd: assert np.allclose(msg.liveDelay.lateralDelayEstimateStd, 0.0, atol=0.01) assert msg.liveDelay.validBlocks == BLOCK_NUM_NEEDED - def test_estimator_masking(self, mocker): - mocked_CP, lag_frames = mocker.Mock(steerActuatorDelay=0.8), random.randint(1, 19) + def test_estimator_masking(self): + mocked_CP, lag_frames = car.CarParams(steerActuatorDelay=0.8), random.randint(1, 19) estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0, min_valid_block_count=1) - process_messages(mocker, estimator, lag_frames, (int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_SIZE) * 2, rejection_threshold=0.4) + process_messages(estimator, lag_frames, (int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_SIZE) * 2, rejection_threshold=0.4) msg = estimator.get_msg(True) assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01) assert np.allclose(msg.liveDelay.lateralDelayEstimateStd, 0.0, atol=0.01) @pytest.mark.skipif(PC, reason="only on device") @pytest.mark.timeout(60) - def test_estimator_performance(self, mocker): - mocked_CP = mocker.Mock(steerActuatorDelay=0.8) + def test_estimator_performance(self): + mocked_CP = car.CarParams(steerActuatorDelay=0.8) estimator = LateralLagEstimator(mocked_CP, DT) ds = []