import  math 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal  import  car ,  log 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . conversions  import  Conversions  as  CV 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . numpy_fast  import  clip 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . realtime  import  DT_CTRL 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# WARNING: this value was determined based on the model's training distribution, 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								#          model predictions above this speed can be unpredictable 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# V_CRUISE's are in kph 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								V_CRUISE_MIN  =  8 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								V_CRUISE_MAX  =  145 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								V_CRUISE_UNSET  =  255 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								V_CRUISE_INITIAL  =  40 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								V_CRUISE_INITIAL_EXPERIMENTAL_MODE  =  105 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								IMPERIAL_INCREMENT  =  round ( CV . MPH_TO_KPH ,  1 )   # round here to avoid rounding errors incrementing set speed 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								MIN_SPEED  =  1.0 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CONTROL_N  =  17 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CAR_ROTATION_RADIUS  =  0.0 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# EU guidelines 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								MAX_LATERAL_JERK  =  5.0 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								MAX_VEL_ERR  =  5.0 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								ButtonEvent  =  car . CarState . ButtonEvent 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								ButtonType  =  car . CarState . ButtonEvent . Type 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								CRUISE_LONG_PRESS  =  50 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CRUISE_NEAREST_FUNC  =  { 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  ButtonType . accelCruise :  math . ceil , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ButtonType . decelCruise :  math . floor , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								CRUISE_INTERVAL_SIGN  =  { 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  ButtonType . accelCruise :  + 1 , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  ButtonType . decelCruise :  - 1 , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  VCruiseHelper : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ,  CP ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . CP  =  CP 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . v_cruise_kph  =  V_CRUISE_UNSET 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_cruise_cluster_kph  =  V_CRUISE_UNSET 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . v_cruise_kph_last  =  0 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . button_timers  =  { ButtonType . decelCruise :  0 ,  ButtonType . accelCruise :  0 } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . button_change_states  =  { btn :  { " standstill " :  False ,  " enabled " :  False }  for  btn  in  self . button_timers } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  @property 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  v_cruise_initialized ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    return  self . v_cruise_kph  !=  V_CRUISE_UNSET 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  update_v_cruise ( self ,  CS ,  enabled ,  is_metric ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_cruise_kph_last  =  self . v_cruise_kph 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  CS . cruiseState . available : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  not  self . CP . pcmCruise : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        # if stock cruise is completely disabled, then we can use our own set speed logic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . _update_v_cruise_non_pcm ( CS ,  enabled ,  is_metric ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . v_cruise_cluster_kph  =  self . v_cruise_kph 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . update_button_timers ( CS ,  enabled ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . v_cruise_kph  =  CS . cruiseState . speed  *  CV . MS_TO_KPH 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . v_cruise_cluster_kph  =  CS . cruiseState . speedCluster  *  CV . MS_TO_KPH 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . v_cruise_kph  =  V_CRUISE_UNSET 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_cruise_cluster_kph  =  V_CRUISE_UNSET 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _update_v_cruise_non_pcm ( self ,  CS ,  enabled ,  is_metric ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # handle button presses. TODO: this should be in state_control, but a decelCruise press 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # would have the effect of both enabling and changing speed is checked after the state transition 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  enabled : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    long_press  =  False 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    button_type  =  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    v_cruise_delta  =  1.  if  is_metric  else  IMPERIAL_INCREMENT 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  b  in  CS . buttonEvents : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  b . type . raw  in  self . button_timers  and  not  b . pressed : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . button_timers [ b . type . raw ]  >  CRUISE_LONG_PRESS : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          return   # end long press 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        button_type  =  b . type . raw 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  k  in  self . button_timers . keys ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  self . button_timers [ k ]  and  self . button_timers [ k ]  %  CRUISE_LONG_PRESS  ==  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          button_type  =  k 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          long_press  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          break 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  button_type  is  None : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Don't adjust speed when pressing resume to exit standstill 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    cruise_standstill  =  self . button_change_states [ button_type ] [ " standstill " ]  or  CS . cruiseState . standstill 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  button_type  ==  ButtonType . accelCruise  and  cruise_standstill : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # Don't adjust speed if we've enabled since the button was depressed (some ports enable on rising edge) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  not  self . button_change_states [ button_type ] [ " enabled " ] : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    v_cruise_delta  =  v_cruise_delta  *  ( 5  if  long_press  else  1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  long_press  and  self . v_cruise_kph  %  v_cruise_delta  !=  0 :   # partial interval 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_cruise_kph  =  CRUISE_NEAREST_FUNC [ button_type ] ( self . v_cruise_kph  /  v_cruise_delta )  *  v_cruise_delta 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_cruise_kph  + =  v_cruise_delta  *  CRUISE_INTERVAL_SIGN [ button_type ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # If set is pressed while overriding, clip cruise speed to minimum of vEgo 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  CS . gasPressed  and  button_type  in  ( ButtonType . decelCruise ,  ButtonType . setCruise ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_cruise_kph  =  max ( self . v_cruise_kph ,  CS . vEgo  *  CV . MS_TO_KPH ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . v_cruise_kph  =  clip ( round ( self . v_cruise_kph ,  1 ) ,  V_CRUISE_MIN ,  V_CRUISE_MAX ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  update_button_timers ( self ,  CS ,  enabled ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # increment timer for buttons still pressed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  k  in  self . button_timers : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  self . button_timers [ k ]  >  0 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . button_timers [ k ]  + =  1 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  b  in  CS . buttonEvents : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  b . type . raw  in  self . button_timers : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        # Start/end timer and store current state on change of button pressed 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . button_timers [ b . type . raw ]  =  1  if  b . pressed  else  0 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								        self . button_change_states [ b . type . raw ]  =  { " standstill " :  CS . cruiseState . standstill ,  " enabled " :  enabled } 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  initialize_v_cruise ( self ,  CS ,  experimental_mode :  bool )  - >  None : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # initializing is handled by the PCM 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  self . CP . pcmCruise : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      return 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    initial  =  V_CRUISE_INITIAL_EXPERIMENTAL_MODE  if  experimental_mode  else  V_CRUISE_INITIAL 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # 250kph or above probably means we never had a set speed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  any ( b . type  in  ( ButtonType . accelCruise ,  ButtonType . resumeCruise )  for  b  in  CS . buttonEvents )  and  self . v_cruise_kph_last  <  250 : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . v_cruise_kph  =  self . v_cruise_kph_last 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    else : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . v_cruise_kph  =  int ( round ( clip ( CS . vEgo  *  CV . MS_TO_KPH ,  initial ,  V_CRUISE_MAX ) ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . v_cruise_cluster_kph  =  self . v_cruise_kph 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  clip_curvature ( v_ego ,  prev_curvature ,  new_curvature ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  v_ego  =  max ( MIN_SPEED ,  v_ego ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  max_curvature_rate  =  MAX_LATERAL_JERK  /  ( v_ego * * 2 )  # inexact calculation, check https://github.com/commaai/openpilot/pull/24755 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  safe_desired_curvature  =  clip ( new_curvature , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                prev_curvature  -  max_curvature_rate  *  DT_CTRL , 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								                                prev_curvature  +  max_curvature_rate  *  DT_CTRL ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  return  safe_desired_curvature 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def  get_speed_error ( modelV2 :  log . ModelDataV2 ,  v_ego :  float )  - >  float : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  # ToDo: Try relative error, and absolute speed 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  len ( modelV2 . temporalPose . trans ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    vel_err  =  clip ( modelV2 . temporalPose . trans [ 0 ]  -  v_ego ,  - MAX_VEL_ERR ,  MAX_VEL_ERR ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  float ( vel_err ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  0.0