#!/usr/bin/env python3 import os import signal import numpy as np from collections import deque, defaultdict from functools import partial import cereal.messaging as messaging from cereal import car, log from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process, DT_MDL from openpilot.common.filter_simple import FirstOrderFilter from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY from openpilot.selfdrive.locationd.helpers import PointBuckets, ParameterEstimator, cache_points_onexit HISTORY = 5 # secs POINTS_PER_BUCKET = 1500 MIN_POINTS_TOTAL = 4000 MIN_POINTS_TOTAL_QLOG = 600 FIT_POINTS_TOTAL = 2000 FIT_POINTS_TOTAL_QLOG = 600 MIN_VEL = 15 # m/s FRICTION_FACTOR = 1.5 # ~85% of data coverage FACTOR_SANITY = 0.3 FACTOR_SANITY_QLOG = 0.5 FRICTION_SANITY = 0.5 FRICTION_SANITY_QLOG = 0.8 STEER_MIN_THRESHOLD = 0.02 MIN_FILTER_DECAY = 50 MAX_FILTER_DECAY = 250 LAT_ACC_THRESHOLD = 1 STEER_BUCKET_BOUNDS = [(-0.5, -0.3), (-0.3, -0.2), (-0.2, -0.1), (-0.1, 0), (0, 0.1), (0.1, 0.2), (0.2, 0.3), (0.3, 0.5)] MIN_BUCKET_POINTS = np.array([100, 300, 500, 500, 500, 500, 300, 100]) MIN_ENGAGE_BUFFER = 2 # secs VERSION = 1 # bump this to invalidate old parameter caches ALLOWED_CARS = ['toyota', 'hyundai'] def slope2rot(slope): sin = np.sqrt(slope**2 / (slope**2 + 1)) cos = np.sqrt(1 / (slope**2 + 1)) return np.array([[cos, -sin], [sin, cos]]) class TorqueBuckets(PointBuckets): def add_point(self, x, y): for bound_min, bound_max in self.x_bounds: if (x >= bound_min) and (x < bound_max): self.buckets[(bound_min, bound_max)].append([x, 1.0, y]) break class TorqueEstimator(ParameterEstimator): def __init__(self, CP, decimated=False): self.hist_len = int(HISTORY / DT_MDL) self.lag = CP.steerActuatorDelay + .2 # from controlsd if decimated: self.min_bucket_points = MIN_BUCKET_POINTS / 10 self.min_points_total = MIN_POINTS_TOTAL_QLOG self.fit_points = FIT_POINTS_TOTAL_QLOG self.factor_sanity = FACTOR_SANITY_QLOG self.friction_sanity = FRICTION_SANITY_QLOG else: self.min_bucket_points = MIN_BUCKET_POINTS self.min_points_total = MIN_POINTS_TOTAL self.fit_points = FIT_POINTS_TOTAL self.factor_sanity = FACTOR_SANITY self.friction_sanity = FRICTION_SANITY self.offline_friction = 0.0 self.offline_latAccelFactor = 0.0 self.resets = 0.0 self.use_params = CP.carName in ALLOWED_CARS and CP.lateralTuning.which() == 'torque' if CP.lateralTuning.which() == 'torque': self.offline_friction = CP.lateralTuning.torque.friction self.offline_latAccelFactor = CP.lateralTuning.torque.latAccelFactor self.reset() initial_params = { 'latAccelFactor': self.offline_latAccelFactor, 'latAccelOffset': 0.0, 'frictionCoefficient': self.offline_friction, 'points': [] } self.decay = MIN_FILTER_DECAY self.min_lataccel_factor = (1.0 - self.factor_sanity) * self.offline_latAccelFactor self.max_lataccel_factor = (1.0 + self.factor_sanity) * self.offline_latAccelFactor self.min_friction = (1.0 - self.friction_sanity) * self.offline_friction self.max_friction = (1.0 + self.friction_sanity) * self.offline_friction # try to restore cached params params = Params() params_cache = params.get("CarParamsPrevRoute") torque_cache = params.get("LiveTorqueParameters") if params_cache is not None and torque_cache is not None: try: with log.Event.from_bytes(torque_cache) as log_evt: cache_ltp = log_evt.liveTorqueParameters with car.CarParams.from_bytes(params_cache) as msg: cache_CP = msg if self.get_restore_key(cache_CP, cache_ltp.version) == self.get_restore_key(CP, VERSION): if cache_ltp.liveValid: initial_params = { 'latAccelFactor': cache_ltp.latAccelFactorFiltered, 'latAccelOffset': cache_ltp.latAccelOffsetFiltered, 'frictionCoefficient': cache_ltp.frictionCoefficientFiltered } initial_params['points'] = cache_ltp.points self.decay = cache_ltp.decay self.filtered_points.load_points(initial_params['points']) cloudlog.info("restored torque params from cache") except Exception: cloudlog.exception("failed to restore cached torque params") params.remove("LiveTorqueParameters") self.filtered_params = {} for param in initial_params: self.filtered_params[param] = FirstOrderFilter(initial_params[param], self.decay, DT_MDL) def get_restore_key(self, CP, version): a, b = None, None if CP.lateralTuning.which() == 'torque': a = CP.lateralTuning.torque.friction b = CP.lateralTuning.torque.latAccelFactor return (CP.carFingerprint, CP.lateralTuning.which(), a, b, version) def reset(self): self.resets += 1.0 self.decay = MIN_FILTER_DECAY self.raw_points = defaultdict(lambda: deque(maxlen=self.hist_len)) self.filtered_points = TorqueBuckets(x_bounds=STEER_BUCKET_BOUNDS, min_points=self.min_bucket_points, min_points_total=self.min_points_total, points_per_bucket=POINTS_PER_BUCKET, rowsize=3) def estimate_params(self): points = self.filtered_points.get_points(self.fit_points) # total least square solution as both x and y are noisy observations # this is empirically the slope of the hysteresis parallelogram as opposed to the line through the diagonals try: _, _, v = np.linalg.svd(points, full_matrices=False) slope, offset = -v.T[0:2, 2] / v.T[2, 2] _, spread = np.matmul(points[:, [0, 2]], slope2rot(slope)).T friction_coeff = np.std(spread) * FRICTION_FACTOR except np.linalg.LinAlgError as e: cloudlog.exception(f"Error computing live torque params: {e}") slope = offset = friction_coeff = np.nan return slope, offset, friction_coeff def update_params(self, params): self.decay = min(self.decay + DT_MDL, MAX_FILTER_DECAY) for param, value in params.items(): self.filtered_params[param].update(value) self.filtered_params[param].update_alpha(self.decay) def handle_log(self, t, which, msg): if which == "carControl": self.raw_points["carControl_t"].append(t + self.lag) self.raw_points["steer_torque"].append(-msg.actuatorsOutput.steer) self.raw_points["active"].append(msg.latActive) elif which == "carState": self.raw_points["carState_t"].append(t + self.lag) self.raw_points["vego"].append(msg.vEgo) self.raw_points["steer_override"].append(msg.steeringPressed) elif which == "liveLocationKalman": if len(self.raw_points['steer_torque']) == self.hist_len: yaw_rate = msg.angularVelocityCalibrated.value[2] roll = msg.orientationNED.value[0] active = np.interp(np.arange(t - MIN_ENGAGE_BUFFER, t, DT_MDL), self.raw_points['carControl_t'], self.raw_points['active']).astype(bool) steer_override = np.interp(np.arange(t - MIN_ENGAGE_BUFFER, t, DT_MDL), self.raw_points['carState_t'], self.raw_points['steer_override']).astype(bool) vego = np.interp(t, self.raw_points['carState_t'], self.raw_points['vego']) steer = np.interp(t, self.raw_points['carControl_t'], self.raw_points['steer_torque']) lateral_acc = (vego * yaw_rate) - (np.sin(roll) * ACCELERATION_DUE_TO_GRAVITY) if all(active) and (not any(steer_override)) and (vego > MIN_VEL) and (abs(steer) > STEER_MIN_THRESHOLD) and (abs(lateral_acc) <= LAT_ACC_THRESHOLD): self.filtered_points.add_point(float(steer), float(lateral_acc)) def get_msg(self, valid=True, with_points=False): msg = messaging.new_message('liveTorqueParameters') msg.valid = valid liveTorqueParameters = msg.liveTorqueParameters liveTorqueParameters.version = VERSION liveTorqueParameters.useParams = self.use_params if self.filtered_points.is_valid(): latAccelFactor, latAccelOffset, frictionCoeff = self.estimate_params() liveTorqueParameters.latAccelFactorRaw = float(latAccelFactor) liveTorqueParameters.latAccelOffsetRaw = float(latAccelOffset) liveTorqueParameters.frictionCoefficientRaw = float(frictionCoeff) if any(val is None or np.isnan(val) for val in [latAccelFactor, latAccelOffset, frictionCoeff]): cloudlog.exception("Live torque parameters are invalid.") liveTorqueParameters.liveValid = False self.reset() else: liveTorqueParameters.liveValid = True latAccelFactor = np.clip(latAccelFactor, self.min_lataccel_factor, self.max_lataccel_factor) frictionCoeff = np.clip(frictionCoeff, self.min_friction, self.max_friction) self.update_params({'latAccelFactor': latAccelFactor, 'latAccelOffset': latAccelOffset, 'frictionCoefficient': frictionCoeff}) else: liveTorqueParameters.liveValid = False if with_points: liveTorqueParameters.points = self.filtered_points.get_points()[:, [0, 2]].tolist() liveTorqueParameters.latAccelFactorFiltered = float(self.filtered_params['latAccelFactor'].x) liveTorqueParameters.latAccelOffsetFiltered = float(self.filtered_params['latAccelOffset'].x) liveTorqueParameters.frictionCoefficientFiltered = float(self.filtered_params['frictionCoefficient'].x) liveTorqueParameters.totalBucketPoints = len(self.filtered_points) liveTorqueParameters.decay = self.decay liveTorqueParameters.maxResets = self.resets return msg def main(): config_realtime_process([0, 1, 2, 3], 5) pm = messaging.PubMaster(['liveTorqueParameters']) sm = messaging.SubMaster(['carControl', 'carState', 'liveLocationKalman'], poll=['liveLocationKalman']) params = Params() with car.CarParams.from_bytes(params.get("CarParams", block=True)) as CP: estimator = TorqueEstimator(CP) if "REPLAY" not in os.environ: signal.signal(signal.SIGINT, partial(cache_points_onexit, "LiveTorqueParameters", estimator)) while True: sm.update() if sm.all_checks(): for which in sm.updated.keys(): if sm.updated[which]: t = sm.logMonoTime[which] * 1e-9 estimator.handle_log(t, which, sm[which]) # 4Hz driven by liveLocationKalman if sm.frame % 5 == 0: pm.send('liveTorqueParameters', estimator.get_msg(valid=sm.all_checks())) if __name__ == "__main__": main()