|  |  |  | #!/usr/bin/env python3
 | 
					
						
							|  |  |  | '''
 | 
					
						
							|  |  |  | This process finds calibration values. More info on what these calibration values
 | 
					
						
							|  |  |  | are can be found here https://github.com/commaai/openpilot/tree/master/common/transformations
 | 
					
						
							|  |  |  | While the roll calibration is a real value that can be estimated, here we assume it's zero,
 | 
					
						
							|  |  |  | and the image input into the neural network is not corrected for roll.
 | 
					
						
							|  |  |  | '''
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import gc
 | 
					
						
							|  |  |  | import os
 | 
					
						
							|  |  |  | import capnp
 | 
					
						
							|  |  |  | import numpy as np
 | 
					
						
							|  |  |  | from typing import NoReturn
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from cereal import log
 | 
					
						
							|  |  |  | import cereal.messaging as messaging
 | 
					
						
							|  |  |  | from openpilot.common.conversions import Conversions as CV
 | 
					
						
							|  |  |  | from openpilot.common.params import Params
 | 
					
						
							|  |  |  | from openpilot.common.realtime import set_realtime_priority
 | 
					
						
							|  |  |  | from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot
 | 
					
						
							|  |  |  | from openpilot.common.swaglog import cloudlog
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS
 | 
					
						
							|  |  |  | MAX_VEL_ANGLE_STD = np.radians(0.25)
 | 
					
						
							|  |  |  | MAX_YAW_RATE_FILTER = np.radians(2)  # per second
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | MAX_HEIGHT_STD = np.exp(-3.5)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # This is at model frequency, blocks needed for efficiency
 | 
					
						
							|  |  |  | SMOOTH_CYCLES = 10
 | 
					
						
							|  |  |  | BLOCK_SIZE = 100
 | 
					
						
							|  |  |  | INPUTS_NEEDED = 5   # Minimum blocks needed for valid calibration
 | 
					
						
							|  |  |  | INPUTS_WANTED = 50   # We want a little bit more than we need for stability
 | 
					
						
							|  |  |  | MAX_ALLOWED_YAW_SPREAD = np.radians(2)
 | 
					
						
							|  |  |  | MAX_ALLOWED_PITCH_SPREAD = np.radians(4)
 | 
					
						
							|  |  |  | RPY_INIT = np.array([0.0,0.0,0.0])
 | 
					
						
							|  |  |  | WIDE_FROM_DEVICE_EULER_INIT = np.array([0.0, 0.0, 0.0])
 | 
					
						
							|  |  |  | HEIGHT_INIT = np.array([1.22])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | # These values are needed to accommodate the model frame in the narrow cam of the C3
 | 
					
						
							|  |  |  | PITCH_LIMITS = np.array([-0.09074112085129739, 0.17])
 | 
					
						
							|  |  |  | YAW_LIMITS = np.array([-0.06912048084718224, 0.06912048084718235])
 | 
					
						
							|  |  |  | DEBUG = os.getenv("DEBUG") is not None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def is_calibration_valid(rpy: np.ndarray) -> bool:
 | 
					
						
							|  |  |  |   return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1])  # type: ignore
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def sanity_clip(rpy: np.ndarray) -> np.ndarray:
 | 
					
						
							|  |  |  |   if np.isnan(rpy).any():
 | 
					
						
							|  |  |  |     rpy = RPY_INIT
 | 
					
						
							|  |  |  |   return np.array([rpy[0],
 | 
					
						
							|  |  |  |                    np.clip(rpy[1], PITCH_LIMITS[0] - .005, PITCH_LIMITS[1] + .005),
 | 
					
						
							|  |  |  |                    np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)])
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def moving_avg_with_linear_decay(prev_mean: np.ndarray, new_val: np.ndarray, idx: int, block_size: float) -> np.ndarray:
 | 
					
						
							|  |  |  |   return (idx*prev_mean + (block_size - idx) * new_val) / block_size
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class Calibrator:
 | 
					
						
							|  |  |  |   def __init__(self, param_put: bool = False):
 | 
					
						
							|  |  |  |     self.param_put = param_put
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.not_car = False
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # Read saved calibration
 | 
					
						
							|  |  |  |     self.params = Params()
 | 
					
						
							|  |  |  |     calibration_params = self.params.get("CalibrationParams")
 | 
					
						
							|  |  |  |     rpy_init = RPY_INIT
 | 
					
						
							|  |  |  |     wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
 | 
					
						
							|  |  |  |     height = HEIGHT_INIT
 | 
					
						
							|  |  |  |     valid_blocks = 0
 | 
					
						
							|  |  |  |     self.cal_status = log.LiveCalibrationData.Status.uncalibrated
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if param_put and calibration_params:
 | 
					
						
							|  |  |  |       try:
 | 
					
						
							|  |  |  |         with log.Event.from_bytes(calibration_params) as msg:
 | 
					
						
							|  |  |  |           rpy_init = np.array(msg.liveCalibration.rpyCalib)
 | 
					
						
							|  |  |  |           valid_blocks = msg.liveCalibration.validBlocks
 | 
					
						
							|  |  |  |           wide_from_device_euler = np.array(msg.liveCalibration.wideFromDeviceEuler)
 | 
					
						
							|  |  |  |           height = np.array(msg.liveCalibration.height)
 | 
					
						
							|  |  |  |       except Exception:
 | 
					
						
							|  |  |  |         cloudlog.exception("Error reading cached CalibrationParams")
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.reset(rpy_init, valid_blocks, wide_from_device_euler, height)
 | 
					
						
							|  |  |  |     self.update_status()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def reset(self, rpy_init: np.ndarray = RPY_INIT,
 | 
					
						
							|  |  |  |                   valid_blocks: int = 0,
 | 
					
						
							|  |  |  |                   wide_from_device_euler_init: np.ndarray = WIDE_FROM_DEVICE_EULER_INIT,
 | 
					
						
							|  |  |  |                   height_init: np.ndarray = HEIGHT_INIT,
 | 
					
						
							|  |  |  |                   smooth_from: np.ndarray = None) -> None:
 | 
					
						
							|  |  |  |     if not np.isfinite(rpy_init).all():
 | 
					
						
							|  |  |  |       self.rpy = RPY_INIT.copy()
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.rpy = rpy_init.copy()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if not np.isfinite(height_init).all() or len(height_init) != 1:
 | 
					
						
							|  |  |  |       self.height = HEIGHT_INIT.copy()
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.height = height_init.copy()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if not np.isfinite(wide_from_device_euler_init).all() or len(wide_from_device_euler_init) != 3:
 | 
					
						
							|  |  |  |       self.wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT.copy()
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.wide_from_device_euler = wide_from_device_euler_init.copy()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if not np.isfinite(valid_blocks) or valid_blocks < 0:
 | 
					
						
							|  |  |  |       self.valid_blocks = 0
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.valid_blocks = valid_blocks
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.rpys = np.tile(self.rpy, (INPUTS_WANTED, 1))
 | 
					
						
							|  |  |  |     self.wide_from_device_eulers = np.tile(self.wide_from_device_euler, (INPUTS_WANTED, 1))
 | 
					
						
							|  |  |  |     self.heights = np.tile(self.height, (INPUTS_WANTED, 1))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.idx = 0
 | 
					
						
							|  |  |  |     self.block_idx = 0
 | 
					
						
							|  |  |  |     self.v_ego = 0.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if smooth_from is None:
 | 
					
						
							|  |  |  |       self.old_rpy = RPY_INIT
 | 
					
						
							|  |  |  |       self.old_rpy_weight = 0.0
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.old_rpy = smooth_from
 | 
					
						
							|  |  |  |       self.old_rpy_weight = 1.0
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_valid_idxs(self) -> list[int]:
 | 
					
						
							|  |  |  |     # exclude current block_idx from validity window
 | 
					
						
							|  |  |  |     before_current = list(range(self.block_idx))
 | 
					
						
							|  |  |  |     after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks))
 | 
					
						
							|  |  |  |     return before_current + after_current
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def update_status(self) -> None:
 | 
					
						
							|  |  |  |     valid_idxs = self.get_valid_idxs()
 | 
					
						
							|  |  |  |     if valid_idxs:
 | 
					
						
							|  |  |  |       self.wide_from_device_euler = np.mean(self.wide_from_device_eulers[valid_idxs], axis=0)
 | 
					
						
							|  |  |  |       self.height = np.mean(self.heights[valid_idxs], axis=0)
 | 
					
						
							|  |  |  |       rpys = self.rpys[valid_idxs]
 | 
					
						
							|  |  |  |       self.rpy = np.mean(rpys, axis=0)
 | 
					
						
							|  |  |  |       max_rpy_calib = np.array(np.max(rpys, axis=0))
 | 
					
						
							|  |  |  |       min_rpy_calib = np.array(np.min(rpys, axis=0))
 | 
					
						
							|  |  |  |       self.calib_spread = np.abs(max_rpy_calib - min_rpy_calib)
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.calib_spread = np.zeros(3)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if self.valid_blocks < INPUTS_NEEDED:
 | 
					
						
							|  |  |  |       if self.cal_status == log.LiveCalibrationData.Status.recalibrating:
 | 
					
						
							|  |  |  |         self.cal_status = log.LiveCalibrationData.Status.recalibrating
 | 
					
						
							|  |  |  |       else:
 | 
					
						
							|  |  |  |         self.cal_status = log.LiveCalibrationData.Status.uncalibrated
 | 
					
						
							|  |  |  |     elif is_calibration_valid(self.rpy):
 | 
					
						
							|  |  |  |       self.cal_status = log.LiveCalibrationData.Status.calibrated
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       self.cal_status = log.LiveCalibrationData.Status.invalid
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # If spread is too high, assume mounting was changed and reset to last block.
 | 
					
						
							|  |  |  |     # Make the transition smooth. Abrupt transitions are not good for feedback loop through supercombo model.
 | 
					
						
							|  |  |  |     # TODO: add height spread check with smooth transition too
 | 
					
						
							|  |  |  |     spread_too_high = self.calib_spread[1] > MAX_ALLOWED_PITCH_SPREAD or self.calib_spread[2] > MAX_ALLOWED_YAW_SPREAD
 | 
					
						
							|  |  |  |     if spread_too_high and self.cal_status == log.LiveCalibrationData.Status.calibrated:
 | 
					
						
							|  |  |  |       self.reset(self.rpys[self.block_idx - 1], valid_blocks=1, smooth_from=self.rpy)
 | 
					
						
							|  |  |  |       self.cal_status = log.LiveCalibrationData.Status.recalibrating
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5)
 | 
					
						
							|  |  |  |     if self.param_put and write_this_cycle:
 | 
					
						
							|  |  |  |       self.params.put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes())
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def handle_v_ego(self, v_ego: float) -> None:
 | 
					
						
							|  |  |  |     self.v_ego = v_ego
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_smooth_rpy(self) -> np.ndarray:
 | 
					
						
							|  |  |  |     if self.old_rpy_weight > 0:
 | 
					
						
							|  |  |  |       return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       return self.rpy
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def handle_cam_odom(self, trans: list[float],
 | 
					
						
							|  |  |  |                             rot: list[float],
 | 
					
						
							|  |  |  |                             wide_from_device_euler: list[float],
 | 
					
						
							|  |  |  |                             trans_std: list[float],
 | 
					
						
							|  |  |  |                             road_transform_trans: list[float],
 | 
					
						
							|  |  |  |                             road_transform_trans_std: list[float]) -> np.ndarray | None:
 | 
					
						
							|  |  |  |     self.old_rpy_weight = max(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER))
 | 
					
						
							|  |  |  |     angle_std_threshold = MAX_VEL_ANGLE_STD
 | 
					
						
							|  |  |  |     height_std_threshold = MAX_HEIGHT_STD
 | 
					
						
							|  |  |  |     rpy_certain = np.arctan2(trans_std[1], trans[0]) < angle_std_threshold
 | 
					
						
							|  |  |  |     if len(road_transform_trans_std) == 3:
 | 
					
						
							|  |  |  |       height_certain = road_transform_trans_std[2] < height_std_threshold
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       height_certain = True
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     certain_if_calib = (rpy_certain and height_certain) or (self.valid_blocks < INPUTS_NEEDED)
 | 
					
						
							|  |  |  |     if not (straight_and_fast and certain_if_calib):
 | 
					
						
							|  |  |  |       return None
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     observed_rpy = np.array([0,
 | 
					
						
							|  |  |  |                              -np.arctan2(trans[2], trans[0]),
 | 
					
						
							|  |  |  |                              np.arctan2(trans[1], trans[0])])
 | 
					
						
							|  |  |  |     new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy)))
 | 
					
						
							|  |  |  |     new_rpy = sanity_clip(new_rpy)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if len(wide_from_device_euler) == 3:
 | 
					
						
							|  |  |  |       new_wide_from_device_euler = np.array(wide_from_device_euler)
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       new_wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if (len(road_transform_trans) == 3):
 | 
					
						
							|  |  |  |       new_height = np.array([road_transform_trans[2]])
 | 
					
						
							|  |  |  |     else:
 | 
					
						
							|  |  |  |       new_height = HEIGHT_INIT
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.rpys[self.block_idx] = moving_avg_with_linear_decay(self.rpys[self.block_idx], new_rpy, self.idx, float(BLOCK_SIZE))
 | 
					
						
							|  |  |  |     self.wide_from_device_eulers[self.block_idx] = moving_avg_with_linear_decay(self.wide_from_device_eulers[self.block_idx],
 | 
					
						
							|  |  |  |                                                                                 new_wide_from_device_euler, self.idx, float(BLOCK_SIZE))
 | 
					
						
							|  |  |  |     self.heights[self.block_idx] = moving_avg_with_linear_decay(self.heights[self.block_idx], new_height, self.idx, float(BLOCK_SIZE))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.idx = (self.idx + 1) % BLOCK_SIZE
 | 
					
						
							|  |  |  |     if self.idx == 0:
 | 
					
						
							|  |  |  |       self.block_idx += 1
 | 
					
						
							|  |  |  |       self.valid_blocks = max(self.block_idx, self.valid_blocks)
 | 
					
						
							|  |  |  |       self.block_idx = self.block_idx % INPUTS_WANTED
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     self.update_status()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return new_rpy
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def get_msg(self, valid: bool) -> capnp.lib.capnp._DynamicStructBuilder:
 | 
					
						
							|  |  |  |     smooth_rpy = self.get_smooth_rpy()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     msg = messaging.new_message('liveCalibration')
 | 
					
						
							|  |  |  |     msg.valid = valid
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     liveCalibration = msg.liveCalibration
 | 
					
						
							|  |  |  |     liveCalibration.validBlocks = self.valid_blocks
 | 
					
						
							|  |  |  |     liveCalibration.calStatus = self.cal_status
 | 
					
						
							|  |  |  |     liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100)
 | 
					
						
							|  |  |  |     liveCalibration.rpyCalib = smooth_rpy.tolist()
 | 
					
						
							|  |  |  |     liveCalibration.rpyCalibSpread = self.calib_spread.tolist()
 | 
					
						
							|  |  |  |     liveCalibration.wideFromDeviceEuler = self.wide_from_device_euler.tolist()
 | 
					
						
							|  |  |  |     liveCalibration.height = self.height.tolist()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if self.not_car:
 | 
					
						
							|  |  |  |       liveCalibration.validBlocks = INPUTS_NEEDED
 | 
					
						
							|  |  |  |       liveCalibration.calStatus = log.LiveCalibrationData.Status.calibrated
 | 
					
						
							|  |  |  |       liveCalibration.calPerc = 100.
 | 
					
						
							|  |  |  |       liveCalibration.rpyCalib = [0, 0, 0]
 | 
					
						
							|  |  |  |       liveCalibration.rpyCalibSpread = self.calib_spread.tolist()
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     return msg
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   def send_data(self, pm: messaging.PubMaster, valid: bool) -> None:
 | 
					
						
							|  |  |  |     pm.send('liveCalibration', self.get_msg(valid))
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main() -> NoReturn:
 | 
					
						
							|  |  |  |   gc.disable()
 | 
					
						
							|  |  |  |   set_realtime_priority(1)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   pm = messaging.PubMaster(['liveCalibration'])
 | 
					
						
							|  |  |  |   sm = messaging.SubMaster(['cameraOdometry', 'carState', 'carParams'], poll='cameraOdometry')
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   calibrator = Calibrator(param_put=True)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |   while 1:
 | 
					
						
							|  |  |  |     timeout = 0 if sm.frame == -1 else 100
 | 
					
						
							|  |  |  |     sm.update(timeout)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     calibrator.not_car = sm['carParams'].notCar
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if sm.updated['cameraOdometry']:
 | 
					
						
							|  |  |  |       calibrator.handle_v_ego(sm['carState'].vEgo)
 | 
					
						
							|  |  |  |       new_rpy = calibrator.handle_cam_odom(sm['cameraOdometry'].trans,
 | 
					
						
							|  |  |  |                                            sm['cameraOdometry'].rot,
 | 
					
						
							|  |  |  |                                            sm['cameraOdometry'].wideFromDeviceEuler,
 | 
					
						
							|  |  |  |                                            sm['cameraOdometry'].transStd,
 | 
					
						
							|  |  |  |                                            sm['cameraOdometry'].roadTransformTrans,
 | 
					
						
							|  |  |  |                                            sm['cameraOdometry'].roadTransformTransStd)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |       if DEBUG and new_rpy is not None:
 | 
					
						
							|  |  |  |         print('got new rpy', new_rpy)
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     # 4Hz driven by cameraOdometry
 | 
					
						
							|  |  |  |     if sm.frame % 5 == 0:
 | 
					
						
							|  |  |  |       calibrator.send_data(pm, sm.all_checks())
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__":
 | 
					
						
							|  |  |  |   main()
 |