#!/usr/bin/env python3 import os import sys import signal import numpy as np from collections import deque, defaultdict import cereal.messaging as messaging from cereal import car, log from common.params import Params from common.realtime import config_realtime_process, DT_MDL from common.filter_simple import FirstOrderFilter from system.swaglog import cloudlog from selfdrive.controls.lib.vehicle_model import ACCELERATION_DUE_TO_GRAVITY HISTORY = 5 # secs POINTS_PER_BUCKET = 1500 MIN_POINTS_TOTAL = 4000 MIN_POINTS_TOTAL_QLOG = 600 FIT_POINTS_TOTAL = 2000 FIT_POINTS_TOTAL_QLOG = 600 MIN_VEL = 15 # m/s FRICTION_FACTOR = 1.5 # ~85% of data coverage FACTOR_SANITY = 0.3 FRICTION_SANITY = 0.5 STEER_MIN_THRESHOLD = 0.02 MIN_FILTER_DECAY = 50 MAX_FILTER_DECAY = 250 LAT_ACC_THRESHOLD = 1 STEER_BUCKET_BOUNDS = [(-0.5, -0.3), (-0.3, -0.2), (-0.2, -0.1), (-0.1, 0), (0, 0.1), (0.1, 0.2), (0.2, 0.3), (0.3, 0.5)] MIN_BUCKET_POINTS = np.array([100, 300, 500, 500, 500, 500, 300, 100]) MAX_RESETS = 5.0 MAX_INVALID_THRESHOLD = 10 MIN_ENGAGE_BUFFER = 2 # secs VERSION = 1 # bump this to invalidate old parameter caches ALLOWED_CARS = ['toyota', 'hyundai'] def slope2rot(slope): sin = np.sqrt(slope**2 / (slope**2 + 1)) cos = np.sqrt(1 / (slope**2 + 1)) return np.array([[cos, -sin], [sin, cos]]) class NPQueue: def __init__(self, maxlen, rowsize): self.maxlen = maxlen self.arr = np.empty((0, rowsize)) def __len__(self): return len(self.arr) def append(self, pt): if len(self.arr) < self.maxlen: self.arr = np.append(self.arr, [pt], axis=0) else: self.arr[:-1] = self.arr[1:] self.arr[-1] = pt class PointBuckets: def __init__(self, x_bounds, min_points, min_points_total): self.x_bounds = x_bounds self.buckets = {bounds: NPQueue(maxlen=POINTS_PER_BUCKET, rowsize=3) for bounds in x_bounds} self.buckets_min_points = {bounds: min_point for bounds, min_point in zip(x_bounds, min_points)} self.min_points_total = min_points_total def bucket_lengths(self): return [len(v) for v in self.buckets.values()] def __len__(self): return sum(self.bucket_lengths()) def is_valid(self): return all(len(v) >= min_pts for v, min_pts in zip(self.buckets.values(), self.buckets_min_points.values())) and (self.__len__() >= self.min_points_total) def add_point(self, x, y): for bound_min, bound_max in self.x_bounds: if (x >= bound_min) and (x < bound_max): self.buckets[(bound_min, bound_max)].append([x, 1.0, y]) break def get_points(self, num_points=None): points = np.vstack([x.arr for x in self.buckets.values()]) if num_points is None: return points return points[np.random.choice(np.arange(len(points)), min(len(points), num_points), replace=False)] def load_points(self, points): for x, y in points: self.add_point(x, y) class TorqueEstimator: def __init__(self, CP, decimated=False): self.hist_len = int(HISTORY / DT_MDL) self.lag = CP.steerActuatorDelay + .2 # from controlsd if decimated: self.min_bucket_points = MIN_BUCKET_POINTS / 10 self.min_points_total = MIN_POINTS_TOTAL_QLOG self.fit_points = FIT_POINTS_TOTAL_QLOG else: self.min_bucket_points = MIN_BUCKET_POINTS self.min_points_total = MIN_POINTS_TOTAL self.fit_points = FIT_POINTS_TOTAL self.offline_friction = 0.0 self.offline_latAccelFactor = 0.0 self.resets = 0.0 self.use_params = CP.carName in ALLOWED_CARS if CP.lateralTuning.which() == 'torque': self.offline_friction = CP.lateralTuning.torque.friction self.offline_latAccelFactor = CP.lateralTuning.torque.latAccelFactor self.reset() initial_params = { 'latAccelFactor': self.offline_latAccelFactor, 'latAccelOffset': 0.0, 'frictionCoefficient': self.offline_friction, 'points': [] } self.decay = MIN_FILTER_DECAY self.min_lataccel_factor = (1.0 - FACTOR_SANITY) * self.offline_latAccelFactor self.max_lataccel_factor = (1.0 + FACTOR_SANITY) * self.offline_latAccelFactor self.min_friction = (1.0 - FRICTION_SANITY) * self.offline_friction self.max_friction = (1.0 + FRICTION_SANITY) * self.offline_friction # try to restore cached params params = Params() params_cache = params.get("LiveTorqueCarParams") torque_cache = params.get("LiveTorqueParameters") if params_cache is not None and torque_cache is not None: try: cache_ltp = log.Event.from_bytes(torque_cache).liveTorqueParameters cache_CP = car.CarParams.from_bytes(params_cache) if self.get_restore_key(cache_CP, cache_ltp.version) == self.get_restore_key(CP, VERSION): if cache_ltp.liveValid: initial_params = { 'latAccelFactor': cache_ltp.latAccelFactorFiltered, 'latAccelOffset': cache_ltp.latAccelOffsetFiltered, 'frictionCoefficient': cache_ltp.frictionCoefficientFiltered } initial_params['points'] = cache_ltp.points self.decay = cache_ltp.decay self.filtered_points.load_points(initial_params['points']) cloudlog.info("restored torque params from cache") except Exception: cloudlog.exception("failed to restore cached torque params") params.remove("LiveTorqueCarParams") params.remove("LiveTorqueParameters") self.filtered_params = {} for param in initial_params: self.filtered_params[param] = FirstOrderFilter(initial_params[param], self.decay, DT_MDL) def get_restore_key(self, CP, version): a, b = None, None if CP.lateralTuning.which() == 'torque': a = CP.lateralTuning.torque.friction b = CP.lateralTuning.torque.latAccelFactor return (CP.carFingerprint, CP.lateralTuning.which(), a, b, version) def reset(self): self.resets += 1.0 self.invalid_values_tracker = 0.0 self.decay = MIN_FILTER_DECAY self.raw_points = defaultdict(lambda: deque(maxlen=self.hist_len)) self.filtered_points = PointBuckets(x_bounds=STEER_BUCKET_BOUNDS, min_points=self.min_bucket_points, min_points_total=self.min_points_total) def estimate_params(self): points = self.filtered_points.get_points(self.fit_points) # total least square solution as both x and y are noisy observations # this is empirically the slope of the hysteresis parallelogram as opposed to the line through the diagonals try: _, _, v = np.linalg.svd(points, full_matrices=False) slope, offset = -v.T[0:2, 2] / v.T[2, 2] _, spread = np.matmul(points[:, [0, 2]], slope2rot(slope)).T friction_coeff = np.std(spread) * FRICTION_FACTOR except np.linalg.LinAlgError as e: cloudlog.exception(f"Error computing live torque params: {e}") slope = offset = friction_coeff = np.nan return slope, offset, friction_coeff def update_params(self, params): self.decay = min(self.decay + DT_MDL, MAX_FILTER_DECAY) for param, value in params.items(): self.filtered_params[param].update(value) self.filtered_params[param].update_alpha(self.decay) def is_sane(self, latAccelFactor, latAccelOffset, friction): if any([val is None or np.isnan(val) for val in [latAccelFactor, latAccelOffset, friction]]): return False return (self.max_friction >= friction >= self.min_friction) and\ (self.max_lataccel_factor >= latAccelFactor >= self.min_lataccel_factor) def handle_log(self, t, which, msg): if which == "carControl": self.raw_points["carControl_t"].append(t + self.lag) self.raw_points["steer_torque"].append(-msg.actuatorsOutput.steer) self.raw_points["active"].append(msg.latActive) elif which == "carState": self.raw_points["carState_t"].append(t + self.lag) self.raw_points["vego"].append(msg.vEgo) self.raw_points["steer_override"].append(msg.steeringPressed) elif which == "liveLocationKalman": if len(self.raw_points['steer_torque']) == self.hist_len: yaw_rate = msg.angularVelocityCalibrated.value[2] roll = msg.orientationNED.value[0] active = np.interp(np.arange(t - MIN_ENGAGE_BUFFER, t, DT_MDL), self.raw_points['carControl_t'], self.raw_points['active']).astype(bool) steer_override = np.interp(np.arange(t - MIN_ENGAGE_BUFFER, t, DT_MDL), self.raw_points['carState_t'], self.raw_points['steer_override']).astype(bool) vego = np.interp(t, self.raw_points['carState_t'], self.raw_points['vego']) steer = np.interp(t, self.raw_points['carControl_t'], self.raw_points['steer_torque']) lateral_acc = (vego * yaw_rate) - (np.sin(roll) * ACCELERATION_DUE_TO_GRAVITY) if all(active) and (not any(steer_override)) and (vego > MIN_VEL) and (abs(steer) > STEER_MIN_THRESHOLD) and (abs(lateral_acc) <= LAT_ACC_THRESHOLD): self.filtered_points.add_point(float(steer), float(lateral_acc)) def get_msg(self, valid=True, with_points=False): msg = messaging.new_message('liveTorqueParameters') msg.valid = valid liveTorqueParameters = msg.liveTorqueParameters liveTorqueParameters.version = VERSION liveTorqueParameters.useParams = self.use_params if self.filtered_points.is_valid(): latAccelFactor, latAccelOffset, friction_coeff = self.estimate_params() liveTorqueParameters.latAccelFactorRaw = float(latAccelFactor) liveTorqueParameters.latAccelOffsetRaw = float(latAccelOffset) liveTorqueParameters.frictionCoefficientRaw = float(friction_coeff) if self.is_sane(latAccelFactor, latAccelOffset, friction_coeff): liveTorqueParameters.liveValid = True self.update_params({'latAccelFactor': latAccelFactor, 'latAccelOffset': latAccelOffset, 'frictionCoefficient': friction_coeff}) self.invalid_values_tracker = max(0.0, self.invalid_values_tracker - 0.5) else: cloudlog.exception("Live torque parameters are outside acceptable bounds.") liveTorqueParameters.liveValid = False self.invalid_values_tracker += 1.0 # Reset when ~10 invalid over 5 secs if self.invalid_values_tracker > MAX_INVALID_THRESHOLD: # Do not reset the filter as it may cause a drastic jump, just reset points self.reset() else: liveTorqueParameters.liveValid = False if with_points: liveTorqueParameters.points = self.filtered_points.get_points()[:, [0, 2]].tolist() liveTorqueParameters.latAccelFactorFiltered = float(self.filtered_params['latAccelFactor'].x) liveTorqueParameters.latAccelOffsetFiltered = float(self.filtered_params['latAccelOffset'].x) liveTorqueParameters.frictionCoefficientFiltered = float(self.filtered_params['frictionCoefficient'].x) liveTorqueParameters.totalBucketPoints = len(self.filtered_points) liveTorqueParameters.decay = self.decay liveTorqueParameters.maxResets = self.resets return msg def main(sm=None, pm=None): config_realtime_process([0, 1, 2, 3], 5) if sm is None: sm = messaging.SubMaster(['carControl', 'carState', 'liveLocationKalman'], poll=['liveLocationKalman']) if pm is None: pm = messaging.PubMaster(['liveTorqueParameters']) params = Params() CP = car.CarParams.from_bytes(params.get("CarParams", block=True)) estimator = TorqueEstimator(CP) def cache_params(sig, frame): signal.signal(sig, signal.SIG_DFL) cloudlog.warning("caching torque params") params = Params() params.put("LiveTorqueCarParams", CP.as_builder().to_bytes()) msg = estimator.get_msg(with_points=True) params.put("LiveTorqueParameters", msg.to_bytes()) sys.exit(0) if "REPLAY" not in os.environ: signal.signal(signal.SIGINT, cache_params) while True: sm.update() if sm.all_checks(): for which in sm.updated.keys(): if sm.updated[which]: t = sm.logMonoTime[which] * 1e-9 estimator.handle_log(t, which, sm[which]) # 4Hz driven by liveLocationKalman if sm.frame % 5 == 0: pm.send('liveTorqueParameters', estimator.get_msg(valid=sm.all_checks())) if __name__ == "__main__": main()