You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							299 lines
						
					
					
						
							13 KiB
						
					
					
				
			
		
		
	
	
							299 lines
						
					
					
						
							13 KiB
						
					
					
				| #!/usr/bin/env python3
 | |
| import os
 | |
| import json
 | |
| import numpy as np
 | |
| import capnp
 | |
| 
 | |
| import cereal.messaging as messaging
 | |
| from cereal import car, log
 | |
| from openpilot.common.params import Params
 | |
| from openpilot.common.realtime import config_realtime_process, DT_MDL
 | |
| from openpilot.selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States
 | |
| from openpilot.selfdrive.locationd.models.constants import GENERATED_DIR
 | |
| from openpilot.selfdrive.locationd.helpers import PoseCalibrator, Pose
 | |
| from openpilot.common.swaglog import cloudlog
 | |
| 
 | |
| MAX_ANGLE_OFFSET_DELTA = 20 * DT_MDL  # Max 20 deg/s
 | |
| ROLL_MAX_DELTA = np.radians(20.0) * DT_MDL  # 20deg in 1 second is well within curvature limits
 | |
| ROLL_MIN, ROLL_MAX = np.radians(-10), np.radians(10)
 | |
| ROLL_LOWERED_MAX = np.radians(8)
 | |
| ROLL_STD_MAX = np.radians(1.5)
 | |
| LATERAL_ACC_SENSOR_THRESHOLD = 4.0
 | |
| OFFSET_MAX = 10.0
 | |
| OFFSET_LOWERED_MAX = 8.0
 | |
| MIN_ACTIVE_SPEED = 1.0
 | |
| LOW_ACTIVE_SPEED = 10.0
 | |
| 
 | |
| 
 | |
| class VehicleParamsLearner:
 | |
|   def __init__(self, CP: car.CarParams, steer_ratio: float, stiffness_factor: float, angle_offset: float, P_initial: np.ndarray | None = None):
 | |
|     self.kf = CarKalman(GENERATED_DIR)
 | |
| 
 | |
|     self.x_initial = CarKalman.initial_x.copy()
 | |
|     self.x_initial[States.STEER_RATIO] = steer_ratio
 | |
|     self.x_initial[States.STIFFNESS] = stiffness_factor
 | |
|     self.x_initial[States.ANGLE_OFFSET] = angle_offset
 | |
|     self.P_initial = P_initial if P_initial is not None else CarKalman.P_initial
 | |
| 
 | |
|     self.kf.set_globals(
 | |
|       mass=CP.mass,
 | |
|       rotational_inertia=CP.rotationalInertia,
 | |
|       center_to_front=CP.centerToFront,
 | |
|       center_to_rear=CP.wheelbase - CP.centerToFront,
 | |
|       stiffness_front=CP.tireStiffnessFront,
 | |
|       stiffness_rear=CP.tireStiffnessRear
 | |
|     )
 | |
| 
 | |
|     self.min_sr, self.max_sr = 0.5 * CP.steerRatio, 2.0 * CP.steerRatio
 | |
| 
 | |
|     self.calibrator = PoseCalibrator()
 | |
| 
 | |
|     self.observed_speed = 0.0
 | |
|     self.observed_yaw_rate = 0.0
 | |
|     self.observed_roll = 0.0
 | |
| 
 | |
|     self.avg_offset_valid = True
 | |
|     self.total_offset_valid = True
 | |
|     self.roll_valid = True
 | |
| 
 | |
|     self.reset(None)
 | |
| 
 | |
|   def reset(self, t: float | None):
 | |
|     self.kf.init_state(self.x_initial, covs=self.P_initial, filter_time=t)
 | |
| 
 | |
|     self.angle_offset, self.roll, self.active = np.degrees(self.x_initial[States.ANGLE_OFFSET].item()), 0.0, False
 | |
|     self.avg_angle_offset = self.angle_offset
 | |
| 
 | |
