|
|
|
@ -16,17 +16,25 @@ MAX_ERR_FRAMES = 1 |
|
|
|
|
DT = 0.05 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0): |
|
|
|
|
def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejection_threshold=0.0): |
|
|
|
|
class ZeroMock(mocker.Mock): |
|
|
|
|
def __getattr__(self, *args): |
|
|
|
|
return 0 |
|
|
|
|
|
|
|
|
|
for i in range(n_frames): |
|
|
|
|
t = i * estimator.dt |
|
|
|
|
desired_cuvature = np.cos(t) / (vego ** 2) |
|
|
|
|
actual_yr = np.cos(t - lag_frames * estimator.dt) / vego |
|
|
|
|
desired_la = np.cos(t) |
|
|
|
|
actual_la = np.cos(t - lag_frames * estimator.dt) |
|
|
|
|
|
|
|
|
|
# if sample is masked out, set it to desired value (no lag) |
|
|
|
|
rejected = random.uniform(0, 1) < rejection_threshold |
|
|
|
|
if rejected: |
|
|
|
|
actual_la = desired_la |
|
|
|
|
|
|
|
|
|
desired_cuvature = desired_la / (vego ** 2) |
|
|
|
|
actual_yr = actual_la / vego |
|
|
|
|
msgs = [ |
|
|
|
|
(t, "carControl", mocker.Mock(latActive=True)), |
|
|
|
|
(t, "carControl", mocker.Mock(latActive=not rejected)), |
|
|
|
|
(t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)), |
|
|
|
|
(t, "controlsState", mocker.Mock(desiredCurvature=desired_cuvature, |
|
|
|
|
lateralControlState=mocker.Mock(which=mocker.Mock(return_value='debugControlState'), debugControlState=ZeroMock()))), |
|
|
|
@ -105,6 +113,13 @@ class TestLagd: |
|
|
|
|
assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01) |
|
|
|
|
assert msg.liveDelay.validBlocks == BLOCK_NUM_NEEDED |
|
|
|
|
|
|
|
|
|
def test_estimator_masking(self, mocker): |
|
|
|
|
mocked_CP, lag_frames = mocker.Mock(steerActuatorDelay=0.8), random.randint(1, 19) |
|
|
|
|
estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0, min_valid_block_count=1) |
|
|
|
|
process_messages(mocker, estimator, lag_frames, (int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_SIZE) * 2, rejection_threshold=0.4) |
|
|
|
|
msg = estimator.get_msg(True) |
|
|
|
|
assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01) |
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif(PC, reason="only on device") |
|
|
|
|
@pytest.mark.timeout(30) |
|
|
|
|
def test_estimator_performance(self, mocker): |
|
|
|
|