@ -3,7 +3,7 @@ import numpy as np
import time
import pytest
from cereal import messaging , log
from cereal import messaging , log , car
from openpilot . selfdrive . locationd . lagd import LateralLagEstimator , retrieve_initial_lag , masked_normalized_cross_correlation , \
BLOCK_NUM_NEEDED , BLOCK_SIZE , MIN_OKAY_WINDOW_SEC
from openpilot . selfdrive . test . process_replay . migration import migrate , migrate_carParams
@ -16,11 +16,7 @@ MAX_ERR_FRAMES = 1
DT = 0.05
def process_messages ( mocker , estimator , lag_frames , n_frames , vego = 20.0 , rejection_threshold = 0.0 ) :
class ZeroMock ( mocker . Mock ) :
def __getattr__ ( self , * args ) :
return 0
def process_messages ( estimator , lag_frames , n_frames , vego = 20.0 , rejection_threshold = 0.0 ) :
for i in range ( n_frames ) :
t = i * estimator . dt
desired_la = np . cos ( 10 * t ) * 0.1
@ -31,19 +27,15 @@ def process_messages(mocker, estimator, lag_frames, n_frames, vego=20.0, rejecti
if rejected :
actual_la = desired_la
desired_cuvature = desired_la / ( vego * * 2 )
actual_yr = actual_la / vego
desired_cuvature = float ( desired_la / ( vego * * 2 ) )
actual_yr = float ( actual_la / vego )
msgs = [
( t , " carControl " , mocker . Mock ( latActive = not rejected ) ) ,
( t , " carState " , mocker . Mock ( vEgo = vego , steeringPressed = False ) ) ,
( t , " controlsState " , mocker . Mock ( desiredCurvature = desired_cuvature ,
lateralControlState = mocker . Mock ( which = mocker . Mock ( return_value = ' debugControlState ' ) , debugControlState = ZeroMock ( ) ) ) ) ,
( t , " livePose " , mocker . Mock ( orientationNED = ZeroMock ( ) ,
velocityDevice = ZeroMock ( ) ,
accelerationDevice = ZeroMock ( ) ,
angularVelocityDevice = ZeroMock ( z = actual_yr , valid = True ) ,
posenetOK = True , inputsOK = True ) ) ,
( t , " liveCalibration " , mocker . Mock ( rpyCalib = [ 0 , 0 , 0 ] , calStatus = log . LiveCalibrationData . Status . calibrated ) ) ,
( t , " carControl " , car . CarControl ( latActive = not rejected ) ) ,
( t , " carState " , car . CarState ( vEgo = vego , steeringPressed = False ) ) ,
( t , " controlsState " , log . ControlsState ( desiredCurvature = desired_cuvature ) ) ,
( t , " livePose " , log . LivePose ( angularVelocityDevice = log . LivePose . XYZMeasurement ( z = actual_yr , valid = True ) ,
posenetOK = True , inputsOK = True ) ) ,
( t , " liveCalibration " , log . LiveCalibrationData ( rpyCalib = [ 0 , 0 , 0 ] , calStatus = log . LiveCalibrationData . Status . calibrated ) ) ,
]
for t , w , m in msgs :
estimator . handle_log ( t , w , m )
@ -94,8 +86,8 @@ class TestLagd:
corr = masked_normalized_cross_correlation ( desired_sig , actual_sig , mask , 200 ) [ len ( desired_sig ) - 1 : len ( desired_sig ) + 20 ]
assert np . argmax ( corr ) in range ( lag_frames - MAX_ERR_FRAMES , lag_frames + MAX_ERR_FRAMES + 1 )
def test_empty_estimator ( self , mocker ) :
mocked_CP = mocker . Mock ( steerActuatorDelay = 0.8 )
def test_empty_estimator ( self ) :
mocked_CP = car . CarParams ( steerActuatorDelay = 0.8 )
estimator = LateralLagEstimator ( mocked_CP , DT )
msg = estimator . get_msg ( True )
assert msg . liveDelay . status == ' unestimated '
@ -103,12 +95,12 @@ class TestLagd:
assert np . allclose ( msg . liveDelay . lateralDelayEstimate , estimator . initial_lag )
assert msg . liveDelay . validBlocks == 0
def test_estimator_basics ( self , mocker , subtests ) :
def test_estimator_basics ( self , subtests ) :
for lag_frames in range ( 5 ) :
with subtests . test ( msg = f " lag_frames= { lag_frames } " ) :
mocked_CP = mocker . Mock ( steerActuatorDelay = 0.8 )
mocked_CP = car . CarParams ( steerActuatorDelay = 0.8 )
estimator = LateralLagEstimator ( mocked_CP , DT , min_recovery_buffer_sec = 0.0 , min_yr = 0.0 )
process_messages ( mocker , estimator , lag_frames , int ( MIN_OKAY_WINDOW_SEC / DT ) + BLOCK_NUM_NEEDED * BLOCK_SIZE )
process_messages ( estimator , lag_frames , int ( MIN_OKAY_WINDOW_SEC / DT ) + BLOCK_NUM_NEEDED * BLOCK_SIZE )
msg = estimator . get_msg ( True )
assert msg . liveDelay . status == ' estimated '
assert np . allclose ( msg . liveDelay . lateralDelay , lag_frames * DT , atol = 0.01 )
@ -116,18 +108,18 @@ class TestLagd:
assert np . allclose ( msg . liveDelay . lateralDelayEstimateStd , 0.0 , atol = 0.01 )
assert msg . liveDelay . validBlocks == BLOCK_NUM_NEEDED
def test_estimator_masking ( self , mocker ) :
mocked_CP , lag_frames = mocker . Mock ( steerActuatorDelay = 0.8 ) , random . randint ( 1 , 19 )
def test_estimator_masking ( self ) :
mocked_CP , lag_frames = car . CarParams ( steerActuatorDelay = 0.8 ) , random . randint ( 1 , 19 )
estimator = LateralLagEstimator ( mocked_CP , DT , min_recovery_buffer_sec = 0.0 , min_yr = 0.0 , min_valid_block_count = 1 )
process_messages ( mocker , estimator , lag_frames , ( int ( MIN_OKAY_WINDOW_SEC / DT ) + BLOCK_SIZE ) * 2 , rejection_threshold = 0.4 )
process_messages ( estimator , lag_frames , ( int ( MIN_OKAY_WINDOW_SEC / DT ) + BLOCK_SIZE ) * 2 , rejection_threshold = 0.4 )
msg = estimator . get_msg ( True )
assert np . allclose ( msg . liveDelay . lateralDelayEstimate , lag_frames * DT , atol = 0.01 )
assert np . allclose ( msg . liveDelay . lateralDelayEstimateStd , 0.0 , atol = 0.01 )
@pytest . mark . skipif ( PC , reason = " only on device " )
@pytest . mark . timeout ( 60 )
def test_estimator_performance ( self , mocker ) :
mocked_CP = mocker . Mock ( steerActuatorDelay = 0.8 )
def test_estimator_performance ( self ) :
mocked_CP = car . CarParams ( steerActuatorDelay = 0.8 )
estimator = LateralLagEstimator ( mocked_CP , DT )
ds = [ ]