You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							92 lines
						
					
					
						
							3.5 KiB
						
					
					
				
			
		
		
	
	
							92 lines
						
					
					
						
							3.5 KiB
						
					
					
				from cereal import log
 | 
						|
from openpilot.common.realtime import DT_CTRL
 | 
						|
from openpilot.selfdrive.selfdrived.state import StateMachine, SOFT_DISABLE_TIME
 | 
						|
from openpilot.selfdrive.selfdrived.events import Events, ET, EVENTS, NormalPermanentAlert
 | 
						|
 | 
						|
State = log.SelfdriveState.OpenpilotState
 | 
						|
 | 
						|
# The event types that maintain the current state
 | 
						|
MAINTAIN_STATES = {State.enabled: (None,), State.disabled: (None,), State.softDisabling: (ET.SOFT_DISABLE,),
 | 
						|
                   State.preEnabled: (ET.PRE_ENABLE,), State.overriding: (ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)}
 | 
						|
ALL_STATES = tuple(State.schema.enumerants.values())
 | 
						|
# The event types checked in DISABLED section of state machine
 | 
						|
ENABLE_EVENT_TYPES = (ET.ENABLE, ET.PRE_ENABLE, ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL)
 | 
						|
 | 
						|
 | 
						|
def make_event(event_types):
 | 
						|
  event = {}
 | 
						|
  for ev in event_types:
 | 
						|
    event[ev] = NormalPermanentAlert("alert")
 | 
						|
  EVENTS[0] = event
 | 
						|
  return 0
 | 
						|
 | 
						|
 | 
						|
class TestStateMachine:
 | 
						|
  def setup_method(self):
 | 
						|
    self.events = Events()
 | 
						|
    self.state_machine = StateMachine()
 | 
						|
    self.state_machine.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
 | 
						|
 | 
						|
  def test_immediate_disable(self):
 | 
						|
    for state in ALL_STATES:
 | 
						|
      for et in MAINTAIN_STATES[state]:
 | 
						|
        self.events.add(make_event([et, ET.IMMEDIATE_DISABLE]))
 | 
						|
        self.state_machine.state = state
 | 
						|
        self.state_machine.update(self.events)
 | 
						|
        assert State.disabled == self.state_machine.state
 | 
						|
        self.events.clear()
 | 
						|
 | 
						|
  def test_user_disable(self):
 | 
						|
    for state in ALL_STATES:
 | 
						|
      for et in MAINTAIN_STATES[state]:
 | 
						|
        self.events.add(make_event([et, ET.USER_DISABLE]))
 | 
						|
        self.state_machine.state = state
 | 
						|
        self.state_machine.update(self.events)
 | 
						|
        assert State.disabled == self.state_machine.state
 | 
						|
        self.events.clear()
 | 
						|
 | 
						|
  def test_soft_disable(self):
 | 
						|
    for state in ALL_STATES:
 | 
						|
      if state == State.preEnabled:  # preEnabled considers NO_ENTRY instead
 | 
						|
        continue
 | 
						|
      for et in MAINTAIN_STATES[state]:
 | 
						|
        self.events.add(make_event([et, ET.SOFT_DISABLE]))
 | 
						|
        self.state_machine.state = state
 | 
						|
        self.state_machine.update(self.events)
 | 
						|
        assert self.state_machine.state == State.disabled if state == State.disabled else State.softDisabling
 | 
						|
        self.events.clear()
 | 
						|
 | 
						|
  def test_soft_disable_timer(self):
 | 
						|
    self.state_machine.state = State.enabled
 | 
						|
    self.events.add(make_event([ET.SOFT_DISABLE]))
 | 
						|
    self.state_machine.update(self.events)
 | 
						|
    for _ in range(int(SOFT_DISABLE_TIME / DT_CTRL)):
 | 
						|
      assert self.state_machine.state == State.softDisabling
 | 
						|
      self.state_machine.update(self.events)
 | 
						|
 | 
						|
    assert self.state_machine.state == State.disabled
 | 
						|
 | 
						|
  def test_no_entry(self):
 | 
						|
    # Make sure noEntry keeps us disabled
 | 
						|
    for et in ENABLE_EVENT_TYPES:
 | 
						|
      self.events.add(make_event([ET.NO_ENTRY, et]))
 | 
						|
      self.state_machine.update(self.events)
 | 
						|
      assert self.state_machine.state == State.disabled
 | 
						|
      self.events.clear()
 | 
						|
 | 
						|
  def test_no_entry_pre_enable(self):
 | 
						|
    # preEnabled with noEntry event
 | 
						|
    self.state_machine.state = State.preEnabled
 | 
						|
    self.events.add(make_event([ET.NO_ENTRY, ET.PRE_ENABLE]))
 | 
						|
    self.state_machine.update(self.events)
 | 
						|
    assert self.state_machine.state == State.preEnabled
 | 
						|
 | 
						|
  def test_maintain_states(self):
 | 
						|
    # Given current state's event type, we should maintain state
 | 
						|
    for state in ALL_STATES:
 | 
						|
      for et in MAINTAIN_STATES[state]:
 | 
						|
        self.state_machine.state = state
 | 
						|
        self.events.add(make_event([et]))
 | 
						|
        self.state_machine.update(self.events)
 | 
						|
        assert self.state_machine.state == state
 | 
						|
        self.events.clear()
 | 
						|
 |