You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					99 lines
				
				3.6 KiB
			
		
		
			
		
	
	
					99 lines
				
				3.6 KiB
			| 
								 
											1 year ago
										 
									 | 
							
								from cereal import log
							 | 
						||
| 
								 
											1 year ago
										 
									 | 
							
								from openpilot.selfdrive.selfdrived.events import Events, ET
							 | 
						||
| 
								 
											1 year ago
										 
									 | 
							
								from openpilot.common.realtime import DT_CTRL
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								State = log.SelfdriveState.OpenpilotState
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								SOFT_DISABLE_TIME = 3  # seconds
							 | 
						||
| 
								 | 
							
								ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
							 | 
						||
| 
								 | 
							
								ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class StateMachine:
							 | 
						||
| 
								 | 
							
								  def __init__(self):
							 | 
						||
| 
								 | 
							
								    self.current_alert_types = [ET.PERMANENT]
							 | 
						||
| 
								 | 
							
								    self.state = State.disabled
							 | 
						||
| 
								 | 
							
								    self.soft_disable_timer = 0
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  def update(self, events: Events):
							 | 
						||
| 
								 | 
							
								    # decrement the soft disable timer at every step, as it's reset on
							 | 
						||
| 
								 | 
							
								    # entrance in SOFT_DISABLING state
							 | 
						||
| 
								 | 
							
								    self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    self.current_alert_types = [ET.PERMANENT]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
							 | 
						||
| 
								 | 
							
								    if self.state != State.disabled:
							 | 
						||
| 
								 | 
							
								      # user and immediate disable always have priority in a non-disabled state
							 | 
						||
| 
								 | 
							
								      if events.contains(ET.USER_DISABLE):
							 | 
						||
| 
								 | 
							
								        self.state = State.disabled
							 | 
						||
| 
								 | 
							
								        self.current_alert_types.append(ET.USER_DISABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      elif events.contains(ET.IMMEDIATE_DISABLE):
							 | 
						||
| 
								 | 
							
								        self.state = State.disabled
							 | 
						||
| 
								 | 
							
								        self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								      else:
							 | 
						||
| 
								 | 
							
								        # ENABLED
							 | 
						||
| 
								 | 
							
								        if self.state == State.enabled:
							 | 
						||
| 
								 | 
							
								          if events.contains(ET.SOFT_DISABLE):
							 | 
						||
| 
								 | 
							
								            self.state = State.softDisabling
							 | 
						||
| 
								 | 
							
								            self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
							 | 
						||
| 
								 | 
							
								            self.current_alert_types.append(ET.SOFT_DISABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
							 | 
						||
| 
								 | 
							
								            self.state = State.overriding
							 | 
						||
| 
								 | 
							
								            self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # SOFT DISABLING
							 | 
						||
| 
								 | 
							
								        elif self.state == State.softDisabling:
							 | 
						||
| 
								 | 
							
								          if not events.contains(ET.SOFT_DISABLE):
							 | 
						||
| 
								 | 
							
								            # no more soft disabling condition, so go back to ENABLED
							 | 
						||
| 
								 | 
							
								            self.state = State.enabled
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          elif self.soft_disable_timer > 0:
							 | 
						||
| 
								 | 
							
								            self.current_alert_types.append(ET.SOFT_DISABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								          elif self.soft_disable_timer <= 0:
							 | 
						||
| 
								 | 
							
								            self.state = State.disabled
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # PRE ENABLING
							 | 
						||
| 
								 | 
							
								        elif self.state == State.preEnabled:
							 | 
						||
| 
								 | 
							
								          if not events.contains(ET.PRE_ENABLE):
							 | 
						||
| 
								 | 
							
								            self.state = State.enabled
							 | 
						||
| 
								 | 
							
								          else:
							 | 
						||
| 
								 | 
							
								            self.current_alert_types.append(ET.PRE_ENABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        # OVERRIDING
							 | 
						||
| 
								 | 
							
								        elif self.state == State.overriding:
							 | 
						||
| 
								 | 
							
								          if events.contains(ET.SOFT_DISABLE):
							 | 
						||
| 
								 | 
							
								            self.state = State.softDisabling
							 | 
						||
| 
								 | 
							
								            self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
							 | 
						||
| 
								 | 
							
								            self.current_alert_types.append(ET.SOFT_DISABLE)
							 | 
						||
| 
								 | 
							
								          elif not (events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL)):
							 | 
						||
| 
								 | 
							
								            self.state = State.enabled
							 | 
						||
| 
								 | 
							
								          else:
							 | 
						||
| 
								 | 
							
								            self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # DISABLED
							 | 
						||
| 
								 | 
							
								    elif self.state == State.disabled:
							 | 
						||
| 
								 | 
							
								      if events.contains(ET.ENABLE):
							 | 
						||
| 
								 | 
							
								        if events.contains(ET.NO_ENTRY):
							 | 
						||
| 
								 | 
							
								          self.current_alert_types.append(ET.NO_ENTRY)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        else:
							 | 
						||
| 
								 | 
							
								          if events.contains(ET.PRE_ENABLE):
							 | 
						||
| 
								 | 
							
								            self.state = State.preEnabled
							 | 
						||
| 
								 | 
							
								          elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
							 | 
						||
| 
								 | 
							
								            self.state = State.overriding
							 | 
						||
| 
								 | 
							
								          else:
							 | 
						||
| 
								 | 
							
								            self.state = State.enabled
							 | 
						||
| 
								 | 
							
								          self.current_alert_types.append(ET.ENABLE)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    # Check if openpilot is engaged and actuators are enabled
							 | 
						||
| 
								 | 
							
								    enabled = self.state in ENABLED_STATES
							 | 
						||
| 
								 | 
							
								    active = self.state in ACTIVE_STATES
							 | 
						||
| 
								 | 
							
								    if active:
							 | 
						||
| 
								 | 
							
								      self.current_alert_types.append(ET.WARNING)
							 | 
						||
| 
								 | 
							
								    return enabled, active
							 | 
						||
| 
								 | 
							
								
							 |