openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

138 lines
5.9 KiB

2 months ago
import random
import numpy as np
import time
import pytest
from cereal import messaging
2 months ago
from openpilot.selfdrive.locationd.lagd import LateralLagEstimator, retrieve_initial_lag, masked_normalized_cross_correlation, \
BLOCK_NUM_NEEDED, BLOCK_SIZE, MIN_OKAY_WINDOW_SEC
2 months ago
from openpilot.selfdrive.test.process_replay.migration import migrate, migrate_carParams
from openpilot.selfdrive.locationd.test.test_locationd_scenarios import TEST_ROUTE
from openpilot.common.params import Params
from openpilot.tools.lib.logreader import LogReader
from openpilot.system.hardware import PC
# Tolerance, in frames, allowed between the detected correlation peak and the
# injected lag once noise or masking is added (see test_ncc).
MAX_ERR_FRAMES = 1
# Time step, in seconds per frame, handed to LateralLagEstimator; expected
# delays in the tests are computed as lag_frames * DT.
DT = 0.05
2 months ago
2 months ago
def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejection_threshold=0.0):
  """Feed `n_frames` of synthetic, lagged lateral data into `estimator`.

  For every frame the desired lateral acceleration is cos(t) and the actual
  one is the same signal delayed by `lag_frames * estimator.dt`. The values
  are converted to a desired curvature (la / vego**2) and a yaw rate
  (la / vego), wrapped in mocked carControl / carState / controlsState /
  livePose messages and passed to `estimator.handle_log`, after which
  `update_points()` and `update_estimate()` are called once.

  Args:
    mocker: object exposing a `Mock` class (the pytest-mock fixture in the tests).
    estimator: object providing `dt`, `handle_log`, `update_points` and `update_estimate`.
    lag_frames: delay, in frames, applied to the actual signal.
    n_frames: number of frames to generate.
    vego: constant vehicle speed reported in carState.
    rejection_threshold: probability with which a frame is marked as rejected
      (latActive=False and no lag applied to that sample).
  """
  class ZeroMock(mocker.Mock):
    # Any attribute that was not set explicitly reads as 0.
    def __getattr__(self, *args):
      return 0

  for i in range(n_frames):
    t = i * estimator.dt
    desired_la = np.cos(t)
    actual_la = np.cos(t - lag_frames * estimator.dt)

    # if sample is masked out, set it to desired value (no lag)
    rejected = random.uniform(0, 1) < rejection_threshold
    if rejected:
      actual_la = desired_la

    desired_curvature = desired_la / (vego ** 2)
    actual_yr = actual_la / vego

    msgs = [
      (t, "carControl", mocker.Mock(latActive=not rejected)),
      (t, "carState", mocker.Mock(vEgo=vego, steeringPressed=False)),
      (t, "controlsState", mocker.Mock(desiredCurvature=desired_curvature,
                                       lateralControlState=mocker.Mock(which=mocker.Mock(return_value='debugControlState'),
                                                                       debugControlState=ZeroMock()))),
      (t, "livePose", mocker.Mock(orientationNED=ZeroMock(),
                                  velocityDevice=ZeroMock(),
                                  accelerationDevice=ZeroMock(),
                                  angularVelocityDevice=ZeroMock(z=actual_yr))),
    ]
    # Distinct loop names so the frame timestamp `t` is not rebound here.
    for log_t, which, msg in msgs:
      estimator.handle_log(log_t, which, msg)

    estimator.update_points()
    estimator.update_estimate()