|   def handle_log(self, t: float, which: str, msg: capnp._DynamicStructReader):
 | |
|     if which == 'livePose':
 | |
|       device_pose = Pose.from_live_pose(msg)
 | |
|       calibrated_pose = self.calibrator.build_calibrated_pose(device_pose)
 | |
| 
 | |
|       yaw_rate, yaw_rate_std = calibrated_pose.angular_velocity.z, calibrated_pose.angular_velocity.z_std
 | |
|       yaw_rate_valid = msg.angularVelocityDevice.valid
 | |
|       yaw_rate_valid = yaw_rate_valid and 0 < yaw_rate_std < 10  # rad/s
 | |
|       yaw_rate_valid = yaw_rate_valid and abs(yaw_rate) < 1  # rad/s
 | |
|       if not yaw_rate_valid:
 | |
|         # This is done to bound the yaw rate estimate when localizer values are invalid or calibrating
 | |
|         yaw_rate, yaw_rate_std = 0.0, np.radians(10.0)
 | |
|       self.observed_yaw_rate = yaw_rate
 | |
| 
 | |
|       localizer_roll, localizer_roll_std = device_pose.orientation.x, device_pose.orientation.x_std
 | |
|       localizer_roll_std = np.radians(1) if np.isnan(localizer_roll_std) else localizer_roll_std
 | |
|       roll_valid = (localizer_roll_std < ROLL_STD_MAX) and (ROLL_MIN < localizer_roll < ROLL_MAX) and msg.sensorsOK
 | |
|       if roll_valid:
 | |
|         roll = localizer_roll
 | |
|         # Experimentally found multiplier of 2 to be best trade-off between stability and accuracy or similar?
 | |
|         roll_std = 2 * localizer_roll_std
 | |
|       else:
 | |
|         # This is done to bound the road roll estimate when localizer values are invalid
 | |
|         roll = 0.0
 | |
|         roll_std = np.radians(10.0)
 | |
|       self.observed_roll = np.clip(roll, self.observed_roll - ROLL_MAX_DELTA, self.observed_roll + ROLL_MAX_DELTA)
 | |
| 
 | |
|       if self.active:
 | |
|         if msg.posenetOK:
 | |
|           self.kf.predict_and_observe(t,
 | |
|                                       ObservationKind.ROAD_FRAME_YAW_RATE,
 | |
|                                       np.array([[-self.observed_yaw_rate]]),
 | |
|                                       np.array([np.atleast_2d(yaw_rate_std**2)]))
 | |
| 
 | |
|           self.kf.predict_and_observe(t,
 | |
|                                       ObservationKind.ROAD_ROLL,
 | |
|                                       np.array([[self.observed_roll]]),
 | |
|                                       np.array([np.atleast_2d(roll_std**2)]))
 | |
|         self.kf.predict_and_observe(t, ObservationKind.ANGLE_OFFSET_FAST, np.array([[0]]))
 | |
| 
 | |
|         # We observe the current stiffness and steer ratio (with a high observation noise) to bound
 | |
|         # the respective estimate STD. Otherwise the STDs keep increasing, causing rapid changes in the
 | |
|         # states in longer routes (especially straight stretches).
 | |
|         stiffness = float(self.kf.x[States.STIFFNESS].item())
 | |
|         steer_ratio = float(self.kf.x[States.STEER_RATIO].item())
 | |
|         self.kf.predict_and_observe(t, ObservationKind.STIFFNESS, np.array([[stiffness]]))
 | |
|         self.kf.predict_and_observe(t, ObservationKind.STEER_RATIO, np.array([[steer_ratio]]))
 | |
| 
 | |
|     elif which == 'liveCalibration':
 | |
|       self.calibrator.feed_live_calib(msg)
 | |
| 
 | |
|     elif which == 'carState':
 | |
|       steering_angle = msg.steeringAngleDeg
 | |
| 
 | |
