@ -9,8 +9,10 @@ from cereal import car, log
from openpilot . common . params import Params
from openpilot . common . realtime import config_realtime_process , DT_MDL
from openpilot . common . numpy_fast import clip
from openpilot . common . transformations . orientation import rot_from_euler
from openpilot . selfdrive . locationd . models . car_kf import CarKalman , ObservationKind , States
from openpilot . selfdrive . locationd . models . constants import GENERATED_DIR
from openpilot . selfdrive . locationd . helpers import rotate_std
from openpilot . common . swaglog import cloudlog
@ -38,6 +40,9 @@ class ParamsLearner:
self . kf . filter . set_global ( " stiffness_rear " , CP . tireStiffnessRear )
self . active = False
self . calibrated = False
self . calib_from_device = np . eye ( 3 )
self . speed = 0.0
self . yaw_rate = 0.0
@ -47,12 +52,16 @@ class ParamsLearner:
self . roll_valid = False
def handle_log ( self , t , which , msg ) :
if which == ' liveLocationKalman ' :
self . yaw_rate = msg . angularVelocityCalibrated . value [ 2 ]
self . yaw_rate_std = msg . angularVelocityCalibrated . std [ 2 ]
if which == ' livePose ' :
angular_velocity_device = np . array ( [ msg . angularVelocityDevice . x , msg . angularVelocityDevice . y , msg . angularVelocityDevice . z ] )
angular_velocity_device_std = np . array ( [ msg . angularVelocityDevice . xStd , msg . angularVelocityDevice . yStd , msg . angularVelocityDevice . zStd ] )
angular_velocity_calibrated = np . matmul ( self . calib_from_device , angular_velocity_device )
angular_velocity_calibrated_std = rotate_std ( self . calib_from_device , angular_velocity_device_std )
self . yaw_rate , self . yaw_rate_std = angular_velocity_calibrated [ 2 ] , angular_velocity_calibrated_std [ 2 ]
localizer_roll = msg . orientationNED . value [ 0 ]
localizer_roll_std = np . radians ( 1 ) if np . isnan ( msg . orientationNED . std [ 0 ] ) else msg . orientationNED . std [ 0 ]
localizer_roll = msg . orientationNED . x
localizer_roll_std = np . radians ( 1 ) if np . isnan ( msg . orientationNED . xStd ) else msg . orientationNED . xStd
self . roll_valid = ( localizer_roll_std < ROLL_STD_MAX ) and ( ROLL_MIN < localizer_roll < ROLL_MAX ) and msg . sensorsOK
if self . roll_valid :
roll = localizer_roll
@ -64,7 +73,7 @@ class ParamsLearner:
roll_std = np . radians ( 10.0 )
self . roll = clip ( roll , self . roll - ROLL_MAX_DELTA , self . roll + ROLL_MAX_DELTA )
yaw_rate_valid = msg . angularVelocityCalibrated . vali d
yaw_rate_valid = msg . angularVelocityDevice . valid and self . calibrate d
yaw_rate_valid = yaw_rate_valid and 0 < self . yaw_rate_std < 10 # rad/s
yaw_rate_valid = yaw_rate_valid and abs ( self . yaw_rate ) < 1 # rad/s
@ -91,6 +100,11 @@ class ParamsLearner:
self . kf . predict_and_observe ( t , ObservationKind . STIFFNESS , np . array ( [ [ stiffness ] ] ) )
self . kf . predict_and_observe ( t , ObservationKind . STEER_RATIO , np . array ( [ [ steer_ratio ] ] ) )
elif which == ' liveCalibration ' :
self . calibrated = msg . calStatus == log . LiveCalibrationData . Status . calibrated
device_from_calib = rot_from_euler ( np . array ( msg . rpyCalib ) )
self . calib_from_device = device_from_calib . T
elif which == ' carState ' :
self . steering_angle = msg . steeringAngleDeg
self . speed = msg . vEgo
@ -123,7 +137,7 @@ def main():
REPLAY = bool ( int ( os . getenv ( " REPLAY " , " 0 " ) ) )
pm = messaging . PubMaster ( [ ' liveParameters ' ] )
sm = messaging . SubMaster ( [ ' liveLocationKalma n ' , ' carState ' ] , poll = ' liveLocationKalman ' )
sm = messaging . SubMaster ( [ ' livePose ' , ' liveCalibratio n ' , ' carState ' ] , poll = ' livePose ' )
params_reader = Params ( )
# wait for stats about the car to come in from controls
@ -188,7 +202,7 @@ def main():
t = sm . logMonoTime [ which ] * 1e-9
learner . handle_log ( t , which , sm [ which ] )
if sm . updated [ ' liveLocationKalman ' ] :
if sm . updated [ ' livePose ' ] :
x = learner . kf . x
P = np . sqrt ( learner . kf . P . diagonal ( ) )
if not all ( map ( math . isfinite , x ) ) :