2 months ago
class TestLagd:
  """Tests for the lateral lag estimator and the helpers imported from lagd."""

  def test_read_saved_params(self):
    """A LiveDelay message stored in Params is returned by retrieve_initial_lag."""
    params = Params()

    lr = migrate(LogReader(TEST_ROUTE), [migrate_carParams])
    CP = next(m for m in lr if m.which() == "carParams").carParams

    msg = messaging.new_message('liveDelay')
    msg.liveDelay.lateralDelayEstimate = random.random()
    msg.liveDelay.validBlocks = random.randint(1, 10)
    params.put("LiveDelay", msg.to_bytes())
    # The same CarParams are stored as the previous route's
    # (presumably so the saved estimate is treated as belonging to this car - confirm in lagd).
    params.put("CarParamsPrevRoute", CP.as_builder().to_bytes())

    saved_lag_params = retrieve_initial_lag(params, CP)
    assert saved_lag_params is not None

    lag, valid_blocks = saved_lag_params
    assert lag == msg.liveDelay.lateralDelayEstimate
    assert valid_blocks == msg.liveDelay.validBlocks

  def test_ncc(self):
    """The correlation peak tracks the injected lag: clean, noisy, and 40% masked."""
    lag_frames = random.randint(1, 19)

    desired_sig = np.sin(np.arange(0.0, 10.0, 0.1))
    actual_sig = np.sin(np.arange(0.0, 10.0, 0.1) - lag_frames * 0.1)
    mask = np.ones(len(desired_sig), dtype=bool)

    # Only this window of the correlation output is searched for the peak
    # (presumably index n - 1 corresponds to zero lag - confirm against lagd).
    n = len(desired_sig)
    window = slice(n - 1, n + 20)
    tolerated_lags = range(lag_frames - MAX_ERR_FRAMES, lag_frames + MAX_ERR_FRAMES + 1)

    corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[window]
    assert np.argmax(corr) == lag_frames

    # add some noise
    desired_sig += np.random.normal(0, 0.05, len(desired_sig))
    actual_sig += np.random.normal(0, 0.05, len(actual_sig))
    corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[window]
    assert np.argmax(corr) in tolerated_lags

    # mask out 40% of the values, and make them noise
    mask = np.random.choice([True, False], size=len(desired_sig), p=[0.6, 0.4])
    desired_sig[~mask] = np.random.normal(0, 1, size=np.sum(~mask))
    actual_sig[~mask] = np.random.normal(0, 1, size=np.sum(~mask))
    corr = masked_normalized_cross_correlation(desired_sig, actual_sig, mask, 200)[window]
    assert np.argmax(corr) in tolerated_lags

  def test_empty_estimator(self, mocker):
    """With no data fed in, the estimator reports 'unestimated' and its initial lag."""
    mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
    estimator = LateralLagEstimator(mocked_CP, DT)

    msg = estimator.get_msg(True)
    assert msg.liveDelay.status == 'unestimated'
    assert np.allclose(msg.liveDelay.lateralDelay, estimator.initial_lag)
    assert np.allclose(msg.liveDelay.lateralDelayEstimate, estimator.initial_lag)
    assert msg.liveDelay.validBlocks == 0

  def test_estimator_basics(self, mocker, subtests):
    """Lags of 0 to 4 frames are recovered within 0.01 once enough blocks are filled."""
    for lag_frames in range(5):
      with subtests.test(msg=f"lag_frames={lag_frames}"):
        mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
        estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0)
        process_messages(mocker, estimator, lag_frames, int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_NUM_NEEDED * BLOCK_SIZE)

        msg = estimator.get_msg(True)
        assert msg.liveDelay.status == 'estimated'
        assert np.allclose(msg.liveDelay.lateralDelay, lag_frames * DT, atol=0.01)
        assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01)
        assert msg.liveDelay.validBlocks == BLOCK_NUM_NEEDED

  def test_estimator_masking(self, mocker):
    """The lag estimate stays within 0.01 when 40% of the frames are rejected."""
    mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
    lag_frames = random.randint(1, 19)
    estimator = LateralLagEstimator(mocked_CP, DT, min_recovery_buffer_sec=0.0, min_yr=0.0, min_valid_block_count=1)
    process_messages(mocker, estimator, lag_frames, (int(MIN_OKAY_WINDOW_SEC / DT) + BLOCK_SIZE) * 2, rejection_threshold=0.4)

    msg = estimator.get_msg(True)
    assert np.allclose(msg.liveDelay.lateralDelayEstimate, lag_frames * DT, atol=0.01)

  @pytest.mark.skipif(PC, reason="only on device")
  @pytest.mark.timeout(60)
  def test_estimator_performance(self, mocker):
    """One update_points + update_estimate cycle takes, on average, less than DT."""
    mocked_CP = mocker.Mock(steerActuatorDelay=0.8)
    estimator = LateralLagEstimator(mocked_CP, DT)

    ds = []
    for _ in range(1000):
      st = time.perf_counter()
      estimator.update_points()
      estimator.update_estimate()
      ds.append(time.perf_counter() - st)

    assert np.mean(ds) < DT