#!/usr/bin/env python3
import os
import json
import time
import capnp
import numpy as np
from enum import Enum
from collections import defaultdict
from cereal import log , messaging
from cereal . services import SERVICE_LIST
from openpilot . common . transformations . orientation import rot_from_euler
from openpilot . common . realtime import config_realtime_process
from openpilot . common . params import Params
from openpilot . common . swaglog import cloudlog
from openpilot . selfdrive . locationd . helpers import rotate_std
from openpilot . selfdrive . locationd . models . pose_kf import PoseKalman , States
from openpilot . selfdrive . locationd . models . constants import ObservationKind , GENERATED_DIR
ACCEL_SANITY_CHECK = 100.0 # m/s^2
ROTATION_SANITY_CHECK = 10.0 # rad/s
TRANS_SANITY_CHECK = 200.0 # m/s
CALIB_RPY_SANITY_CHECK = 0.5 # rad (+- 30 deg)
MIN_STD_SANITY_CHECK = 1e-5 # m or rad
MAX_FILTER_REWIND_TIME = 0.8 # s
MAX_SENSOR_TIME_DIFF = 0.1 # s
YAWRATE_CROSS_ERR_CHECK_FACTOR = 30
INPUT_INVALID_LIMIT = 2.0 # 1 (camodo) / 9 (sensor) bad input[s] ignored
INPUT_INVALID_RECOVERY = 10.0 # ~10 secs to resume after exceeding allowed bad inputs by one
POSENET_STD_INITIAL_VALUE = 10.0
POSENET_STD_HIST_HALF = 20
def calculate_invalid_input_decay ( invalid_limit , recovery_time , frequency ) :
return ( 1 - 1 / ( 2 * invalid_limit ) ) * * ( 1 / ( recovery_time * frequency ) )
def init_xyz_measurement ( measurement : capnp . _DynamicStructBuilder , values : np . ndarray , stds : np . ndarray , valid : bool ) :
assert len ( values ) == len ( stds ) == 3
measurement . x , measurement . y , measurement . z = map ( float , values )
measurement . xStd , measurement . yStd , measurement . zStd = map ( float , stds )
measurement . valid = valid
class HandleLogResult ( Enum ) :
SUCCESS = 0
TIMING_INVALID = 1
INPUT_INVALID = 2
SENSOR_SOURCE_INVALID = 3
class LocationEstimator :
def __init__ ( self , debug : bool ) :
self . kf = PoseKalman ( GENERATED_DIR , MAX_FILTER_REWIND_TIME )
self . debug = debug
self . posenet_stds = np . array ( [ POSENET_STD_INITIAL_VALUE ] * ( POSENET_STD_HIST_HALF * 2 ) )
self . car_speed = 0.0
self . camodo_yawrate_distribution = np . array ( [ 0.0 , 10.0 ] ) # mean, std
self . device_from_calib = np . eye ( 3 )
obs_kinds = [ ObservationKind . PHONE_ACCEL , ObservationKind . PHONE_GYRO , ObservationKind . CAMERA_ODO_ROTATION , ObservationKind . CAMERA_ODO_TRANSLATION ]
self . observations = { kind : np . zeros ( 3 , dtype = np . float32 ) for kind in obs_kinds }
self . observation_errors = { kind : np . zeros ( 3 , dtype = np . float32 ) for kind in obs_kinds }
def reset ( self , t : float , x_initial : np . ndarray = PoseKalman . initial_x , P_initial : np . ndarray = PoseKalman . initial_P ) :
self . kf . reset ( t , x_initial , P_initial )
def _validate_sensor_source ( self , source : log . SensorEventData . SensorSource ) :
# some segments have two IMUs, ignore the second one
return source != log . SensorEventData . SensorSource . bmx055
def _validate_sensor_time ( self , sensor_time : float , t : float ) :
# ignore empty readings
if sensor_time == 0 :
return False
# sensor time and log time should be close
sensor_time_invalid = abs ( sensor_time - t ) > MAX_SENSOR_TIME_DIFF
if sensor_time_invalid :
cloudlog . warning ( " Sensor reading ignored, sensor timestamp more than 100ms off from log time " )
return not sensor_time_invalid
def _validate_timestamp ( self , t : float ) :
kf_t = self . kf . t
invalid = not np . isnan ( kf_t ) and ( kf_t - t ) > MAX_FILTER_REWIND_TIME
if invalid :
cloudlog . warning ( " Observation timestamp is older than the max rewind threshold of the filter " )
return not invalid
def _finite_check ( self , t : float , new_x : np . ndarray , new_P : np . ndarray ) :
all_finite = np . isfinite ( new_x ) . all ( ) and np . isfinite ( new_P ) . all ( )
if not all_finite :
cloudlog . error ( " Non-finite values detected, kalman reset " )
self . reset ( t )
def handle_log ( self , t : float , which : str , msg : capnp . _DynamicStructReader ) - > HandleLogResult :
new_x , new_P = None , None
if which == " accelerometer " and msg . which ( ) == " acceleration " :
sensor_time = msg . timestamp * 1e-9
if not self . _validate_sensor_time ( sensor_time , t ) or not self . _validate_timestamp ( sensor_time ) :
return HandleLogResult . TIMING_INVALID
if not self . _validate_sensor_source ( msg . source ) :
return HandleLogResult . SENSOR_SOURCE_INVALID
v = msg . acceleration . v
meas = np . array ( [ - v [ 2 ] , - v [ 1 ] , - v [ 0 ] ] )
if np . linalg . norm ( meas ) > = ACCEL_SANITY_CHECK :
return HandleLogResult . INPUT_INVALID
acc_res = self . kf . predict_and_observe ( sensor_time , ObservationKind . PHONE_ACCEL , meas )
if acc_res is not None :
_ , new_x , _ , new_P , _ , _ , ( acc_err , ) , _ , _ = acc_res
self . observation_errors [ ObservationKind . PHONE_ACCEL ] = np . array ( acc_err )
self . observations [ ObservationKind . PHONE_ACCEL ] = meas
elif which == " gyroscope " and msg . which ( ) == " gyroUncalibrated " :
sensor_time = msg . timestamp * 1e-9
if not self . _validate_sensor_time ( sensor_time , t ) or not self . _validate_timestamp ( sensor_time ) :
return HandleLogResult . TIMING_INVALID
if not self . _validate_sensor_source ( msg . source ) :
return HandleLogResult . SENSOR_SOURCE_INVALID
v = msg . gyroUncalibrated . v
meas = np . array ( [ - v [ 2 ] , - v [ 1 ] , - v [ 0 ] ] )
gyro_bias = self . kf . x [ States . GYRO_BIAS ]
gyro_camodo_yawrate_err = np . abs ( ( meas [ 2 ] - gyro_bias [ 2 ] ) - self . camodo_yawrate_distribution [ 0 ] )
gyro_camodo_yawrate_err_threshold = YAWRATE_CROSS_ERR_CHECK_FACTOR * self . camodo_yawrate_distribution [ 1 ]
gyro_valid = gyro_camodo_yawrate_err < gyro_camodo_yawrate_err_threshold
if np . linalg . norm ( meas ) > = ROTATION_SANITY_CHECK or not gyro_valid :
return HandleLogResult . INPUT_INVALID
gyro_res = self . kf . predict_and_observe ( sensor_time , ObservationKind . PHONE_GYRO , meas )
if gyro_res is not None :
_ , new_x , _ , new_P , _ , _ , ( gyro_err , ) , _ , _ = gyro_res
self . observation_errors [ ObservationKind . PHONE_GYRO ] = np . array ( gyro_err )
self . observations [ ObservationKind . PHONE_GYRO ] = meas
elif which == " carState " :
self . car_speed = abs ( msg . vEgo )
elif which == " liveCalibration " :
if len ( msg . rpyCalib ) > 0 :
calib = np . array ( msg . rpyCalib )
if calib . min ( ) < - CALIB_RPY_SANITY_CHECK or calib . max ( ) > CALIB_RPY_SANITY_CHECK :
return HandleLogResult . INPUT_INVALID
self . device_from_calib = rot_from_euler ( calib )
self . calibrated = msg . calStatus == log . LiveCalibrationData . Status . calibrated
elif which == " cameraOdometry " :
if not self . _validate_timestamp ( t ) :
return HandleLogResult . TIMING_INVALID
rot_device = np . matmul ( self . device_from_calib , np . array ( msg . rot ) )
trans_device = np . matmul ( self . device_from_calib , np . array ( msg . trans ) )
if np . linalg . norm ( rot_device ) > ROTATION_SANITY_CHECK or np . linalg . norm ( trans_device ) > TRANS_SANITY_CHECK :
return HandleLogResult . INPUT_INVALID
rot_calib_std = np . array ( msg . rotStd )
trans_calib_std = np . array ( msg . transStd )
if rot_calib_std . min ( ) < = MIN_STD_SANITY_CHECK or trans_calib_std . min ( ) < = MIN_STD_SANITY_CHECK :
return HandleLogResult . INPUT_INVALID
if np . linalg . norm ( rot_calib_std ) > 10 * ROTATION_SANITY_CHECK or np . linalg . norm ( trans_calib_std ) > 10 * TRANS_SANITY_CHECK :
return HandleLogResult . INPUT_INVALID
self . posenet_stds = np . roll ( self . posenet_stds , - 1 )
self . posenet_stds [ - 1 ] = trans_calib_std [ 0 ]
# Multiply by N to avoid to high certainty in kalman filter because of temporally correlated noise
rot_calib_std * = 10
trans_calib_std * = 2
rot_device_std = rotate_std ( self . device_from_calib , rot_calib_std )
trans_device_std = rotate_std ( self . device_from_calib , trans_calib_std )
rot_device_noise = rot_device_std * * 2
trans_device_noise = trans_device_std * * 2
cam_odo_rot_res = self . kf . predict_and_observe ( t , ObservationKind . CAMERA_ODO_ROTATION , rot_device , rot_device_noise )
cam_odo_trans_res = self . kf . predict_and_observe ( t , ObservationKind . CAMERA_ODO_TRANSLATION , trans_device , trans_device_noise )
self . camodo_yawrate_distribution = np . array ( [ rot_device [ 2 ] , rot_device_std [ 2 ] ] )
if cam_odo_rot_res is not None :
_ , new_x , _ , new_P , _ , _ , ( cam_odo_rot_err , ) , _ , _ = cam_odo_rot_res
self . observation_errors [ ObservationKind . CAMERA_ODO_ROTATION ] = np . array ( cam_odo_rot_err )
self . observations [ ObservationKind . CAMERA_ODO_ROTATION ] = rot_device
if cam_odo_trans_res is not None :
_ , new_x , _ , new_P , _ , _ , ( cam_odo_trans_err , ) , _ , _ = cam_odo_trans_res
self . observation_errors [ ObservationKind . CAMERA_ODO_TRANSLATION ] = np . array ( cam_odo_trans_err )
self . observations [ ObservationKind . CAMERA_ODO_TRANSLATION ] = trans_device
if new_x is not None and new_P is not None :
self . _finite_check ( t , new_x , new_P )
return HandleLogResult . SUCCESS
def get_msg ( self , sensors_valid : bool , inputs_valid : bool , filter_valid : bool ) :
state , cov = self . kf . x , self . kf . P
std = np . sqrt ( np . diag ( cov ) )
orientation_ned , orientation_ned_std = state [ States . NED_ORIENTATION ] , std [ States . NED_ORIENTATION ]
velocity_device , velocity_device_std = state [ States . DEVICE_VELOCITY ] , std [ States . DEVICE_VELOCITY ]
angular_velocity_device , angular_velocity_device_std = state [ States . ANGULAR_VELOCITY ] , std [ States . ANGULAR_VELOCITY ]
acceleration_device , acceleration_device_std = state [ States . ACCELERATION ] , std [ States . ACCELERATION ]
msg = messaging . new_message ( " livePose " )
msg . valid = filter_valid
livePose = msg . livePose
init_xyz_measurement ( livePose . orientationNED , orientation_ned , orientation_ned_std , filter_valid )
init_xyz_measurement ( livePose . velocityDevice , velocity_device , velocity_device_std , filter_valid )
init_xyz_measurement ( livePose . angularVelocityDevice , angular_velocity_device , angular_velocity_device_std , filter_valid )
init_xyz_measurement ( livePose . accelerationDevice , acceleration_device , acceleration_device_std , filter_valid )
if self . debug :
livePose . debugFilterState . value = state . tolist ( )
livePose . debugFilterState . std = std . tolist ( )
livePose . debugFilterState . valid = filter_valid
livePose . debugFilterState . observations = [
{ ' kind ' : k , ' value ' : self . observations [ k ] . tolist ( ) , ' error ' : self . observation_errors [ k ] . tolist ( ) }
for k in self . observations . keys ( )
]
old_mean = np . mean ( self . posenet_stds [ : POSENET_STD_HIST_HALF ] )
new_mean = np . mean ( self . posenet_stds [ POSENET_STD_HIST_HALF : ] )
std_spike = ( new_mean / old_mean ) > 4.0 and new_mean > 7.0
livePose . inputsOK = inputs_valid
livePose . posenetOK = not std_spike or self . car_speed < = 5.0
livePose . sensorsOK = sensors_valid
return msg
def sensor_all_checks ( acc_msgs , gyro_msgs , sensor_valid , sensor_recv_time , sensor_alive , simulation ) :
cur_time = time . monotonic ( )
for which , msgs in [ ( " accelerometer " , acc_msgs ) , ( " gyroscope " , gyro_msgs ) ] :
if len ( msgs ) > 0 :
sensor_valid [ which ] = msgs [ - 1 ] . valid
sensor_recv_time [ which ] = cur_time
if not simulation :
sensor_alive [ which ] = ( cur_time - sensor_recv_time [ which ] ) < 0.1
else :
sensor_alive [ which ] = len ( msgs ) > 0
return all ( sensor_alive . values ( ) ) and all ( sensor_valid . values ( ) )
def main ( ) :
config_realtime_process ( [ 0 , 1 , 2 , 3 ] , 5 )
DEBUG = bool ( int ( os . getenv ( " DEBUG " , " 0 " ) ) )
SIMULATION = bool ( int ( os . getenv ( " SIMULATION " , " 0 " ) ) )
pm = messaging . PubMaster ( [ ' livePose ' ] )
sm = messaging . SubMaster ( [ ' carState ' , ' liveCalibration ' , ' cameraOdometry ' ] , poll = ' cameraOdometry ' )
# separate sensor sockets for efficiency
sensor_sockets = [ messaging . sub_sock ( which , timeout = 20 ) for which in [ ' accelerometer ' , ' gyroscope ' ] ]
sensor_alive , sensor_valid , sensor_recv_time = defaultdict ( bool ) , defaultdict ( bool ) , defaultdict ( float )
params = Params ( )
estimator = LocationEstimator ( DEBUG )
filter_initialized = False
critcal_services = [ " accelerometer " , " gyroscope " , " cameraOdometry " ]
observation_input_invalid = defaultdict ( int )
input_invalid_limit = { s : round ( INPUT_INVALID_LIMIT * ( SERVICE_LIST [ s ] . frequency / 20. ) ) for s in critcal_services }
input_invalid_threshold = { s : input_invalid_limit [ s ] - 0.5 for s in critcal_services }
input_invalid_decay = { s : calculate_invalid_input_decay ( input_invalid_limit [ s ] , INPUT_INVALID_RECOVERY , SERVICE_LIST [ s ] . frequency ) for s in critcal_services }
initial_pose = params . get ( " LocationFilterInitialState " )
if initial_pose is not None :
initial_pose = json . loads ( initial_pose )
x_initial = np . array ( initial_pose [ " x " ] , dtype = np . float64 )
P_initial = np . diag ( np . array ( initial_pose [ " P " ] , dtype = np . float64 ) )
estimator . reset ( None , x_initial , P_initial )
while True :
sm . update ( )
acc_msgs , gyro_msgs = ( messaging . drain_sock ( sock ) for sock in sensor_sockets )
if filter_initialized :
msgs = [ ]
for msg in acc_msgs + gyro_msgs :
t , valid , which , data = msg . logMonoTime , msg . valid , msg . which ( ) , getattr ( msg , msg . which ( ) )
msgs . append ( ( t , valid , which , data ) )
for which , updated in sm . updated . items ( ) :
if not updated :
continue
t , valid , data = sm . logMonoTime [ which ] , sm . valid [ which ] , sm [ which ]
msgs . append ( ( t , valid , which , data ) )
for log_mono_time , valid , which , msg in sorted ( msgs , key = lambda x : x [ 0 ] ) :
if valid :
t = log_mono_time * 1e-9
res = estimator . handle_log ( t , which , msg )
if which not in critcal_services :
continue
if res == HandleLogResult . TIMING_INVALID :
cloudlog . warning ( f " Observation { which } ignored due to failed timing check " )
observation_input_invalid [ which ] + = 1
elif res == HandleLogResult . INPUT_INVALID :
cloudlog . warning ( f " Observation { which } ignored due to failed sanity check " )
observation_input_invalid [ which ] + = 1
elif res == HandleLogResult . SUCCESS :
observation_input_invalid [ which ] * = input_invalid_decay [ which ]
else :
filter_initialized = sm . all_checks ( ) and sensor_all_checks ( acc_msgs , gyro_msgs , sensor_valid , sensor_recv_time , sensor_alive , SIMULATION )
if sm . updated [ " cameraOdometry " ] :
critical_service_inputs_valid = all ( observation_input_invalid [ s ] < input_invalid_threshold [ s ] for s in critcal_services )
inputs_valid = sm . all_valid ( ) and critical_service_inputs_valid
sensors_valid = sensor_all_checks ( acc_msgs , gyro_msgs , sensor_valid , sensor_recv_time , sensor_alive , SIMULATION )
msg = estimator . get_msg ( sensors_valid , inputs_valid , filter_initialized )
pm . send ( " livePose " , msg )
if __name__ == " __main__ " :
main ( )