|       in_linear_region = abs(steering_angle) < 45
 | |
|       self.observed_speed = msg.vEgo
 | |
|       self.active = self.observed_speed > MIN_ACTIVE_SPEED and in_linear_region
 | |
| 
 | |
|       if self.active:
 | |
|         self.kf.predict_and_observe(t, ObservationKind.STEER_ANGLE, np.array([[np.radians(steering_angle)]]))
 | |
|         self.kf.predict_and_observe(t, ObservationKind.ROAD_FRAME_X_SPEED, np.array([[self.observed_speed]]))
 | |
| 
 | |
|     if not self.active:
 | |
|       # Reset time when stopped so uncertainty doesn't grow
 | |
|       self.kf.filter.set_filter_time(t)  # type: ignore
 | |
|       self.kf.filter.reset_rewind()      # type: ignore
 | |
| 
 | |
|   def get_msg(self, valid: bool, debug: bool = False) -> capnp._DynamicStructBuilder:
 | |
|     x = self.kf.x
 | |
|     P = np.sqrt(self.kf.P.diagonal())
 | |
|     if not np.all(np.isfinite(x)):
 | |
|       cloudlog.error("NaN in liveParameters estimate. Resetting to default values")
 | |
|       self.reset(self.kf.t)
 | |
|       x = self.kf.x
 | |
| 
 | |
|     self.avg_angle_offset = np.clip(np.degrees(x[States.ANGLE_OFFSET].item()),
 | |
|                                 self.avg_angle_offset - MAX_ANGLE_OFFSET_DELTA, self.avg_angle_offset + MAX_ANGLE_OFFSET_DELTA)
 | |
|     self.angle_offset = np.clip(np.degrees(x[States.ANGLE_OFFSET].item() + x[States.ANGLE_OFFSET_FAST].item()),
 | |
|                         self.angle_offset - MAX_ANGLE_OFFSET_DELTA, self.angle_offset + MAX_ANGLE_OFFSET_DELTA)
 | |
|     self.roll = np.clip(float(x[States.ROAD_ROLL].item()), self.roll - ROLL_MAX_DELTA, self.roll + ROLL_MAX_DELTA)
 | |
|     roll_std = float(P[States.ROAD_ROLL].item())
 | |
|     if self.active and self.observed_speed > LOW_ACTIVE_SPEED:
 | |
|       # Account for the opposite signs of the yaw rates
 | |
|       # At low speeds, bumping into a curb can cause the yaw rate to be very high
 | |
|       sensors_valid = bool(abs(self.observed_speed * (x[States.YAW_RATE].item() + self.observed_yaw_rate)) < LATERAL_ACC_SENSOR_THRESHOLD)
 | |
|     else:
 | |
|       sensors_valid = True
 | |
|     self.avg_offset_valid = check_valid_with_hysteresis(self.avg_offset_valid, self.avg_angle_offset, OFFSET_MAX, OFFSET_LOWERED_MAX)
 | |
|     self.total_offset_valid = check_valid_with_hysteresis(self.total_offset_valid, self.angle_offset, OFFSET_MAX, OFFSET_LOWERED_MAX)
 | |
|     self.roll_valid = check_valid_with_hysteresis(self.roll_valid, self.roll, ROLL_MAX, ROLL_LOWERED_MAX)
 | |
| 
 | |
|     msg = messaging.new_message('liveParameters')
 | |
| 
 | |
|     msg.valid = valid
 | |
| 
 | |
|     liveParameters = msg.liveParameters
 | |
|     liveParameters.posenetValid = True
 | |
|     liveParameters.sensorValid = sensors_valid
 | |
|     liveParameters.steerRatio = float(x[States.STEER_RATIO].item())
 | |
|     liveParameters.stiffnessFactor = float(x[States.STIFFNESS].item())
 | |
|     liveParameters.roll = float(self.roll)
 | |
|     liveParameters.angleOffsetAverageDeg = float(self.avg_angle_offset)
 | |
