#!/usr/bin/env python3
import os
import math
import json
import numpy as np
import cereal . messaging as messaging
from cereal import car , log
from openpilot . common . params import Params
from openpilot . common . realtime import config_realtime_process , DT_MDL
from openpilot . selfdrive . locationd . models . car_kf import CarKalman , ObservationKind , States
from openpilot . selfdrive . locationd . models . constants import GENERATED_DIR
from openpilot . selfdrive . locationd . helpers import PoseCalibrator , Pose
from openpilot . common . swaglog import cloudlog
MAX_ANGLE_OFFSET_DELTA = 20 * DT_MDL # Max 20 deg/s
ROLL_MAX_DELTA = math . radians ( 20.0 ) * DT_MDL # 20deg in 1 second is well within curvature limits
ROLL_MIN , ROLL_MAX = math . radians ( - 10 ) , math . radians ( 10 )
ROLL_LOWERED_MAX = math . radians ( 8 )
ROLL_STD_MAX = math . radians ( 1.5 )
LATERAL_ACC_SENSOR_THRESHOLD = 4.0
OFFSET_MAX = 10.0
OFFSET_LOWERED_MAX = 8.0
MIN_ACTIVE_SPEED = 1.0
LOW_ACTIVE_SPEED = 10.0
class ParamsLearner :
def __init__ ( self , CP , steer_ratio , stiffness_factor , angle_offset , P_initial = None ) :
self . kf = CarKalman ( GENERATED_DIR , steer_ratio , stiffness_factor , angle_offset , P_initial )
self . kf . filter . set_global ( " mass " , CP . mass )
self . kf . filter . set_global ( " rotational_inertia " , CP . rotationalInertia )
self . kf . filter . set_global ( " center_to_front " , CP . centerToFront )
self . kf . filter . set_global ( " center_to_rear " , CP . wheelbase - CP . centerToFront )
self . kf . filter . set_global ( " stiffness_front " , CP . tireStiffnessFront )
self . kf . filter . set_global ( " stiffness_rear " , CP . tireStiffnessRear )
self . active = False
self . calibrator = PoseCalibrator ( )
self . speed = 0.0
self . yaw_rate = 0.0
self . yaw_rate_std = 0.0
self . roll = 0.0
self . steering_angle = 0.0
def handle_log ( self , t , which , msg ) :
if which == ' livePose ' :
device_pose = Pose . from_live_pose ( msg )
calibrated_pose = self . calibrator . build_calibrated_pose ( device_pose )
yaw_rate_valid = msg . angularVelocityDevice . valid and self . calibrator . calib_valid
yaw_rate_valid = yaw_rate_valid and 0 < self . yaw_rate_std < 10 # rad/s
yaw_rate_valid = yaw_rate_valid and abs ( self . yaw_rate ) < 1 # rad/s
if yaw_rate_valid :
self . yaw_rate , self . yaw_rate_std = calibrated_pose . angular_velocity . z , calibrated_pose . angular_velocity . z_std
else :
# This is done to bound the yaw rate estimate when localizer values are invalid or calibrating
self . yaw_rate , self . yaw_rate_std = 0.0 , np . radians ( 1 )
localizer_roll , localizer_roll_std = device_pose . orientation . x , device_pose . orientation . x_std
localizer_roll_std = np . radians ( 1 ) if np . isnan ( localizer_roll_std ) else localizer_roll_std
roll_valid = ( localizer_roll_std < ROLL_STD_MAX ) and ( ROLL_MIN < localizer_roll < ROLL_MAX ) and msg . sensorsOK
if roll_valid :
roll = localizer_roll
# Experimentally found multiplier of 2 to be best trade-off between stability and accuracy or similar?
roll_std = 2 * localizer_roll_std
else :
# This is done to bound the road roll estimate when localizer values are invalid
roll = 0.0
roll_std = np . radians ( 10.0 )
self . roll = np . clip ( roll , self . roll - ROLL_MAX_DELTA , self . roll + ROLL_MAX_DELTA )
if self . active :
if msg . posenetOK :
self . kf . predict_and_observe ( t ,
ObservationKind . ROAD_FRAME_YAW_RATE ,
np . array ( [ [ - self . yaw_rate ] ] ) ,
np . array ( [ np . atleast_2d ( self . yaw_rate_std * * 2 ) ] ) )
self . kf . predict_and_observe ( t ,
ObservationKind . ROAD_ROLL ,
np . array ( [ [ self . roll ] ] ) ,
np . array ( [ np . atleast_2d ( roll_std * * 2 ) ] ) )
self . kf . predict_and_observe ( t , ObservationKind . ANGLE_OFFSET_FAST , np . array ( [ [ 0 ] ] ) )
# We observe the current stiffness and steer ratio (with a high observation noise) to bound
# the respective estimate STD. Otherwise the STDs keep increasing, causing rapid changes in the
# states in longer routes (especially straight stretches).
stiffness = float ( self . kf . x [ States . STIFFNESS ] . item ( ) )
steer_ratio = float ( self . kf . x [ States . STEER_RATIO ] . item ( ) )
self . kf . predict_and_observe ( t , ObservationKind . STIFFNESS , np . array ( [ [ stiffness ] ] ) )
self . kf . predict_and_observe ( t , ObservationKind . STEER_RATIO , np . array ( [ [ steer_ratio ] ] ) )
elif which == ' liveCalibration ' :
self . calibrator . feed_live_calib ( msg )
elif which == ' carState ' :
self . steering_angle = msg . steeringAngleDeg
self . speed = msg . vEgo
in_linear_region = abs ( self . steering_angle ) < 45
self . active = self . speed > MIN_ACTIVE_SPEED and in_linear_region
if self . active :
self . kf . predict_and_observe ( t , ObservationKind . STEER_ANGLE , np . array ( [ [ math . radians ( msg . steeringAngleDeg ) ] ] ) )
self . kf . predict_and_observe ( t , ObservationKind . ROAD_FRAME_X_SPEED , np . array ( [ [ self . speed ] ] ) )
if not self . active :
# Reset time when stopped so uncertainty doesn't grow
self . kf . filter . set_filter_time ( t )
self . kf . filter . reset_rewind ( )
def check_valid_with_hysteresis ( current_valid : bool , val : float , threshold : float , lowered_threshold : float ) :
if current_valid :
current_valid = abs ( val ) < threshold
else :
current_valid = abs ( val ) < lowered_threshold
return current_valid
def main ( ) :
config_realtime_process ( [ 0 , 1 , 2 , 3 ] , 5 )
DEBUG = bool ( int ( os . getenv ( " DEBUG " , " 0 " ) ) )
REPLAY = bool ( int ( os . getenv ( " REPLAY " , " 0 " ) ) )
pm = messaging . PubMaster ( [ ' liveParameters ' ] )
sm = messaging . SubMaster ( [ ' livePose ' , ' liveCalibration ' , ' carState ' ] , poll = ' livePose ' )
params_reader = Params ( )
# wait for stats about the car to come in from controls
cloudlog . info ( " paramsd is waiting for CarParams " )
CP = messaging . log_from_bytes ( params_reader . get ( " CarParams " , block = True ) , car . CarParams )
cloudlog . info ( " paramsd got CarParams " )
min_sr , max_sr = 0.5 * CP . steerRatio , 2.0 * CP . steerRatio
params = params_reader . get ( " LiveParameters " )
# Check if car model matches
if params is not None :
params = json . loads ( params )
if params . get ( ' carFingerprint ' , None ) != CP . carFingerprint :
cloudlog . info ( " Parameter learner found parameters for wrong car. " )
params = None
# Check if starting values are sane
if params is not None :
try :
steer_ratio_sane = min_sr < = params [ ' steerRatio ' ] < = max_sr
if not steer_ratio_sane :
cloudlog . info ( f " Invalid starting values found { params } " )
params = None
except Exception as e :
cloudlog . info ( f " Error reading params { params } : { str ( e ) } " )
params = None
# TODO: cache the params with the capnp struct
if params is None :
params = {
' carFingerprint ' : CP . carFingerprint ,
' steerRatio ' : CP . steerRatio ,
' stiffnessFactor ' : 1.0 ,
' angleOffsetAverageDeg ' : 0.0 ,
}
cloudlog . info ( " Parameter learner resetting to default values " )
if not REPLAY :
# When driving in wet conditions the stiffness can go down, and then be too low on the next drive
# Without a way to detect this we have to reset the stiffness every drive
params [ ' stiffnessFactor ' ] = 1.0
pInitial = None
if DEBUG :
pInitial = np . array ( params [ ' debugFilterState ' ] [ ' std ' ] ) if ' debugFilterState ' in params else None
learner = ParamsLearner ( CP , params [ ' steerRatio ' ] , params [ ' stiffnessFactor ' ] , math . radians ( params [ ' angleOffsetAverageDeg ' ] ) , pInitial )
angle_offset_average = params [ ' angleOffsetAverageDeg ' ]
angle_offset = angle_offset_average
roll = 0.0
avg_offset_valid = True
total_offset_valid = True
roll_valid = True
while True :
sm . update ( )
if sm . all_checks ( ) :
for which in sorted ( sm . updated . keys ( ) , key = lambda x : sm . logMonoTime [ x ] ) :
if sm . updated [ which ] :
t = sm . logMonoTime [ which ] * 1e-9
learner . handle_log ( t , which , sm [ which ] )
if sm . updated [ ' livePose ' ] :
x = learner . kf . x
P = np . sqrt ( learner . kf . P . diagonal ( ) )
if not all ( map ( math . isfinite , x ) ) :
cloudlog . error ( " NaN in liveParameters estimate. Resetting to default values " )
learner = ParamsLearner ( CP , CP . steerRatio , 1.0 , 0.0 )
x = learner . kf . x
angle_offset_average = np . clip ( math . degrees ( x [ States . ANGLE_OFFSET ] . item ( ) ) ,
angle_offset_average - MAX_ANGLE_OFFSET_DELTA , angle_offset_average + MAX_ANGLE_OFFSET_DELTA )
angle_offset = np . clip ( math . degrees ( x [ States . ANGLE_OFFSET ] . item ( ) + x [ States . ANGLE_OFFSET_FAST ] . item ( ) ) ,
angle_offset - MAX_ANGLE_OFFSET_DELTA , angle_offset + MAX_ANGLE_OFFSET_DELTA )
roll = np . clip ( float ( x [ States . ROAD_ROLL ] . item ( ) ) , roll - ROLL_MAX_DELTA , roll + ROLL_MAX_DELTA )
roll_std = float ( P [ States . ROAD_ROLL ] . item ( ) )
if learner . active and learner . speed > LOW_ACTIVE_SPEED :
# Account for the opposite signs of the yaw rates
# At low speeds, bumping into a curb can cause the yaw rate to be very high
sensors_valid = bool ( abs ( learner . speed * ( x [ States . YAW_RATE ] . item ( ) + learner . yaw_rate ) ) < LATERAL_ACC_SENSOR_THRESHOLD )
else :
sensors_valid = True
avg_offset_valid = check_valid_with_hysteresis ( avg_offset_valid , angle_offset_average , OFFSET_MAX , OFFSET_LOWERED_MAX )
total_offset_valid = check_valid_with_hysteresis ( total_offset_valid , angle_offset , OFFSET_MAX , OFFSET_LOWERED_MAX )
roll_valid = check_valid_with_hysteresis ( roll_valid , roll , ROLL_MAX , ROLL_LOWERED_MAX )
msg = messaging . new_message ( ' liveParameters ' )
liveParameters = msg . liveParameters
liveParameters . posenetValid = True
liveParameters . sensorValid = sensors_valid
liveParameters . steerRatio = float ( x [ States . STEER_RATIO ] . item ( ) )
liveParameters . steerRatioValid = min_sr < = liveParameters . steerRatio < = max_sr
liveParameters . stiffnessFactor = float ( x [ States . STIFFNESS ] . item ( ) )
liveParameters . stiffnessFactorValid = 0.2 < = liveParameters . stiffnessFactor < = 5.0
liveParameters . roll = float ( roll )
liveParameters . angleOffsetAverageDeg = float ( angle_offset_average )
liveParameters . angleOffsetAverageValid = bool ( avg_offset_valid )
liveParameters . angleOffsetDeg = float ( angle_offset )
liveParameters . angleOffsetValid = bool ( total_offset_valid )
liveParameters . valid = all ( (
liveParameters . angleOffsetAverageValid ,
liveParameters . angleOffsetValid ,
roll_valid ,
roll_std < ROLL_STD_MAX ,
liveParameters . stiffnessFactorValid ,
liveParameters . steerRatioValid ,
) )
liveParameters . steerRatioStd = float ( P [ States . STEER_RATIO ] . item ( ) )
liveParameters . stiffnessFactorStd = float ( P [ States . STIFFNESS ] . item ( ) )
liveParameters . angleOffsetAverageStd = float ( P [ States . ANGLE_OFFSET ] . item ( ) )
liveParameters . angleOffsetFastStd = float ( P [ States . ANGLE_OFFSET_FAST ] . item ( ) )
if DEBUG :
liveParameters . debugFilterState = log . LiveParametersData . FilterState . new_message ( )
liveParameters . debugFilterState . value = x . tolist ( )
liveParameters . debugFilterState . std = P . tolist ( )
msg . valid = sm . all_checks ( )
if sm . frame % 1200 == 0 : # once a minute
params = {
' carFingerprint ' : CP . carFingerprint ,
' steerRatio ' : liveParameters . steerRatio ,
' stiffnessFactor ' : liveParameters . stiffnessFactor ,
' angleOffsetAverageDeg ' : liveParameters . angleOffsetAverageDeg ,
}
params_reader . put_nonblocking ( " LiveParameters " , json . dumps ( params ) )
pm . send ( ' liveParameters ' , msg )
if __name__ == " __main__ " :
main ( )