openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

119 lines
4.7 KiB

4 weeks ago
import random
import numpy as np
import time
import pytest
from cereal import messaging
from openpilot.selfdrive.locationd.lagd import LateralLagEstimator, retrieve_initial_lag, masked_normalized_cross_correlation
from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_carParams
from openpilot.selfdrive.locationd.test.test_locationd_scenarios import TEST_ROUTE
from openpilot.common.params import Params
from openpilot.tools.lib.logreader import LogReader
from openpilot.system.hardware import PC
MAX_ERR_FRAMES = 1
4 weeks ago
def run_estimator_on_fake_data(estimator, dt, lag_frames, n_frames, mocker):
class ZeroMock(mocker.Mock):
def __getattr__(self, *args):
return 0
for i in range(n_frames):
t = i * dt
vego = 20.0
desired_cuvature = np.cos(t) / (vego ** 2)
actual_yr = np.cos(t - lag_frames * dt) / vego
msgs = [
(t, "carControl", mocker.Mock(latActive=True)),
(t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)),
(t, "controlsState", mocker.Mock(desiredCurvature=desired_cuvature,
lateralControlState=mocker.Mock(which=mocker.Mock(return_value='debugControlState'), debugControlState=ZeroMock()))),
(t, "livePose", mocker.Mock(orientationNED=ZeroMock(),
velocityDevice=ZeroMock(),
accelerationDevice=ZeroMock(),
angularVelocityDevice=ZeroMock(z=actual_yr))),
]
for t, w, m in msgs:
estimator.handle_log(t, w, m)
estimator.update_points()
estimator.update_estimate()
4 weeks ago
class TestLagd:
def test_read_saved_params(self):
params = Params()
lr = migrate(LogReader(TEST_ROUTE), [migrate_carParams])
CP = next(m for m in lr if m.which() == "carParams").carParams
msg = messaging.new_message('liveDelay')
msg.liveDelay.lateralDelayEstimate = random.random()
msg.liveDelay.validBlocks = random.randint(1, 10)
params.put("LiveLag", msg.to_bytes())
params.put("CarParamsPrevRoute", CP.as_builder().to_bytes())
saved_lag_params = retrieve_initial_lag(params, CP)
assert saved_lag_params is not None
lag, valid_blocks = saved_lag_params
assert lag == msg.liveDelay.lateralDelayEstimate
assert valid_blocks == msg.liveDelay.validBlocks
def test_ncc(self):
4 weeks ago
lag_frames = random.randint(1, 19)
4 weeks ago
desired_sig = np.sin(np.arange(0.0, 10.0, 0.1))
actual_sig = np.sin(np.arange(0.0, 10.0, 0.1) - lag_frames * 0.1)
mask = np.ones(len(desired_sig), dtype=bool)
corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20]
assert np.argmax(corr) == lag_frames
# add some noise
desired_sig += np.random.normal(0, 0.05, len(desired_sig))
actual_sig += np.random.normal(0, 0.05, len(actual_sig))
corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[len(desired_sig) - 1:len(desired_sig) + 20]
assert np.argmax(corr) in range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1)
4 weeks ago
def test_empty_estimator(self, mocker):
mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
estimator = LateralLagEstimator(mocked_CP, 0.05)
msg = estimator.get_msg(True)
assert msg.liveDelay.status == 'unestimated'
assert np.allclose(msg.liveDelay.lateralDelay, 1.0)
4 weeks ago
4 weeks ago
def test_estimator(self, mocker):
dt, iters = 0.05, 100
lag_frames = random.randint(1, 19)
4 weeks ago
4 weeks ago
mocked_CP = mocker.Mock(steerActuatorDelay=1.0)
4 weeks ago
estimator = LateralLagEstimator(mocked_CP, dt,
4 weeks ago
block_count=10, min_valid_block_count=0,
4 weeks ago
block_size=1, okay_window_sec=iters * dt,
4 weeks ago
min_recovery_buffer_sec=0, min_yr=0)
4 weeks ago
run_estimator_on_fake_data(estimator, dt, lag_frames, iters, mocker)
4 weeks ago
# expect one block filled, with lateralDelayEstimate equal to lateralDelay equal to lag_frames
output = estimator.get_msg(True)
assert output.liveDelay.status == 'estimated'
4 weeks ago
assert np.allclose(output.liveDelay.lateralDelay, lag_frames * dt, atol=0.01)
assert np.allclose(output.liveDelay.lateralDelayEstimate, output.liveDelay.lateralDelay, atol=0.01)
4 weeks ago
assert output.liveDelay.validBlocks == 1
@pytest.mark.skipif(PC, reason="only on device")
4 weeks ago
@pytest.mark.timeout(30)
4 weeks ago
def test_estimator_performance(self, mocker):
mocked_CP = mocker.Mock(steerActuatorDelay=0.1)
4 weeks ago
estimator = LateralLagEstimator(mocked_CP, 0.05)
ds = []
for _ in range(1000):
4 weeks ago
st = time.perf_counter()
estimator.update_points()
estimator.update_estimate()
d = time.perf_counter() - st
ds.append(d)
assert np.mean(ds) < 0.05