#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  parameterized  import  parameterized_class 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . controls . lib . drive_helpers  import  VCruiseHelper ,  V_CRUISE_MIN ,  V_CRUISE_MAX ,  V_CRUISE_INITIAL ,  IMPERIAL_INCREMENT 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal  import  car 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . conversions  import  Conversions  as  CV 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . test . longitudinal_maneuvers . maneuver  import  Maneuver 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# Short aliases for car.CarState.ButtonEvent and its nested Type.
								ButtonEvent = car.CarState.ButtonEvent
								ButtonType = ButtonEvent.Type
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								def run_cruise_simulation(cruise, e2e, t_end=20.):
								  """Run a Maneuver with `cruise` as the only cruise value and return its last sample.

								  The maneuver lasts `t_end`, starts at max(cruise - 1, 0), and has a lead
								  configured at an initial distance of 100 with a lead probability of 0.0.
								  The evaluation must report itself as valid, otherwise the assert fails.

								  Returns element [-1, 3] of the evaluation output (presumably the ego
								  speed at the final step -- confirm against Maneuver.evaluate).
								  """
								  scenario = dict(
								    duration=t_end,
								    initial_speed=max(cruise - 1., 0.0),
								    lead_relevancy=True,
								    initial_distance_lead=100,
								    cruise_values=[cruise],
								    prob_lead_values=[0.0],
								    breakpoints=[0.],
								    e2e=e2e,
								  )
								  maneuver = Maneuver(' ', **scenario)
								  is_valid, trace = maneuver.evaluate()
								  assert is_valid
								  # last row, column index 3 of the evaluation output
								  return trace[-1, 3]
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class TestCruiseSpeed(unittest.TestCase):
								  """Checks that the cruise simulation ends at the requested cruise speed."""

								  def test_cruise_speed(self):
								    """Run the simulation for every personality, e2e flag and test speed.

								    Each run must finish within 0.01 of the requested cruise speed.
								    """
								    params = Params()
								    personalities = [log.LongitudinalPersonality.relaxed,
								                     log.LongitudinalPersonality.standard,
								                     log.LongitudinalPersonality.aggressive]
								    for personality in personalities:
								      # the personality is stored in Params before the simulations are run
								      params.put("LongitudinalPersonality", str(personality))
								      for e2e in [False, True]:
								        for speed in [5, 35]:
								          print(f'Testing {speed} m/s')
								          cruise_speed = float(speed)

								          simulation_steady_state = run_cruise_simulation(cruise_speed, e2e)
								          self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {speed} m/s')
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# TODO: test pcmCruise 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								@parameterized_class(('pcm_cruise',), [(False,)])
								class TestVCruiseHelper(unittest.TestCase):
								  """Exercises VCruiseHelper set-speed handling on button events and on enabling.

								  The class is parameterized over `pcm_cruise`; only False is covered so far.
								  """

								  def setUp(self):
								    """Create a helper for the parameterized pcm_cruise value and reset its speed state."""
								    self.CP = car.CarParams(pcmCruise=self.pcm_cruise)  # pylint: disable=E1101
								    self.v_cruise_helper = VCruiseHelper(self.CP)
								    self.reset_cruise_speed_state()

								  def reset_cruise_speed_state(self):
								    """Update the helper twice with cruise unavailable and not enabled."""
								    # Two updates reset the previous cruise speed as well
								    for _ in range(2):
								      self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False)

								  def enable(self, v_ego, experimental_mode):
								    """Initialize the cruise speed from a CarState with the given vEgo."""
								    # Simulates user pressing set with a current speed
								    self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode)

								  def test_adjust_speed(self):
								    """
								    Asserts speed changes on falling edges of buttons.
								    """

								    self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)

								    for btn in (ButtonType.accelCruise, ButtonType.decelCruise):
								      for pressed in (True, False):
								        CS = car.CarState(cruiseState={"available": True})
								        CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)]

								        self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)
								        # speed is unchanged while pressed and differs after the release
								        self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)

								  def test_rising_edge_enable(self):
								    """
								    Some car interfaces may enable on rising edge of a button,
								    ensure we don't adjust speed if enabled changes mid-press.
								    """

								    # NOTE: enabled is always one frame behind the result from button press in controlsd
								    for enabled, pressed in ((False, False),
								                             (False, True),
								                             (True, False)):
								      CS = car.CarState(cruiseState={"available": True})
								      CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)]
								      self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False)
								      if pressed:
								        self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)

								      # Expected diff on enabling. Speed should not change on falling edge of pressed
								      self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)

								  def test_resume_in_standstill(self):
								    """
								    Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill.
								    """

								    self.enable(0, False)

								    for standstill in (True, False):
								      for pressed in (True, False):
								        CS = car.CarState(cruiseState={"available": True, "standstill": standstill})
								        CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)]
								        self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)

								        # speed should only update if not at standstill and button falling edge
								        should_equal = standstill or pressed
								        self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last)

								  def test_set_gas_pressed(self):
								    """
								    Asserts pressing set while enabled with gas pressed sets
								    the speed to the maximum of vEgo and current cruise speed.
								    """

								    for v_ego in np.linspace(0, 100, 101):
								      self.reset_cruise_speed_state()
								      self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False)

								      # first decrement speed, then perform gas pressed logic
								      expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT
								      expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH)  # clip to min of vEgo
								      expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX))

								      CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True})
								      CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)]
								      self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False)

								      # TODO: fix skipping first run due to enabled on rising edge exception
								      if v_ego == 0.0:
								        continue
								      self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph)

								  def test_initialize_v_cruise(self):
								    """
								    Asserts allowed cruise speeds on enabling with SET.
								    """

								    for experimental_mode in (True, False):
								      for v_ego in np.linspace(0, 100, 101):
								        self.reset_cruise_speed_state()
								        self.assertFalse(self.v_cruise_helper.v_cruise_initialized)

								        self.enable(float(v_ego), experimental_mode)
								        # the initialized speed must stay within [V_CRUISE_INITIAL, V_CRUISE_MAX]
								        self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX)
								        self.assertTrue(self.v_cruise_helper.v_cruise_initialized)
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Run the test suite when this file is executed directly.
								if __name__ == "__main__":
								  unittest.main()