Sliding cross corr

pull/34531/head
Kacper Rączy 7 months ago
parent 9adf8740a5
commit 5db10e5acf
  1. 42
      selfdrive/locationd/torqued.py

@ -41,25 +41,29 @@ def slope2rot(slope):
return np.array([[cos, -sin], [sin, cos]])
MIN_LAG_VEL = 15.0
MAX_SANE_LAG = 3.0
MIN_HIST_LEN_SEC = 20
MIN_HIST_LEN_SEC = 30
MAX_HIST_LEN_SEC = 120
MOVING_AVERAGE_WINDOW = 20
MIN_LAG_VEL = 15.0
MAX_LAG_HIST_LEN_SEC = 300
MOVING_CORR_WINDOW = 30
OVERLAP_FACTOR = 0.25
class LagEstimator(ParameterEstimator):
def __init__(self, CP, dt):
self.dt = dt
self.hist_len = int(MAX_HIST_LEN_SEC / self.dt)
self.min_hist_len = int(MIN_HIST_LEN_SEC / self.dt)
self.window_len = int(MOVING_CORR_WINDOW / self.dt)
self.initial_lag = CP.steerActuatorDelay
self.current_lag = self.initial_lag
self.lat_active = False
self.steering_pressed = False
self.v_ego = 0.0
self.lags = deque(maxlen=int(MOVING_AVERAGE_WINDOW / 0.25))
self.points = defaultdict(lambda: deque(maxlen=self.hist_len))
self.lags = deque(maxlen= int(MAX_LAG_HIST_LEN_SEC / (MOVING_CORR_WINDOW * OVERLAP_FACTOR)))
self.curvature = deque(maxlen=int(MAX_HIST_LEN_SEC / self.dt))
self.desired_curvature = deque(maxlen=int(MAX_HIST_LEN_SEC / self.dt))
self.frame = 0
def correlation_lags(self, sig_len, dt):
return np.arange(0, sig_len) * dt
@ -88,31 +92,33 @@ class LagEstimator(ParameterEstimator):
elif which == "controlsState":
curvature = msg.curvature
desired_curvature = msg.desiredCurvature
if self.lat_active and not self.steering_pressed and self.v_ego > MIN_LAG_VEL:
self.points['curvature'].append((t, curvature))
self.points['desired_curvature'].append((t, desired_curvature))
if self.lat_active and not self.steering_pressed:
self.curvature.append((t, curvature))
self.desired_curvature.append((t, desired_curvature))
self.frame += 1
def get_msg(self, valid: bool, with_points: bool):
if len(self.points['curvature']) >= self.min_hist_len and self.v_ego > MIN_LAG_VEL:
_, curvature = zip(*self.points['curvature'])
_, desired_curvature = zip(*self.points['desired_curvature'])
delay_curvature, _ = self.actuator_delay(curvature, desired_curvature, self.dt)
self.lags.append(delay_curvature)
if len(self.curvature) >= self.min_hist_len:
if self.frame % int(self.window_len * OVERLAP_FACTOR) == 0:
_, curvature = zip(*self.curvature)
_, desired_curvature = zip(*self.desired_curvature)
delay_curvature, _ = self.actuator_delay(curvature[-self.window_len:], desired_curvature[-self.window_len:], self.dt)
if delay_curvature != 0.0:
self.lags.append(delay_curvature)
steer_actuation_delay = float(np.mean(self.lags))
else:
steer_actuation_delay = self.initial_lag
self.lags.append(self.initial_lag)
msg = messaging.new_message('liveActuatorDelay')
msg.valid = valid
liveActuatorDelay = msg.liveActuatorDelay
liveActuatorDelay.steerActuatorDelay = steer_actuation_delay
liveActuatorDelay.totalPoints = len(self.points['curvature'])
liveActuatorDelay.totalPoints = len(self.curvature)
if with_points:
liveActuatorDelay.points = [[c, dc] for ((_, c), (_, dc)) in zip(self.points['curvature'], self.points['desired_curvature'])]
liveActuatorDelay.points = [[c, dc] for ((_, c), (_, dc)) in zip(self.curvature, self.desired_curvature)]
return msg

Loading…
Cancel
Save