import math
from cereal import car
from common . conversions import Conversions as CV
from common . numpy_fast import clip , interp
from common . realtime import DT_MDL
from selfdrive . modeld . constants import T_IDXS
# WARNING: this value was determined based on the model's training distribution,
# model predictions above this speed can be unpredictable
V_CRUISE_MAX = 145 # kph
V_CRUISE_MIN = 8 # kph
V_CRUISE_ENABLE_MIN = 40 # kph
V_CRUISE_INITIAL = 255 # kph
IMPERIAL_INCREMENT = 1.6 # should be CV.MPH_TO_KPH, but this causes rounding errors
MIN_SPEED = 1.0
LAT_MPC_N = 16
LON_MPC_N = 32
CONTROL_N = 17
CAR_ROTATION_RADIUS = 0.0
# EU guidelines
MAX_LATERAL_JERK = 5.0
ButtonEvent = car . CarState . ButtonEvent
ButtonType = car . CarState . ButtonEvent . Type
CRUISE_LONG_PRESS = 50
CRUISE_NEAREST_FUNC = {
ButtonType . accelCruise : math . ceil ,
ButtonType . decelCruise : math . floor ,
}
CRUISE_INTERVAL_SIGN = {
ButtonType . accelCruise : + 1 ,
ButtonType . decelCruise : - 1 ,
}
class VCruiseHelper :
def __init__ ( self , CP ) :
self . CP = CP
self . v_cruise_kph = V_CRUISE_INITIAL
self . v_cruise_cluster_kph = V_CRUISE_INITIAL
self . v_cruise_kph_last = 0
self . button_timers = { ButtonType . decelCruise : 0 , ButtonType . accelCruise : 0 }
self . button_change_states = { btn : { " standstill " : False , " enabled " : False } for btn in self . button_timers }
@property
def v_cruise_initialized ( self ) :
return self . v_cruise_kph != V_CRUISE_INITIAL
def update_v_cruise ( self , CS , enabled , is_metric ) :
self . v_cruise_kph_last = self . v_cruise_kph
if CS . cruiseState . available :
if not self . CP . pcmCruise :
# if stock cruise is completely disabled, then we can use our own set speed logic
self . _update_v_cruise_non_pcm ( CS , enabled , is_metric )
self . v_cruise_cluster_kph = self . v_cruise_kph
self . update_button_timers ( CS , enabled )
else :
self . v_cruise_kph = CS . cruiseState . speed * CV . MS_TO_KPH
self . v_cruise_cluster_kph = CS . cruiseState . speedCluster * CV . MS_TO_KPH
else :
self . v_cruise_kph = V_CRUISE_INITIAL
self . v_cruise_cluster_kph = V_CRUISE_INITIAL
def _update_v_cruise_non_pcm ( self , CS , enabled , is_metric ) :
# handle button presses. TODO: this should be in state_control, but a decelCruise press
# would have the effect of both enabling and changing speed is checked after the state transition
if not enabled :
return
long_press = False
button_type = None
v_cruise_delta = 1. if is_metric else IMPERIAL_INCREMENT
for b in CS . buttonEvents :
if b . type . raw in self . button_timers and not b . pressed :
if self . button_timers [ b . type . raw ] > CRUISE_LONG_PRESS :
return # end long press
button_type = b . type . raw
break
else :
for k in self . button_timers . keys ( ) :
if self . button_timers [ k ] and self . button_timers [ k ] % CRUISE_LONG_PRESS == 0 :
button_type = k
long_press = True
break
if button_type is None :
return
# Don't adjust speed when pressing resume to exit standstill
cruise_standstill = self . button_change_states [ button_type ] [ " standstill " ] or CS . cruiseState . standstill
if button_type == ButtonType . accelCruise and cruise_standstill :
return
# Don't adjust speed if we've enabled since the button was depressed (some ports enable on rising edge)
if not self . button_change_states [ button_type ] [ " enabled " ] :
return
v_cruise_delta = v_cruise_delta * ( 5 if long_press else 1 )
if long_press and self . v_cruise_kph % v_cruise_delta != 0 : # partial interval
self . v_cruise_kph = CRUISE_NEAREST_FUNC [ button_type ] ( self . v_cruise_kph / v_cruise_delta ) * v_cruise_delta
else :
self . v_cruise_kph + = v_cruise_delta * CRUISE_INTERVAL_SIGN [ button_type ]
# If set is pressed while overriding, clip cruise speed to minimum of vEgo
if CS . gasPressed and button_type in ( ButtonType . decelCruise , ButtonType . setCruise ) :
self . v_cruise_kph = max ( self . v_cruise_kph , CS . vEgo * CV . MS_TO_KPH )
self . v_cruise_kph = clip ( round ( self . v_cruise_kph , 1 ) , V_CRUISE_MIN , V_CRUISE_MAX )
def update_button_timers ( self , CS , enabled ) :
# increment timer for buttons still pressed
for k in self . button_timers :
if self . button_timers [ k ] > 0 :
self . button_timers [ k ] + = 1
for b in CS . buttonEvents :
if b . type . raw in self . button_timers :
# Start/end timer and store current state on change of button pressed
self . button_timers [ b . type . raw ] = 1 if b . pressed else 0
self . button_change_states [ b . type . raw ] = { " standstill " : CS . cruiseState . standstill , " enabled " : enabled }
def initialize_v_cruise ( self , CS ) :
# initializing is handled by the PCM
if self . CP . pcmCruise :
return
# 250kph or above probably means we never had a set speed
if any ( b . type in ( ButtonType . accelCruise , ButtonType . resumeCruise ) for b in CS . buttonEvents ) and self . v_cruise_kph_last < 250 :
self . v_cruise_kph = self . v_cruise_kph_last
else :
self . v_cruise_kph = int ( round ( clip ( CS . vEgo * CV . MS_TO_KPH , V_CRUISE_ENABLE_MIN , V_CRUISE_MAX ) ) )
self . v_cruise_cluster_kph = self . v_cruise_kph
def apply_deadzone ( error , deadzone ) :
if error > deadzone :
error - = deadzone
elif error < - deadzone :
error + = deadzone
else :
error = 0.
return error
def apply_slack ( error , deadzone ) :
if ( error > - deadzone ) and ( error < deadzone ) :
error = 0.
return error
def rate_limit ( new_value , last_value , dw_step , up_step ) :
return clip ( new_value , last_value + dw_step , last_value + up_step )
def get_lag_adjusted_curvature ( CP , v_ego , psis , curvatures , curvature_rates ) :
if len ( psis ) != CONTROL_N :
psis = [ 0.0 ] * CONTROL_N
curvatures = [ 0.0 ] * CONTROL_N
curvature_rates = [ 0.0 ] * CONTROL_N
v_ego = max ( MIN_SPEED , v_ego )
# TODO this needs more thought, use .2s extra for now to estimate other delays
delay = CP . steerActuatorDelay + .2
# MPC can plan to turn the wheel and turn back before t_delay. This means
# in high delay cases some corrections never even get commanded. So just use
# psi to calculate a simple linearization of desired curvature
current_curvature_desired = curvatures [ 0 ]
psi = interp ( delay , T_IDXS [ : CONTROL_N ] , psis )
average_curvature_desired = psi / ( v_ego * delay )
desired_curvature = 2 * average_curvature_desired - current_curvature_desired
# This is the "desired rate of the setpoint" not an actual desired rate
desired_curvature_rate = curvature_rates [ 0 ]
max_curvature_rate = MAX_LATERAL_JERK / ( v_ego * * 2 ) # inexact calculation, check https://github.com/commaai/openpilot/pull/24755
safe_desired_curvature_rate = clip ( desired_curvature_rate ,
- max_curvature_rate ,
max_curvature_rate )
safe_desired_curvature = clip ( desired_curvature ,
current_curvature_desired - max_curvature_rate * DT_MDL ,
current_curvature_desired + max_curvature_rate * DT_MDL )
return safe_desired_curvature , safe_desired_curvature_rate