|
|
|
@ -12,6 +12,7 @@ from openpilot.tools.lib.logreader import LogReader |
|
|
|
|
from openpilot.system.hardware import PC |
|
|
|
|
|
|
|
|
|
MAX_ERR_FRAMES = 1 |
|
|
|
|
DT = 0.05 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_estimator_on_fake_data(estimator, dt, lag_frames, n_frames, mocker): |
|
|
|
@ -78,26 +79,26 @@ class TestLagd: |
|
|
|
|
|
|
|
|
|
def test_empty_estimator(self, mocker): |
|
|
|
|
mocked_CP = mocker.Mock(steerActuatorDelay=0.8) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, 0.05) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, DT) |
|
|
|
|
msg = estimator.get_msg(True) |
|
|
|
|
assert msg.liveDelay.status == 'unestimated' |
|
|
|
|
assert np.allclose(msg.liveDelay.lateralDelay, 1.0) |
|
|
|
|
|
|
|
|
|
def test_estimator(self, mocker): |
|
|
|
|
dt, iters = 0.05, 100 |
|
|
|
|
iters = 100 |
|
|
|
|
lag_frames = random.randint(1, 19) |
|
|
|
|
|
|
|
|
|
mocked_CP = mocker.Mock(steerActuatorDelay=1.0) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, dt, |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, DT, |
|
|
|
|
block_count=10, min_valid_block_count=0, |
|
|
|
|
block_size=1, okay_window_sec=iters * dt, |
|
|
|
|
block_size=1, okay_window_sec=iters * DT, |
|
|
|
|
min_recovery_buffer_sec=0, min_yr=0) |
|
|
|
|
run_estimator_on_fake_data(estimator, dt, lag_frames, iters, mocker) |
|
|
|
|
run_estimator_on_fake_data(estimator, DT, lag_frames, iters, mocker) |
|
|
|
|
|
|
|
|
|
# expect one block filled, with lateralDelayEstimate equal to lateralDelay equal to lag_frames |
|
|
|
|
output = estimator.get_msg(True) |
|
|
|
|
assert output.liveDelay.status == 'estimated' |
|
|
|
|
assert np.allclose(output.liveDelay.lateralDelay, lag_frames * dt, atol=0.01) |
|
|
|
|
assert np.allclose(output.liveDelay.lateralDelay, lag_frames * DT, atol=0.01) |
|
|
|
|
assert np.allclose(output.liveDelay.lateralDelayEstimate, output.liveDelay.lateralDelay, atol=0.01) |
|
|
|
|
assert output.liveDelay.validBlocks == 1 |
|
|
|
|
|
|
|
|
@ -105,7 +106,7 @@ class TestLagd: |
|
|
|
|
@pytest.mark.timeout(30) |
|
|
|
|
def test_estimator_performance(self, mocker): |
|
|
|
|
mocked_CP = mocker.Mock(steerActuatorDelay=0.1) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, 0.05) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, DT) |
|
|
|
|
|
|
|
|
|
ds = [] |
|
|
|
|
for _ in range(1000): |
|
|
|
@ -115,4 +116,4 @@ class TestLagd: |
|
|
|
|
d = time.perf_counter() - st |
|
|
|
|
ds.append(d) |
|
|
|
|
|
|
|
|
|
assert np.mean(ds) < 0.05 |
|
|
|
|
assert np.mean(ds) < DT |
|
|
|
|