Proof of concept

pull/34531/head
Kacper Rączy 4 months ago
parent 72dcd59501
commit 176e539648
  1. 69
      selfdrive/locationd/torqued.py

@ -5,7 +5,7 @@ from collections import deque, defaultdict
import cereal.messaging as messaging
from cereal import car, log
from openpilot.common.params import Params
from openpilot.common.realtime import config_realtime_process, DT_MDL
from openpilot.common.realtime import config_realtime_process, DT_MDL, DT_CTRL
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.swaglog import cloudlog
from openpilot.selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY
@ -41,6 +41,73 @@ def slope2rot(slope):
return np.array([[cos, -sin], [sin, cos]])
MAX_SANE_LAG = 3.0
MIN_HIST_LEN_SEC = 20
MAX_HIST_LEN_SEC = 120
class LagEstimator(ParameterEstimator):
def __init__(self, CP):
self.hist_len = int(MIN_HIST_LEN_SEC / DT_CTRL)
self.min_hist_len = int(MIN_HIST_LEN_SEC / DT_CTRL)
self.initial_lag = CP.steerActuatorDelay
self.current_lag = self.initial_lag
self.lat_active = False
self.steering_pressed = False
self.points = defaultdict(lambda: deque(maxlen=self.hist_len))
def correlation_lags(self, sig_len, dt):
return np.arange(-sig_len + 1, 1) * dt
def actuator_delay(self, expected_sig, actual_sig, dt, max_lag=MAX_SANE_LAG):
assert len(expected_sig) == len(actual_sig)
correlations = np.correlate(expected_sig, actual_sig, mode='full')
lags = self.correlation_lags(len(expected_sig), dt)
# only consider negative time shifts within the max_lag
n_frames_max_delay = int(max_lag / dt)
correlations = correlations[len(expected_sig) - n_frames_max_delay:len(expected_sig)]
lags = lags[len(expected_sig) - n_frames_max_delay:len(expected_sig)]
max_corr_index = np.argmax(correlations)
lag, corr = lags[max_corr_index], correlations[max_corr_index]
return lag, corr
def handle_log(self, t, which, msg) -> None:
if which == "carControl":
self.lat_active = msg.carControl.latActive
elif which == "carState":
self.steering_pressed = msg.carState.steeringPressed
elif which == "controlsState":
curvature = msg.controlsState.curvature
desired_curvature = msg.controlsState.desiredCurvature
if self.lat_active and not self.steering_pressed:
self.points['curvature'].append((t, curvature))
self.points['desired_curvature'].append((t, desired_curvature))
def get_msg(self, valid: bool, with_points: bool):
steer_actuation_delay = self.initial_lag
if len(self.points['curvature']) >= self.min_hist_len:
_, curvature = zip(*self.points['curvature'])
_, desired_curvature = zip(*self.points['desired_curvature'])
delay_curvature, _ = self.actuator_delay(curvature, desired_curvature, DT_CTRL)
steer_actuation_delay = (self.current_lag + delay_curvature) / 2.
self.current_lag = steer_actuation_delay
msg = messaging.new_message('liveActuatorDelay')
msg.valid = valid
liveActuatorDelay = msg.liveActuatorDelay
liveActuatorDelay.steerActuatorDelay = steer_actuation_delay
liveActuatorDelay.totalPoints = len(self.points['curvature'])
return msg
class TorqueBuckets(PointBuckets):
def add_point(self, x, y):
for bound_min, bound_max in self.x_bounds:

Loading…
Cancel
Save