openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

158 lines
5.8 KiB

#!/usr/bin/env python3
import math
import json
import numpy as np
import cereal.messaging as messaging
from cereal import car, log
from common.params import Params, put_nonblocking
from selfdrive.locationd.models.car_kf import CarKalman, ObservationKind, States
from selfdrive.locationd.models.constants import GENERATED_DIR
from selfdrive.swaglog import cloudlog
5 years ago
KalmanStatus = log.LiveLocationKalman.Status
class ParamsLearner:
def __init__(self, CP, steer_ratio, stiffness_factor, angle_offset):
self.kf = CarKalman(GENERATED_DIR, steer_ratio, stiffness_factor, angle_offset)
self.kf.filter.set_mass(CP.mass) # pylint: disable=no-member
self.kf.filter.set_rotational_inertia(CP.rotationalInertia) # pylint: disable=no-member
self.kf.filter.set_center_to_front(CP.centerToFront) # pylint: disable=no-member
self.kf.filter.set_center_to_rear(CP.wheelbase - CP.centerToFront) # pylint: disable=no-member
self.kf.filter.set_stiffness_front(CP.tireStiffnessFront) # pylint: disable=no-member
self.kf.filter.set_stiffness_rear(CP.tireStiffnessRear) # pylint: disable=no-member
self.active = False
self.speed = 0
self.steering_pressed = False
self.steering_angle = 0
self.valid = True
def handle_log(self, t, which, msg):
if which == 'liveLocationKalman':
yaw_rate = msg.angularVelocityCalibrated.value[2]
yaw_rate_std = msg.angularVelocityCalibrated.std[2]
5 years ago
if self.active:
5 years ago
if msg.inputsOK and msg.posenetOK and msg.status == KalmanStatus.valid:
self.kf.predict_and_observe(t,
ObservationKind.ROAD_FRAME_YAW_RATE,
np.array([[[-yaw_rate]]]),
np.array([np.atleast_2d(yaw_rate_std**2)]))
self.kf.predict_and_observe(t, ObservationKind.ANGLE_OFFSET_FAST, np.array([[[0]]]))
elif which == 'carState':
self.steering_angle = msg.steeringAngleDeg
self.steering_pressed = msg.steeringPressed
self.speed = msg.vEgo
5 years ago
in_linear_region = abs(self.steering_angle) < 45 or not self.steering_pressed
self.active = self.speed > 5 and in_linear_region
if self.active:
self.kf.predict_and_observe(t, ObservationKind.STEER_ANGLE, np.array([[[math.radians(msg.steeringAngleDeg)]]]))
self.kf.predict_and_observe(t, ObservationKind.ROAD_FRAME_X_SPEED, np.array([[[self.speed]]]))
if not self.active:
# Reset time when stopped so uncertainty doesn't grow
self.kf.filter.filter_time = t
self.kf.filter.reset_rewind()
def main(sm=None, pm=None):
if sm is None:
sm = messaging.SubMaster(['liveLocationKalman', 'carState'], poll=['liveLocationKalman'])
if pm is None:
pm = messaging.PubMaster(['liveParameters'])
params_reader = Params()
# wait for stats about the car to come in from controls
cloudlog.info("paramsd is waiting for CarParams")
CP = car.CarParams.from_bytes(params_reader.get("CarParams", block=True))
cloudlog.info("paramsd got CarParams")
min_sr, max_sr = 0.5 * CP.steerRatio, 2.0 * CP.steerRatio
params = params_reader.get("LiveParameters")
# Check if car model matches
if params is not None:
params = json.loads(params)
if params.get('carFingerprint', None) != CP.carFingerprint:
cloudlog.info("Parameter learner found parameters for wrong car.")
params = None
try:
if params is not None and not all((
abs(params.get('angleOffsetAverageDeg')) < 10.0,
min_sr <= params['steerRatio'] <= max_sr)):
cloudlog.info(f"Invalid starting values found {params}")
params = None
except Exception as e:
cloudlog.info(f"Error reading params {params}: {str(e)}")
params = None
# TODO: cache the params with the capnp struct
if params is None:
params = {
'carFingerprint': CP.carFingerprint,
'steerRatio': CP.steerRatio,
'stiffnessFactor': 1.0,
'angleOffsetAverageDeg': 0.0,
}
cloudlog.info("Parameter learner resetting to default values")
# When driving in wet conditions the stiffness can go down, and then be too low on the next drive
# Without a way to detect this we have to reset the stiffness every drive
params['stiffnessFactor'] = 1.0
learner = ParamsLearner(CP, params['steerRatio'], params['stiffnessFactor'], math.radians(params['angleOffsetAverageDeg']))
while True:
sm.update()
for which, updated in sm.updated.items():
if updated:
t = sm.logMonoTime[which] * 1e-9
learner.handle_log(t, which, sm[which])
if sm.updated['liveLocationKalman']:
msg = messaging.new_message('liveParameters')
msg.logMonoTime = sm.logMonoTime['carState']
msg.liveParameters.posenetValid = True
msg.liveParameters.sensorValid = True
x = learner.kf.x
msg.liveParameters.steerRatio = float(x[States.STEER_RATIO])
msg.liveParameters.stiffnessFactor = float(x[States.STIFFNESS])
msg.liveParameters.angleOffsetAverageDeg = math.degrees(x[States.ANGLE_OFFSET])
msg.liveParameters.angleOffsetDeg = msg.liveParameters.angleOffsetAverageDeg + math.degrees(x[States.ANGLE_OFFSET_FAST])
msg.liveParameters.valid = all((
abs(msg.liveParameters.angleOffsetAverageDeg) < 10.0,
abs(msg.liveParameters.angleOffsetDeg) < 10.0,
0.2 <= msg.liveParameters.stiffnessFactor <= 5.0,
min_sr <= msg.liveParameters.steerRatio <= max_sr,
))
if sm.frame % 1200 == 0: # once a minute
params = {
'carFingerprint': CP.carFingerprint,
'steerRatio': msg.liveParameters.steerRatio,
'stiffnessFactor': msg.liveParameters.stiffnessFactor,
'angleOffsetAverageDeg': msg.liveParameters.angleOffsetAverageDeg,
}
put_nonblocking("LiveParameters", json.dumps(params))
pm.send('liveParameters', msg)
if __name__ == "__main__":
main()