#!/usr/bin/env python3
import numpy as np
from collections import deque
import cereal . messaging as messaging
from cereal import car
from openpilot . common . params import Params
from openpilot . common . realtime import config_realtime_process , DT_CTRL
from openpilot . selfdrive . locationd . helpers import ParameterEstimator
MIN_LAG_VEL = 15.0
MAX_SANE_LAG = 3.0
MIN_HIST_LEN_SEC = 60
MAX_LAG_HIST_LEN_SEC = 300
MOVING_CORR_WINDOW = 60
OVERLAP_FACTOR = 0.25
class LagEstimator ( ParameterEstimator ) :
def __init__ ( self , CP , dt , min_hist_len_sec , max_lag_hist_len_sec , moving_corr_window , overlap_factor ) :
self . dt = dt
self . min_hist_len = int ( min_hist_len_sec / self . dt )
self . window_len = int ( moving_corr_window / self . dt )
self . initial_lag = CP . steerActuatorDelay
self . current_lag = self . initial_lag
self . lat_active = False
self . steering_pressed = False
self . v_ego = 0.0
self . lags = deque ( maxlen = int ( max_lag_hist_len_sec / ( moving_corr_window * overlap_factor ) ) )
self . curvature = deque ( maxlen = int ( moving_corr_window / self . dt ) )
self . desired_curvature = deque ( maxlen = int ( moving_corr_window / self . dt ) )
self . frame = 0
def correlation_lags ( self , sig_len , dt ) :
return np . arange ( 0 , sig_len ) * dt
def actuator_delay ( self , expected_sig , actual_sig , dt , max_lag = MAX_SANE_LAG ) :
assert len ( expected_sig ) == len ( actual_sig )
correlations = np . correlate ( expected_sig , actual_sig , mode = ' full ' )
lags = self . correlation_lags ( len ( expected_sig ) , dt )
# only consider negative time shifts within the max_lag
n_frames_max_delay = int ( max_lag / dt )
correlations = correlations [ len ( expected_sig ) - 1 : len ( expected_sig ) - 1 + n_frames_max_delay ]
lags = lags [ : n_frames_max_delay ]
max_corr_index = np . argmax ( correlations )
lag , corr = lags [ max_corr_index ] , correlations [ max_corr_index ]
return lag , corr
def handle_log ( self , t , which , msg ) - > None :
if which == " carControl " :
self . lat_active = msg . latActive
elif which == " carState " :
self . steering_pressed = msg . steeringPressed
self . v_ego = msg . vEgo
elif which == " controlsState " :
curvature = msg . curvature
desired_curvature = msg . desiredCurvature
if self . lat_active and not self . steering_pressed and self . v_ego > MIN_LAG_VEL :
self . curvature . append ( ( t , curvature ) )
self . desired_curvature . append ( ( t , desired_curvature ) )
self . frame + = 1
def get_msg ( self , valid : bool , with_points : bool ) :
if len ( self . curvature ) > = self . min_hist_len :
if self . frame % int ( self . window_len * OVERLAP_FACTOR ) == 0 :
_ , curvature = zip ( * self . curvature )
_ , desired_curvature = zip ( * self . desired_curvature )
delay_curvature , _ = self . actuator_delay ( curvature , desired_curvature , self . dt )
if delay_curvature != 0.0 :
self . lags . append ( delay_curvature )
# FIXME: this is fragile and ugly, refactor this
if len ( self . lags ) > 0 :
steer_actuation_delay = float ( np . mean ( self . lags ) )
else :
steer_actuation_delay = self . initial_lag
else :
steer_actuation_delay = self . initial_lag
msg = messaging . new_message ( ' liveActuatorDelay ' )
msg . valid = valid
liveActuatorDelay = msg . liveActuatorDelay
liveActuatorDelay . steerActuatorDelay = steer_actuation_delay
liveActuatorDelay . totalPoints = len ( self . curvature )
if with_points :
liveActuatorDelay . points = [ [ c , dc ] for ( ( _ , c ) , ( _ , dc ) ) in zip ( self . curvature , self . desired_curvature ) ]
return msg
def main ( ) :
config_realtime_process ( [ 0 , 1 , 2 , 3 ] , 5 )
pm = messaging . PubMaster ( [ ' liveActuatorDelay ' , ' alertDebug ' ] )
sm = messaging . SubMaster ( [ ' carControl ' , ' carState ' , ' controlsState ' ] , poll = ' controlsState ' )
params = Params ( )
CP = messaging . log_from_bytes ( params . get ( " CarParams " , block = True ) , car . CarParams )
estimator = LagEstimator ( CP , DT_CTRL , MIN_HIST_LEN_SEC , MAX_LAG_HIST_LEN_SEC , MOVING_CORR_WINDOW , OVERLAP_FACTOR )
while True :
sm . update ( )
if sm . all_checks ( ) :
for which in sm . updated . keys ( ) :
if sm . updated [ which ] :
t = sm . logMonoTime [ which ] * 1e-9
estimator . handle_log ( t , which , sm [ which ] )
if sm . frame % 25 == 0 :
msg = estimator . get_msg ( sm . all_checks ( ) , with_points = True )
alert_msg = messaging . new_message ( ' alertDebug ' )
alert_msg . alertDebug . alertText1 = f " Lag estimate (fixed: { CP . steerActuatorDelay : .2f } s) "
alert_msg . alertDebug . alertText2 = f " { msg . liveActuatorDelay . steerActuatorDelay : .2f } s "
pm . send ( ' liveActuatorDelay ' , msg )
pm . send ( ' alertDebug ' , alert_msg )
if __name__ == " __main__ " :
main ( )