|     liveParameters.angleOffsetDeg = float(self.angle_offset)
 | |
|     liveParameters.steerRatioValid = self.min_sr <= liveParameters.steerRatio <= self.max_sr
 | |
|     liveParameters.stiffnessFactorValid = 0.2 <= liveParameters.stiffnessFactor <= 5.0
 | |
|     liveParameters.angleOffsetAverageValid = bool(self.avg_offset_valid)
 | |
|     liveParameters.angleOffsetValid = bool(self.total_offset_valid)
 | |
|     liveParameters.valid = all((
 | |
|       liveParameters.angleOffsetAverageValid,
 | |
|       liveParameters.angleOffsetValid ,
 | |
|       self.roll_valid,
 | |
|       roll_std < ROLL_STD_MAX,
 | |
|       liveParameters.stiffnessFactorValid,
 | |
|       liveParameters.steerRatioValid,
 | |
|     ))
 | |
|     liveParameters.steerRatioStd = float(P[States.STEER_RATIO].item())
 | |
|     liveParameters.stiffnessFactorStd = float(P[States.STIFFNESS].item())
 | |
|     liveParameters.angleOffsetAverageStd = float(P[States.ANGLE_OFFSET].item())
 | |
|     liveParameters.angleOffsetFastStd = float(P[States.ANGLE_OFFSET_FAST].item())
 | |
|     if debug:
 | |
|       liveParameters.debugFilterState = log.LiveParametersData.FilterState.new_message()
 | |
|       liveParameters.debugFilterState.value = x.tolist()
 | |
|       liveParameters.debugFilterState.std = P.tolist()
 | |
| 
 | |
|     return msg
 | |
| 
 | |
| 
 | |
| def check_valid_with_hysteresis(current_valid: bool, val: float, threshold: float, lowered_threshold: float):
 | |
|   if current_valid:
 | |
|     current_valid = abs(val) < threshold
 | |
|   else:
 | |
|     current_valid = abs(val) < lowered_threshold
 | |
|   return current_valid
 | |
| 
 | |
| 
 | |
| # TODO: Remove this function after few releases (added in 0.9.9)
 | |
| def migrate_cached_vehicle_params_if_needed(params: Params):
 | |
|   last_parameters_data_old = params.get("LiveParameters")
 | |
|   last_parameters_data = params.get("LiveParametersV2")
 | |
|   if last_parameters_data_old is None or last_parameters_data is not None:
 | |
|     return
 | |
| 
 | |
|   try:
 | |
|     last_parameters_dict = json.loads(last_parameters_data_old)
 | |
|     last_parameters_msg = messaging.new_message('liveParameters')
 | |
|     last_parameters_msg.liveParameters.valid = True
 | |
|     last_parameters_msg.liveParameters.steerRatio = last_parameters_dict['steerRatio']
 | |
|     last_parameters_msg.liveParameters.stiffnessFactor = last_parameters_dict['stiffnessFactor']
 | |
|     last_parameters_msg.liveParameters.angleOffsetAverageDeg = last_parameters_dict['angleOffsetAverageDeg']
 | |
|     params.put("LiveParametersV2", last_parameters_msg.to_bytes())
 | |
|   except Exception as e:
 | |
|     cloudlog.error(f"Failed to perform parameter migration: {e}")
 | |
|     params.remove("LiveParameters")
 | |
| 
 | |
| 
 | |
| def retrieve_initial_vehicle_params(params: Params, CP: car.CarParams, replay: bool, debug: bool):
 | |
|   last_parameters_data = params.get("LiveParametersV2")
 | |
|   last_carparams_data = params.get("CarParamsPrevRoute")
 | |
| 
 | |
|   steer_ratio, stiffness_factor, angle_offset_deg, p_initial = CP.steerRatio, 1.0, 0.0, None
 | |
| 
 | |
|   retrieve_success = False
 | |
