controlsd: pull out selfdrive state machine (#33477)
	
		
	
				
					
				
			* controlsd: pull out selfdrive state machine * cleanup test * cleanupvw-mqb-aeb
							parent
							
								
									54a2626e44
								
							
						
					
					
						commit
						77f4f57e73
					
				
				 3 changed files with 148 additions and 147 deletions
			
			
		@ -0,0 +1,98 @@ | 
				
			||||
from cereal import log | 
				
			||||
from openpilot.selfdrive.controls.lib.events import Events, ET | 
				
			||||
from openpilot.common.realtime import DT_CTRL | 
				
			||||
 | 
				
			||||
State = log.SelfdriveState.OpenpilotState | 
				
			||||
 | 
				
			||||
SOFT_DISABLE_TIME = 3  # seconds | 
				
			||||
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding) | 
				
			||||
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES) | 
				
			||||
 | 
				
			||||
class StateMachine: | 
				
			||||
  def __init__(self): | 
				
			||||
    self.current_alert_types = [ET.PERMANENT] | 
				
			||||
    self.state = State.disabled | 
				
			||||
    self.soft_disable_timer = 0 | 
				
			||||
 | 
				
			||||
  def update(self, events: Events): | 
				
			||||
    # decrement the soft disable timer at every step, as it's reset on | 
				
			||||
    # entrance in SOFT_DISABLING state | 
				
			||||
    self.soft_disable_timer = max(0, self.soft_disable_timer - 1) | 
				
			||||
 | 
				
			||||
    self.current_alert_types = [ET.PERMANENT] | 
				
			||||
 | 
				
			||||
    # ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING | 
				
			||||
    if self.state != State.disabled: | 
				
			||||
      # user and immediate disable always have priority in a non-disabled state | 
				
			||||
      if events.contains(ET.USER_DISABLE): | 
				
			||||
        self.state = State.disabled | 
				
			||||
        self.current_alert_types.append(ET.USER_DISABLE) | 
				
			||||
 | 
				
			||||
      elif events.contains(ET.IMMEDIATE_DISABLE): | 
				
			||||
        self.state = State.disabled | 
				
			||||
        self.current_alert_types.append(ET.IMMEDIATE_DISABLE) | 
				
			||||
 | 
				
			||||
      else: | 
				
			||||
        # ENABLED | 
				
			||||
        if self.state == State.enabled: | 
				
			||||
          if events.contains(ET.SOFT_DISABLE): | 
				
			||||
            self.state = State.softDisabling | 
				
			||||
            self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) | 
				
			||||
            self.current_alert_types.append(ET.SOFT_DISABLE) | 
				
			||||
 | 
				
			||||
          elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL): | 
				
			||||
            self.state = State.overriding | 
				
			||||
            self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL] | 
				
			||||
 | 
				
			||||
        # SOFT DISABLING | 
				
			||||
        elif self.state == State.softDisabling: | 
				
			||||
          if not events.contains(ET.SOFT_DISABLE): | 
				
			||||
            # no more soft disabling condition, so go back to ENABLED | 
				
			||||
            self.state = State.enabled | 
				
			||||
 | 
				
			||||
          elif self.soft_disable_timer > 0: | 
				
			||||
            self.current_alert_types.append(ET.SOFT_DISABLE) | 
				
			||||
 | 
				
			||||
          elif self.soft_disable_timer <= 0: | 
				
			||||
            self.state = State.disabled | 
				
			||||
 | 
				
			||||
        # PRE ENABLING | 
				
			||||
        elif self.state == State.preEnabled: | 
				
			||||
          if not events.contains(ET.PRE_ENABLE): | 
				
			||||
            self.state = State.enabled | 
				
			||||
          else: | 
				
			||||
            self.current_alert_types.append(ET.PRE_ENABLE) | 
				
			||||
 | 
				
			||||
        # OVERRIDING | 
				
			||||
        elif self.state == State.overriding: | 
				
			||||
          if events.contains(ET.SOFT_DISABLE): | 
				
			||||
            self.state = State.softDisabling | 
				
			||||
            self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) | 
				
			||||
            self.current_alert_types.append(ET.SOFT_DISABLE) | 
				
			||||
          elif not (events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL)): | 
				
			||||
            self.state = State.enabled | 
				
			||||
          else: | 
				
			||||
            self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL] | 
				
			||||
 | 
				
			||||
    # DISABLED | 
				
			||||
    elif self.state == State.disabled: | 
				
			||||
      if events.contains(ET.ENABLE): | 
				
			||||
        if events.contains(ET.NO_ENTRY): | 
				
			||||
          self.current_alert_types.append(ET.NO_ENTRY) | 
				
			||||
 | 
				
			||||
        else: | 
				
			||||
          if events.contains(ET.PRE_ENABLE): | 
				
			||||
            self.state = State.preEnabled | 
				
			||||
          elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL): | 
				
			||||
            self.state = State.overriding | 
				
			||||
          else: | 
				
			||||
            self.state = State.enabled | 
				
			||||
          self.current_alert_types.append(ET.ENABLE) | 
				
			||||
 | 
				
			||||
    # Check if openpilot is engaged and actuators are enabled | 
				
			||||
    enabled = self.state in ENABLED_STATES | 
				
			||||
    active = self.state in ACTIVE_STATES | 
				
			||||
    if active: | 
				
			||||
      self.current_alert_types.append(ET.WARNING) | 
				
			||||
    return enabled, active | 
				
			||||
 | 
				
			||||
					Loading…
					
					
				
		Reference in new issue