#!/usr/bin/env python3 import os import time import capnp import numpy as np from enum import Enum from collections import defaultdict from cereal import log, messaging from cereal.services import SERVICE_LIST from openpilot.common.transformations.orientation import rot_from_euler from openpilot.common.realtime import config_realtime_process from openpilot.common.params import Params from openpilot.common.swaglog import cloudlog from openpilot.selfdrive.locationd.helpers import rotate_std from openpilot.selfdrive.locationd.models.pose_kf import PoseKalman, States from openpilot.selfdrive.locationd.models.constants import ObservationKind, GENERATED_DIR ACCEL_SANITY_CHECK = 100.0 # m/s^2 ROTATION_SANITY_CHECK = 10.0 # rad/s TRANS_SANITY_CHECK = 200.0 # m/s CALIB_RPY_SANITY_CHECK = 0.5 # rad (+- 30 deg) MIN_STD_SANITY_CHECK = 1e-5 # m or rad MAX_FILTER_REWIND_TIME = 0.8 # s MAX_SENSOR_TIME_DIFF = 0.1 # s YAWRATE_CROSS_ERR_CHECK_FACTOR = 30 INPUT_INVALID_LIMIT = 2.0 # 1 (camodo) / 9 (sensor) bad input[s] ignored INPUT_INVALID_RECOVERY = 10.0 # ~10 secs to resume after exceeding allowed bad inputs by one POSENET_STD_INITIAL_VALUE = 10.0 POSENET_STD_HIST_HALF = 20 def calculate_invalid_input_decay(invalid_limit, recovery_time, frequency): return (1 - 1 / (2 * invalid_limit)) ** (1 / (recovery_time * frequency)) def init_xyz_measurement(measurement: capnp._DynamicStructBuilder, values: np.ndarray, stds: np.ndarray, valid: bool): assert len(values) == len(stds) == 3 measurement.x, measurement.y, measurement.z = map(float, values) measurement.xStd, measurement.yStd, measurement.zStd = map(float, stds) measurement.valid = valid class HandleLogResult(Enum): SUCCESS = 0 TIMING_INVALID = 1 INPUT_INVALID = 2 SENSOR_SOURCE_INVALID = 3 class LocationEstimator: def __init__(self, debug: bool): self.kf = PoseKalman(GENERATED_DIR, MAX_FILTER_REWIND_TIME) self.debug = debug self.posenet_stds = np.array([POSENET_STD_INITIAL_VALUE] * (POSENET_STD_HIST_HALF * 2)) self.car_speed = 0.0 self.camodo_yawrate_distribution = np.array([0.0, 10.0]) # mean, std self.device_from_calib = np.eye(3) obs_kinds = [ObservationKind.PHONE_ACCEL, ObservationKind.PHONE_GYRO, ObservationKind.CAMERA_ODO_ROTATION, ObservationKind.CAMERA_ODO_TRANSLATION] self.observations = {kind: np.zeros(3, dtype=np.float32) for kind in obs_kinds} self.observation_errors = {kind: np.zeros(3, dtype=np.float32) for kind in obs_kinds} def reset(self, t: float, x_initial: np.ndarray = PoseKalman.initial_x, P_initial: np.ndarray = PoseKalman.initial_P): self.kf.init_state(x_initial, covs=P_initial, filter_time=t) def _validate_sensor_source(self, source: log.SensorEventData.SensorSource): # some segments have two IMUs, ignore the second one return source != log.SensorEventData.SensorSource.bmx055 def _validate_sensor_time(self, sensor_time: float, t: float): # ignore empty readings if sensor_time == 0: return False # sensor time and log time should be close sensor_time_invalid = abs(sensor_time - t) > MAX_SENSOR_TIME_DIFF if sensor_time_invalid: cloudlog.warning("Sensor reading ignored, sensor timestamp more than 100ms off from log time") return not sensor_time_invalid def _validate_timestamp(self, t: float): kf_t = self.kf.t invalid = not np.isnan(kf_t) and (kf_t - t) > MAX_FILTER_REWIND_TIME if invalid: cloudlog.warning("Observation timestamp is older than the max rewind threshold of the filter") return not invalid def _finite_check(self, t: float, new_x: np.ndarray, new_P: np.ndarray): all_finite = np.isfinite(new_x).all() and np.isfinite(new_P).all() if not all_finite: cloudlog.error("Non-finite values detected, kalman reset") self.reset(t) def handle_log(self, t: float, which: str, msg: capnp._DynamicStructReader) -> HandleLogResult: new_x, new_P = None, None if which == "accelerometer" and msg.which() == "acceleration": sensor_time = msg.timestamp * 1e-9 if not self._validate_sensor_time(sensor_time, t) or not self._validate_timestamp(sensor_time): return HandleLogResult.TIMING_INVALID if not self._validate_sensor_source(msg.source): return HandleLogResult.SENSOR_SOURCE_INVALID v = msg.acceleration.v meas = np.array([-v[2], -v[1], -v[0]]) if np.linalg.norm(meas) >= ACCEL_SANITY_CHECK: return HandleLogResult.INPUT_INVALID acc_res = self.kf.predict_and_observe(sensor_time, ObservationKind.PHONE_ACCEL, meas) if acc_res is not None: _, new_x, _, new_P, _, _, (acc_err,), _, _ = acc_res self.observation_errors[ObservationKind.PHONE_ACCEL] = np.array(acc_err) self.observations[ObservationKind.PHONE_ACCEL] = meas elif which == "gyroscope" and msg.which() == "gyroUncalibrated": sensor_time = msg.timestamp * 1e-9 if not self._validate_sensor_time(sensor_time, t) or not self._validate_timestamp(sensor_time): return HandleLogResult.TIMING_INVALID if not self._validate_sensor_source(msg.source): return HandleLogResult.SENSOR_SOURCE_INVALID v = msg.gyroUncalibrated.v meas = np.array([-v[2], -v[1], -v[0]]) gyro_bias = self.kf.x[States.GYRO_BIAS] gyro_camodo_yawrate_err = np.abs((meas[2] - gyro_bias[2]) - self.camodo_yawrate_distribution[0]) gyro_camodo_yawrate_err_threshold = YAWRATE_CROSS_ERR_CHECK_FACTOR * self.camodo_yawrate_distribution[1] gyro_valid = gyro_camodo_yawrate_err < gyro_camodo_yawrate_err_threshold if np.linalg.norm(meas) >= ROTATION_SANITY_CHECK or not gyro_valid: return HandleLogResult.INPUT_INVALID gyro_res = self.kf.predict_and_observe(sensor_time, ObservationKind.PHONE_GYRO, meas) if gyro_res is not None: _, new_x, _, new_P, _, _, (gyro_err,), _, _ = gyro_res self.observation_errors[ObservationKind.PHONE_GYRO] = np.array(gyro_err) self.observations[ObservationKind.PHONE_GYRO] = meas elif which == "carState": self.car_speed = abs(msg.vEgo) elif which == "liveCalibration": # Note that we use this message during calibration if len(msg.rpyCalib) > 0: calib = np.array(msg.rpyCalib) if calib.min() < -CALIB_RPY_SANITY_CHECK or calib.max() > CALIB_RPY_SANITY_CHECK: return HandleLogResult.INPUT_INVALID self.device_from_calib = rot_from_euler(calib) elif which == "cameraOdometry": if not self._validate_timestamp(t): return HandleLogResult.TIMING_INVALID rot_device = np.matmul(self.device_from_calib, np.array(msg.rot)) trans_device = np.matmul(self.device_from_calib, np.array(msg.trans)) if np.linalg.norm(rot_device) > ROTATION_SANITY_CHECK or np.linalg.norm(trans_device) > TRANS_SANITY_CHECK: return HandleLogResult.INPUT_INVALID rot_calib_std = np.array(msg.rotStd) trans_calib_std = np.array(msg.transStd) if rot_calib_std.min() <= MIN_STD_SANITY_CHECK or trans_calib_std.min() <= MIN_STD_SANITY_CHECK: return HandleLogResult.INPUT_INVALID if np.linalg.norm(rot_calib_std) > 10 * ROTATION_SANITY_CHECK or np.linalg.norm(trans_calib_std) > 10 * TRANS_SANITY_CHECK: return HandleLogResult.INPUT_INVALID self.posenet_stds = np.roll(self.posenet_stds, -1) self.posenet_stds[-1] = trans_calib_std[0] # Multiply by N to avoid to high certainty in kalman filter because of temporally correlated noise rot_calib_std *= 10 trans_calib_std *= 2 rot_device_std = rotate_std(self.device_from_calib, rot_calib_std) trans_device_std = rotate_std(self.device_from_calib, trans_calib_std) rot_device_noise = rot_device_std ** 2 trans_device_noise = trans_device_std ** 2 cam_odo_rot_res = self.kf.predict_and_observe(t, ObservationKind.CAMERA_ODO_ROTATION, rot_device, np.array([np.diag(rot_device_noise)])) cam_odo_trans_res = self.kf.predict_and_observe(t, ObservationKind.CAMERA_ODO_TRANSLATION, trans_device, np.array([np.diag(trans_device_noise)])) self.camodo_yawrate_distribution = np.array([rot_device[2], rot_device_std[2]]) if cam_odo_rot_res is not None: _, new_x, _, new_P, _, _, (cam_odo_rot_err,), _, _ = cam_odo_rot_res self.observation_errors[ObservationKind.CAMERA_ODO_ROTATION] = np.array(cam_odo_rot_err) self.observations[ObservationKind.CAMERA_ODO_ROTATION] = rot_device if cam_odo_trans_res is not None: _, new_x, _, new_P, _, _, (cam_odo_trans_err,), _, _ = cam_odo_trans_res self.observation_errors[ObservationKind.CAMERA_ODO_TRANSLATION] = np.array(cam_odo_trans_err) self.observations[ObservationKind.CAMERA_ODO_TRANSLATION] = trans_device if new_x is not None and new_P is not None: self._finite_check(t, new_x, new_P) return HandleLogResult.SUCCESS def get_msg(self, sensors_valid: bool, inputs_valid: bool, filter_valid: bool): state, cov = self.kf.x, self.kf.P std = np.sqrt(np.diag(cov)) orientation_ned, orientation_ned_std = state[States.NED_ORIENTATION], std[States.NED_ORIENTATION] velocity_device, velocity_device_std = state[States.DEVICE_VELOCITY], std[States.DEVICE_VELOCITY] angular_velocity_device, angular_velocity_device_std = state[States.ANGULAR_VELOCITY], std[States.ANGULAR_VELOCITY] acceleration_device, acceleration_device_std = state[States.ACCELERATION], std[States.ACCELERATION] msg = messaging.new_message("livePose") msg.valid = filter_valid livePose = msg.livePose init_xyz_measurement(livePose.orientationNED, orientation_ned, orientation_ned_std, filter_valid) init_xyz_measurement(livePose.velocityDevice, velocity_device, velocity_device_std, filter_valid) init_xyz_measurement(livePose.angularVelocityDevice, angular_velocity_device, angular_velocity_device_std, filter_valid) init_xyz_measurement(livePose.accelerationDevice, acceleration_device, acceleration_device_std, filter_valid) if self.debug: livePose.debugFilterState.value = state.tolist() livePose.debugFilterState.std = std.tolist() livePose.debugFilterState.valid = filter_valid livePose.debugFilterState.observations = [ {'kind': k, 'value': self.observations[k].tolist(), 'error': self.observation_errors[k].tolist()} for k in self.observations.keys() ] old_mean = np.mean(self.posenet_stds[:POSENET_STD_HIST_HALF]) new_mean = np.mean(self.posenet_stds[POSENET_STD_HIST_HALF:]) std_spike = (new_mean / old_mean) > 4.0 and new_mean > 7.0 livePose.inputsOK = inputs_valid livePose.posenetOK = not std_spike or self.car_speed <= 5.0 livePose.sensorsOK = sensors_valid return msg def sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, simulation): cur_time = time.monotonic() for which, msgs in [("accelerometer", acc_msgs), ("gyroscope", gyro_msgs)]: if len(msgs) > 0: sensor_valid[which] = msgs[-1].valid sensor_recv_time[which] = cur_time if not simulation: sensor_alive[which] = (cur_time - sensor_recv_time[which]) < 0.1 else: sensor_alive[which] = len(msgs) > 0 return all(sensor_alive.values()) and all(sensor_valid.values()) def main(): config_realtime_process([0, 1, 2, 3], 5) DEBUG = bool(int(os.getenv("DEBUG", "0"))) SIMULATION = bool(int(os.getenv("SIMULATION", "0"))) pm = messaging.PubMaster(['livePose']) sm = messaging.SubMaster(['carState', 'liveCalibration', 'cameraOdometry'], poll='cameraOdometry') # separate sensor sockets for efficiency sensor_sockets = [messaging.sub_sock(which, timeout=20) for which in ['accelerometer', 'gyroscope']] sensor_alive, sensor_valid, sensor_recv_time = defaultdict(bool), defaultdict(bool), defaultdict(float) params = Params() estimator = LocationEstimator(DEBUG) filter_initialized = False critcal_services = ["accelerometer", "gyroscope", "cameraOdometry"] observation_input_invalid = defaultdict(int) input_invalid_limit = {s: round(INPUT_INVALID_LIMIT * (SERVICE_LIST[s].frequency / 20.)) for s in critcal_services} input_invalid_threshold = {s: input_invalid_limit[s] - 0.5 for s in critcal_services} input_invalid_decay = {s: calculate_invalid_input_decay(input_invalid_limit[s], INPUT_INVALID_RECOVERY, SERVICE_LIST[s].frequency) for s in critcal_services} initial_pose_data = params.get("LocationFilterInitialState") if initial_pose_data is not None: with log.Event.from_bytes(initial_pose_data) as lp_msg: filter_state = lp_msg.livePose.debugFilterState x_initial = np.array(filter_state.value, dtype=np.float64) if len(filter_state) != 0 else PoseKalman.initial_x P_initial = np.array(filter_state.std, dtype=np.float64) if len(filter_state) != 0 else PoseKalman.initial_P estimator.reset(None, x_initial, P_initial) while True: sm.update() acc_msgs, gyro_msgs = (messaging.drain_sock(sock) for sock in sensor_sockets) if filter_initialized: msgs = [] for msg in acc_msgs + gyro_msgs: t, valid, which, data = msg.logMonoTime, msg.valid, msg.which(), getattr(msg, msg.which()) msgs.append((t, valid, which, data)) for which, updated in sm.updated.items(): if not updated: continue t, valid, data = sm.logMonoTime[which], sm.valid[which], sm[which] msgs.append((t, valid, which, data)) for log_mono_time, valid, which, msg in sorted(msgs, key=lambda x: x[0]): if valid: t = log_mono_time * 1e-9 res = estimator.handle_log(t, which, msg) if which not in critcal_services: continue if res == HandleLogResult.TIMING_INVALID: cloudlog.warning(f"Observation {which} ignored due to failed timing check") observation_input_invalid[which] += 1 elif res == HandleLogResult.INPUT_INVALID: cloudlog.warning(f"Observation {which} ignored due to failed sanity check") observation_input_invalid[which] += 1 elif res == HandleLogResult.SUCCESS: observation_input_invalid[which] *= input_invalid_decay[which] else: filter_initialized = sm.all_checks() and sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, SIMULATION) if sm.updated["cameraOdometry"]: critical_service_inputs_valid = all(observation_input_invalid[s] < input_invalid_threshold[s] for s in critcal_services) inputs_valid = sm.all_valid() and critical_service_inputs_valid sensors_valid = sensor_all_checks(acc_msgs, gyro_msgs, sensor_valid, sensor_recv_time, sensor_alive, SIMULATION) msg = estimator.get_msg(sensors_valid, inputs_valid, filter_initialized) pm.send("livePose", msg) if __name__ == "__main__": main()