|   if last_parameters_data is not None and last_carparams_data is not None:
 | |
|     try:
 | |
|       with log.Event.from_bytes(last_parameters_data) as last_lp_msg, car.CarParams.from_bytes(last_carparams_data) as last_CP:
 | |
|         lp = last_lp_msg.liveParameters
 | |
|         # Check if car model matches
 | |
|         if last_CP.carFingerprint != CP.carFingerprint:
 | |
|           raise Exception("Car model mismatch")
 | |
| 
 | |
|         # Check if starting values are sane
 | |
|         min_sr, max_sr = 0.5 * CP.steerRatio, 2.0 * CP.steerRatio
 | |
|         steer_ratio_sane = min_sr <= lp.steerRatio <= max_sr
 | |
|         if not steer_ratio_sane:
 | |
|           raise Exception(f"Invalid starting values found {lp}")
 | |
| 
 | |
|         initial_filter_std = np.array(lp.debugFilterState.std)
 | |
|         if debug and len(initial_filter_std) != 0:
 | |
|           p_initial = np.diag(initial_filter_std)
 | |
| 
 | |
|         steer_ratio, stiffness_factor, angle_offset_deg = lp.steerRatio, lp.stiffnessFactor, lp.angleOffsetAverageDeg
 | |
|         retrieve_success = True
 | |
|     except Exception as e:
 | |
|       cloudlog.error(f"Failed to retrieve initial values: {e}")
 | |
| 
 | |
|   if not replay:
 | |
|     # When driving in wet conditions the stiffness can go down, and then be too low on the next drive
 | |
|     # Without a way to detect this we have to reset the stiffness every drive
 | |
|     stiffness_factor = 1.0
 | |
| 
 | |
|   if not retrieve_success:
 | |
|     cloudlog.info("Parameter learner resetting to default values")
 | |
| 
 | |
|   return steer_ratio, stiffness_factor, angle_offset_deg, p_initial
 | |
| 
 | |
| 
 | |
| def main():
 | |
|   config_realtime_process([0, 1, 2, 3], 5)
 | |
| 
 | |
|   DEBUG = bool(int(os.getenv("DEBUG", "0")))
 | |
|   REPLAY = bool(int(os.getenv("REPLAY", "0")))
 | |
| 
 | |
|   pm = messaging.PubMaster(['liveParameters'])
 | |
|   sm = messaging.SubMaster(['livePose', 'liveCalibration', 'carState'], poll='livePose')
 | |
| 
 | |
|   params = Params()
 | |
|   CP = messaging.log_from_bytes(params.get("CarParams", block=True), car.CarParams)
 | |
| 
 | |
|   migrate_cached_vehicle_params_if_needed(params)
 | |
| 
 | |
|   steer_ratio, stiffness_factor, angle_offset_deg, pInitial = retrieve_initial_vehicle_params(params, CP, REPLAY, DEBUG)
 | |
|   learner = VehicleParamsLearner(CP, steer_ratio, stiffness_factor, np.radians(angle_offset_deg), pInitial)
 | |
| 
 | |
|   while True:
 | |
|     sm.update()
 | |
|     if sm.all_checks():
 | |
|       for which in sorted(sm.updated.keys(), key=lambda x: sm.logMonoTime[x]):
 | |
|         if sm.updated[which]:
 | |
|           t = sm.logMonoTime[which] * 1e-9
 | |
|           learner.handle_log(t, which, sm[which])
 | |
| 
 | |
|     if sm.updated['livePose']:
 | |
|       msg = learner.get_msg(sm.all_checks(), debug=DEBUG)
 | |
| 
 | |
|       msg_dat = msg.to_bytes()
 | |
|       if sm.frame % 1200 == 0:  # once a minute
 | |
|         params.put_nonblocking("LiveParametersV2", msg_dat)
 | |
| 
 | |
|       pm.send('liveParameters', msg_dat)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|   main()
 | |
| 
 |