#!/usr/bin/env python3 ''' This process finds calibration values. More info on what these calibration values are can be found here https://github.com/commaai/openpilot/tree/master/common/transformations While the roll calibration is a real value that can be estimated, here we assume it's zero, and the image input into the neural network is not corrected for roll. ''' import os import capnp import numpy as np from typing import NoReturn from cereal import log import cereal.messaging as messaging from openpilot.common.conversions import Conversions as CV from openpilot.common.params import Params from openpilot.common.realtime import config_realtime_process from openpilot.common.transformations.orientation import rot_from_euler, euler_from_rot from openpilot.common.swaglog import cloudlog MIN_SPEED_FILTER = 15 * CV.MPH_TO_MS MAX_VEL_ANGLE_STD = np.radians(0.25) MAX_YAW_RATE_FILTER = np.radians(2) # per second MAX_HEIGHT_STD = np.exp(-3.5) # This is at model frequency, blocks needed for efficiency SMOOTH_CYCLES = 10 BLOCK_SIZE = 100 INPUTS_NEEDED = 5 # Minimum blocks needed for valid calibration INPUTS_WANTED = 50 # We want a little bit more than we need for stability MAX_ALLOWED_YAW_SPREAD = np.radians(2) MAX_ALLOWED_PITCH_SPREAD = np.radians(4) RPY_INIT = np.array([0.0,0.0,0.0]) WIDE_FROM_DEVICE_EULER_INIT = np.array([0.0, 0.0, 0.0]) HEIGHT_INIT = np.array([1.22]) # These values are needed to accommodate the model frame in the narrow cam of the C3 PITCH_LIMITS = np.array([-0.09074112085129739, 0.17]) YAW_LIMITS = np.array([-0.06912048084718224, 0.06912048084718235]) DEBUG = os.getenv("DEBUG") is not None def is_calibration_valid(rpy: np.ndarray) -> bool: return (PITCH_LIMITS[0] < rpy[1] < PITCH_LIMITS[1]) and (YAW_LIMITS[0] < rpy[2] < YAW_LIMITS[1]) # type: ignore def sanity_clip(rpy: np.ndarray) -> np.ndarray: if np.isnan(rpy).any(): rpy = RPY_INIT return np.array([rpy[0], np.clip(rpy[1], PITCH_LIMITS[0] - .005, PITCH_LIMITS[1] + .005), np.clip(rpy[2], YAW_LIMITS[0] - .005, YAW_LIMITS[1] + .005)]) def moving_avg_with_linear_decay(prev_mean: np.ndarray, new_val: np.ndarray, idx: int, block_size: float) -> np.ndarray: return (idx*prev_mean + (block_size - idx) * new_val) / block_size class Calibrator: def __init__(self, param_put: bool = False): self.param_put = param_put self.not_car = False # Read saved calibration self.params = Params() calibration_params = self.params.get("CalibrationParams") rpy_init = RPY_INIT wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT height = HEIGHT_INIT valid_blocks = 0 self.cal_status = log.LiveCalibrationData.Status.uncalibrated if param_put and calibration_params: try: with log.Event.from_bytes(calibration_params) as msg: rpy_init = np.array(msg.liveCalibration.rpyCalib) valid_blocks = msg.liveCalibration.validBlocks wide_from_device_euler = np.array(msg.liveCalibration.wideFromDeviceEuler) height = np.array(msg.liveCalibration.height) except Exception: cloudlog.exception("Error reading cached CalibrationParams") self.reset(rpy_init, valid_blocks, wide_from_device_euler, height) self.update_status() def reset(self, rpy_init: np.ndarray = RPY_INIT, valid_blocks: int = 0, wide_from_device_euler_init: np.ndarray = WIDE_FROM_DEVICE_EULER_INIT, height_init: np.ndarray = HEIGHT_INIT, smooth_from: np.ndarray = None) -> None: if not np.isfinite(rpy_init).all(): self.rpy = RPY_INIT.copy() else: self.rpy = rpy_init.copy() if not np.isfinite(height_init).all() or len(height_init) != 1: self.height = HEIGHT_INIT.copy() else: self.height = height_init.copy() if not np.isfinite(wide_from_device_euler_init).all() or len(wide_from_device_euler_init) != 3: self.wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT.copy() else: self.wide_from_device_euler = wide_from_device_euler_init.copy() if not np.isfinite(valid_blocks) or valid_blocks < 0: self.valid_blocks = 0 else: self.valid_blocks = valid_blocks self.rpys = np.tile(self.rpy, (INPUTS_WANTED, 1)) self.wide_from_device_eulers = np.tile(self.wide_from_device_euler, (INPUTS_WANTED, 1)) self.heights = np.tile(self.height, (INPUTS_WANTED, 1)) self.idx = 0 self.block_idx = 0 self.v_ego = 0.0 if smooth_from is None: self.old_rpy = RPY_INIT self.old_rpy_weight = 0.0 else: self.old_rpy = smooth_from self.old_rpy_weight = 1.0 def get_valid_idxs(self) -> list[int]: # exclude current block_idx from validity window before_current = list(range(self.block_idx)) after_current = list(range(min(self.valid_blocks, self.block_idx + 1), self.valid_blocks)) return before_current + after_current def update_status(self) -> None: valid_idxs = self.get_valid_idxs() if valid_idxs: self.wide_from_device_euler = np.mean(self.wide_from_device_eulers[valid_idxs], axis=0) self.height = np.mean(self.heights[valid_idxs], axis=0) rpys = self.rpys[valid_idxs] self.rpy = np.mean(rpys, axis=0) max_rpy_calib = np.array(np.max(rpys, axis=0)) min_rpy_calib = np.array(np.min(rpys, axis=0)) self.calib_spread = np.abs(max_rpy_calib - min_rpy_calib) else: self.calib_spread = np.zeros(3) if self.valid_blocks < INPUTS_NEEDED: if self.cal_status == log.LiveCalibrationData.Status.recalibrating: self.cal_status = log.LiveCalibrationData.Status.recalibrating else: self.cal_status = log.LiveCalibrationData.Status.uncalibrated elif is_calibration_valid(self.rpy): self.cal_status = log.LiveCalibrationData.Status.calibrated else: self.cal_status = log.LiveCalibrationData.Status.invalid # If spread is too high, assume mounting was changed and reset to last block. # Make the transition smooth. Abrupt transitions are not good for feedback loop through supercombo model. # TODO: add height spread check with smooth transition too spread_too_high = self.calib_spread[1] > MAX_ALLOWED_PITCH_SPREAD or self.calib_spread[2] > MAX_ALLOWED_YAW_SPREAD if spread_too_high and self.cal_status == log.LiveCalibrationData.Status.calibrated: self.reset(self.rpys[self.block_idx - 1], valid_blocks=1, smooth_from=self.rpy) self.cal_status = log.LiveCalibrationData.Status.recalibrating write_this_cycle = (self.idx == 0) and (self.block_idx % (INPUTS_WANTED//5) == 5) if self.param_put and write_this_cycle: self.params.put_nonblocking("CalibrationParams", self.get_msg(True).to_bytes()) def handle_v_ego(self, v_ego: float) -> None: self.v_ego = v_ego def get_smooth_rpy(self) -> np.ndarray: if self.old_rpy_weight > 0: return self.old_rpy_weight * self.old_rpy + (1.0 - self.old_rpy_weight) * self.rpy else: return self.rpy def handle_cam_odom(self, trans: list[float], rot: list[float], wide_from_device_euler: list[float], trans_std: list[float], road_transform_trans: list[float], road_transform_trans_std: list[float]) -> np.ndarray | None: self.old_rpy_weight = max(0.0, self.old_rpy_weight - 1/SMOOTH_CYCLES) straight_and_fast = ((self.v_ego > MIN_SPEED_FILTER) and (trans[0] > MIN_SPEED_FILTER) and (abs(rot[2]) < MAX_YAW_RATE_FILTER)) angle_std_threshold = MAX_VEL_ANGLE_STD height_std_threshold = MAX_HEIGHT_STD rpy_certain = np.arctan2(trans_std[1], trans[0]) < angle_std_threshold if len(road_transform_trans_std) == 3: height_certain = road_transform_trans_std[2] < height_std_threshold else: height_certain = True certain_if_calib = (rpy_certain and height_certain) or (self.valid_blocks < INPUTS_NEEDED) if not (straight_and_fast and certain_if_calib): return None observed_rpy = np.array([0, -np.arctan2(trans[2], trans[0]), np.arctan2(trans[1], trans[0])]) new_rpy = euler_from_rot(rot_from_euler(self.get_smooth_rpy()).dot(rot_from_euler(observed_rpy))) new_rpy = sanity_clip(new_rpy) if len(wide_from_device_euler) == 3: new_wide_from_device_euler = np.array(wide_from_device_euler) else: new_wide_from_device_euler = WIDE_FROM_DEVICE_EULER_INIT if (len(road_transform_trans) == 3): new_height = np.array([road_transform_trans[2]]) else: new_height = HEIGHT_INIT self.rpys[self.block_idx] = moving_avg_with_linear_decay(self.rpys[self.block_idx], new_rpy, self.idx, float(BLOCK_SIZE)) self.wide_from_device_eulers[self.block_idx] = moving_avg_with_linear_decay(self.wide_from_device_eulers[self.block_idx], new_wide_from_device_euler, self.idx, float(BLOCK_SIZE)) self.heights[self.block_idx] = moving_avg_with_linear_decay(self.heights[self.block_idx], new_height, self.idx, float(BLOCK_SIZE)) self.idx = (self.idx + 1) % BLOCK_SIZE if self.idx == 0: self.block_idx += 1 self.valid_blocks = max(self.block_idx, self.valid_blocks) self.block_idx = self.block_idx % INPUTS_WANTED self.update_status() return new_rpy def get_msg(self, valid: bool) -> capnp.lib.capnp._DynamicStructBuilder: smooth_rpy = self.get_smooth_rpy() msg = messaging.new_message('liveCalibration') msg.valid = valid liveCalibration = msg.liveCalibration liveCalibration.validBlocks = self.valid_blocks liveCalibration.calStatus = self.cal_status liveCalibration.calPerc = min(100 * (self.valid_blocks * BLOCK_SIZE + self.idx) // (INPUTS_NEEDED * BLOCK_SIZE), 100) liveCalibration.rpyCalib = smooth_rpy.tolist() liveCalibration.rpyCalibSpread = self.calib_spread.tolist() liveCalibration.wideFromDeviceEuler = self.wide_from_device_euler.tolist() liveCalibration.height = self.height.tolist() if self.not_car: liveCalibration.validBlocks = INPUTS_NEEDED liveCalibration.calStatus = log.LiveCalibrationData.Status.calibrated liveCalibration.calPerc = 100. liveCalibration.rpyCalib = [0, 0, 0] liveCalibration.rpyCalibSpread = self.calib_spread.tolist() return msg def send_data(self, pm: messaging.PubMaster, valid: bool) -> None: pm.send('liveCalibration', self.get_msg(valid)) def main() -> NoReturn: config_realtime_process([0, 1, 2, 3], 5) pm = messaging.PubMaster(['liveCalibration']) sm = messaging.SubMaster(['cameraOdometry', 'carState', 'carParams'], poll='cameraOdometry') calibrator = Calibrator(param_put=True) while 1: timeout = 0 if sm.frame == -1 else 100 sm.update(timeout) calibrator.not_car = sm['carParams'].notCar if sm.updated['cameraOdometry']: calibrator.handle_v_ego(sm['carState'].vEgo) new_rpy = calibrator.handle_cam_odom(sm['cameraOdometry'].trans, sm['cameraOdometry'].rot, sm['cameraOdometry'].wideFromDeviceEuler, sm['cameraOdometry'].transStd, sm['cameraOdometry'].roadTransformTrans, sm['cameraOdometry'].roadTransformTransStd) if DEBUG and new_rpy is not None: print('got new rpy', new_rpy) # 4Hz driven by cameraOdometry if sm.frame % 5 == 0: calibrator.send_data(pm, sm.all_checks()) if __name__ == "__main__": main()