open source driving agent
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

240 lines
8.9 KiB

import numpy as np
from common.numpy_fast import clip, interp
from selfdrive.config import Conversions as CV
class LongCtrlState:
#*** this function handles the long control state transitions
# long_control_state labels:
off = 0 # Off
pid = 1 # moving and tracking targets, with PID control running
stopping = 2 # stopping and changing controls to almost open loop as PID does not fit well at such a low speed
starting = 3 # starting and releasing brakes in open loop before giving back to PID
def long_control_state_trans(enabled, long_control_state, v_ego, v_target, v_pid, output_gb):
stopping_speed = 0.5
stopping_target_speed = 0.3
starting_target_speed = 0.5
brake_threshold_to_pid = 0.2
stopping_condition = ((v_ego < stopping_speed) and (v_pid < stopping_target_speed) and (v_target < stopping_target_speed))
if not enabled:
long_control_state = LongCtrlState.off
else:
if long_control_state == LongCtrlState.off:
if enabled:
long_control_state = LongCtrlState.pid
elif long_control_state == LongCtrlState.pid:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif long_control_state == LongCtrlState.stopping:
if (v_target > starting_target_speed):
long_control_state = LongCtrlState.starting
elif long_control_state == LongCtrlState.starting:
if stopping_condition:
long_control_state = LongCtrlState.stopping
elif output_gb >= -brake_threshold_to_pid:
long_control_state = LongCtrlState.pid
return long_control_state
def get_compute_gb():
# see debug/dump_accel_from_fiber.py
w0 = np.array([[ 1.22056961, -0.39625418, 0.67952657],
[ 1.03691769, 0.78210306, -0.41343188]])
b0 = np.array([ 0.01536703, -0.14335321, -0.26932889])
w2 = np.array([[-0.59124422, 0.42899439, 0.38660881],
[ 0.79973811, 0.13178682, 0.08550351],
[-0.15651935, -0.44360259, 0.76910877]])
b2 = np.array([ 0.15624429, 0.02294923, -0.0341086 ])
w4 = np.array([[-0.31521443],
[-0.38626176],
[ 0.52667892]])
b4 = np.array([-0.02922216])
def compute_output(dat, w0, b0, w2, b2, w4, b4):
m0 = np.dot(dat, w0) + b0
m0 = leakyrelu(m0, 0.1)
m2 = np.dot(m0, w2) + b2
m2 = leakyrelu(m2, 0.1)
m4 = np.dot(m2, w4) + b4
return m4
def leakyrelu(x, alpha):
return np.maximum(x, alpha * x)
def _compute_gb(dat):
#linearly extrap below v1 using v1 and v2 data
v1 = 5.
v2 = 10.
vx = dat[1]
if vx > 5.:
m4 = compute_output(dat, w0, b0, w2, b2, w4, b4)
else:
dat[1] = v1
m4v1 = compute_output(dat, w0, b0, w2, b2, w4, b4)
dat[1] = v2
m4v2 = compute_output(dat, w0, b0, w2, b2, w4, b4)
m4 = (vx - v1) * (m4v2 - m4v1) / (v2 - v1) + m4v1
return m4
return _compute_gb
# takes in [desired_accel, current_speed] -> [-1.0, 1.0] where -1.0 is max brake and 1.0 is max gas
compute_gb = get_compute_gb()
_KP_BP = [0., 5., 35.]
_KP_V = [1.2, 0.8, 0.5]
_kI_BP = [0., 35.]
_kI_V = [0.18, 0.12]
def pid_long_control(v_ego, v_pid, Ui_accel_cmd, gas_max, brake_max, jerk_factor, gear, rate):
#*** This function compute the gb pedal positions in order to track the desired speed
# proportional and integral terms. More precision at low speed
Kp = interp(v_ego, _KP_BP, _KP_V)
Ki = interp(v_ego, _kI_BP, _kI_V)
# scle Kp and Ki by jerk factor drom drive_thread
Kp = (1. + jerk_factor)*Kp
Ki = (1. + jerk_factor)*Ki
# this is ugly but can speed reports 0 when speed<0.3m/s and we can't have that jump
v_ego_min = 0.3
v_ego = max(v_ego, v_ego_min)
v_error = v_pid - v_ego
Up_accel_cmd = v_error*Kp
Ui_accel_cmd_new = Ui_accel_cmd + v_error*Ki*1.0/rate
accel_cmd_new = Ui_accel_cmd_new + Up_accel_cmd
output_gb_new = compute_gb([accel_cmd_new, v_ego])
# Anti-wind up for integrator: only update integrator if we not against the throttle and brake limits
# do not wind up if we are changing gear and we are on the gas pedal
if (((v_error >= 0. and (output_gb_new < gas_max or Ui_accel_cmd < 0)) or
(v_error <= 0. and (output_gb_new > - brake_max or Ui_accel_cmd > 0))) and
not (v_error >= 0. and gear == 11 and output_gb_new > 0)):
#update integrator
Ui_accel_cmd = Ui_accel_cmd_new
accel_cmd = Ui_accel_cmd + Up_accel_cmd
# go from accel to pedals
output_gb = compute_gb([accel_cmd, v_ego])
output_gb = output_gb[0]
# useful to know if control is against the limit
long_control_sat = False
if output_gb > gas_max or output_gb < -brake_max:
long_control_sat = True
output_gb = clip(output_gb, -brake_max, gas_max)
return output_gb, Up_accel_cmd, Ui_accel_cmd, long_control_sat
stopping_brake_rate = 0.2 # brake_travel/s while trying to stop
starting_brake_rate = 0.6 # brake_travel/s while releasing on restart
starting_Ui = 0.5 # Since we don't have much info about acceleration at this point, be conservative
brake_stopping_target = 0.5 # apply at least this amount of brake to maintain the vehicle stationary
_MAX_SPEED_ERROR_BP = [0., 30.] # speed breakpoints
_MAX_SPEED_ERROR_V = [1.5, .8] # max positive v_pid error VS actual speed; this avoids controls windup due to slow pedal resp
class LongControl(object):
def __init__(self):
self.long_control_state = LongCtrlState.off # initialized to off
self.long_control_sat = False
self.Up_accel_cmd = 0.
self.last_output_gb = 0.
self.reset(0.)
def reset(self, v_pid):
self.Ui_accel_cmd = 0.
self.v_pid = v_pid
def update(self, enabled, v_ego, v_cruise, v_target_lead, a_target, jerk_factor, VP):
brake_max_bp = [0., 5., 20., 100.] # speeds
brake_max_v = [1.0, 1.0, 0.8, 0.8] # values
# brake and gas limits
brake_max = interp(v_ego, brake_max_bp, brake_max_v)
# TODO: not every time
if VP.brake_only:
gas_max = 0
else:
gas_max_bp = [0., 100.] # speeds
gas_max_v = [0.6, 0.6] # values
gas_max = interp(v_ego, gas_max_bp, gas_max_v)
overshoot_allowance = 2.0 # overshoot allowed when changing accel sign
output_gb = self.last_output_gb
rate = 100
# limit max target speed based on cruise setting:
v_cruise_mph = round(v_cruise * CV.KPH_TO_MPH) # what's displayed in mph on the IC
v_target = min(v_target_lead, v_cruise_mph * CV.MPH_TO_MS / VP.ui_speed_fudge)
max_speed_delta_up = a_target[1]*1.0/rate
max_speed_delta_down = a_target[0]*1.0/rate
# *** long control substate transitions
self.long_control_state = long_control_state_trans(enabled, self.long_control_state, v_ego, v_target, self.v_pid, output_gb)
# *** long control behavior based on state
# TODO: move this to drive_helpers
# disabled
if self.long_control_state == LongCtrlState.off:
self.v_pid = v_ego # do nothing
output_gb = 0.
self.Ui_accel_cmd = 0.
# tracking objects and driving
elif self.long_control_state == LongCtrlState.pid:
#reset v_pid close to v_ego if it was too far and new v_target is closer to v_ego
if ((self.v_pid > v_ego + overshoot_allowance) and
(v_target < self.v_pid)):
self.v_pid = max(v_target, v_ego + overshoot_allowance)
elif ((self.v_pid < v_ego - overshoot_allowance) and
(v_target > self.v_pid)):
self.v_pid = min(v_target, v_ego - overshoot_allowance)
# move v_pid no faster than allowed accel limits
if (v_target > self.v_pid + max_speed_delta_up):
self.v_pid += max_speed_delta_up
elif (v_target < self.v_pid + max_speed_delta_down):
self.v_pid += max_speed_delta_down
else:
self.v_pid = v_target
# to avoid too much wind up on acceleration, limit positive speed error
if not VP.brake_only:
max_speed_error = interp(v_ego, _MAX_SPEED_ERROR_BP, _MAX_SPEED_ERROR_V)
self.v_pid = min(self.v_pid, v_ego + max_speed_error)
# TODO: removed anti windup on gear change, does it matter?
output_gb, self.Up_accel_cmd, self.Ui_accel_cmd, self.long_control_sat = pid_long_control(v_ego, self.v_pid, \
self.Ui_accel_cmd, gas_max, brake_max, jerk_factor, 0, rate)
# intention is to stop, switch to a different brake control until we stop
elif self.long_control_state == LongCtrlState.stopping:
if v_ego > 0. or output_gb > -brake_stopping_target:
output_gb -= stopping_brake_rate/rate
output_gb = clip(output_gb, -brake_max, gas_max)
self.v_pid = v_ego
self.Ui_accel_cmd = 0.
# intention is to move again, release brake fast before handling control to PID
elif self.long_control_state == LongCtrlState.starting:
if output_gb < -0.2:
output_gb += starting_brake_rate/rate
self.v_pid = v_ego
self.Ui_accel_cmd = starting_Ui
self.last_output_gb = output_gb
final_gas = clip(output_gb, 0., gas_max)
final_brake = -clip(output_gb, -brake_max, 0.)
return final_gas, final_brake