You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							98 lines
						
					
					
						
							3.6 KiB
						
					
					
				
			
		
		
	
	
							98 lines
						
					
					
						
							3.6 KiB
						
					
					
				| from cereal import log
 | |
| from openpilot.selfdrive.selfdrived.events import Events, ET
 | |
| from openpilot.common.realtime import DT_CTRL
 | |
| 
 | |
| State = log.SelfdriveState.OpenpilotState
 | |
| 
 | |
| SOFT_DISABLE_TIME = 3  # seconds
 | |
| ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
 | |
| ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
 | |
| 
 | |
| class StateMachine:
 | |
|   def __init__(self):
 | |
|     self.current_alert_types = [ET.PERMANENT]
 | |
|     self.state = State.disabled
 | |
|     self.soft_disable_timer = 0
 | |
| 
 | |
|   def update(self, events: Events):
 | |
|     # decrement the soft disable timer at every step, as it's reset on
 | |
|     # entrance in SOFT_DISABLING state
 | |
|     self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
 | |
| 
 | |
|     self.current_alert_types = [ET.PERMANENT]
 | |
| 
 | |
|     # ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
 | |
|     if self.state != State.disabled:
 | |
|       # user and immediate disable always have priority in a non-disabled state
 | |
|       if events.contains(ET.USER_DISABLE):
 | |
|         self.state = State.disabled
 | |
|         self.current_alert_types.append(ET.USER_DISABLE)
 | |
| 
 | |
|       elif events.contains(ET.IMMEDIATE_DISABLE):
 | |
|         self.state = State.disabled
 | |
|         self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
 | |
| 
 | |
|       else:
 | |
|         # ENABLED
 | |
|         if self.state == State.enabled:
 | |
|           if events.contains(ET.SOFT_DISABLE):
 | |
|             self.state = State.softDisabling
 | |
|             self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
 | |
|             self.current_alert_types.append(ET.SOFT_DISABLE)
 | |
| 
 | |
|           elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
 | |
|             self.state = State.overriding
 | |
|             self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
 | |
| 
 | |
|         # SOFT DISABLING
 | |
|         elif self.state == State.softDisabling:
 | |
|           if not events.contains(ET.SOFT_DISABLE):
 | |
|             # no more soft disabling condition, so go back to ENABLED
 | |
|             self.state = State.enabled
 | |
| 
 | |
|           elif self.soft_disable_timer > 0:
 | |
|             self.current_alert_types.append(ET.SOFT_DISABLE)
 | |
| 
 | |
|           elif self.soft_disable_timer <= 0:
 | |
|             self.state = State.disabled
 | |
| 
 | |
|         # PRE ENABLING
 | |
|         elif self.state == State.preEnabled:
 | |
|           if not events.contains(ET.PRE_ENABLE):
 | |
|             self.state = State.enabled
 | |
|           else:
 | |
|             self.current_alert_types.append(ET.PRE_ENABLE)
 | |
| 
 | |
|         # OVERRIDING
 | |
|         elif self.state == State.overriding:
 | |
|           if events.contains(ET.SOFT_DISABLE):
 | |
|             self.state = State.softDisabling
 | |
|             self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
 | |
|             self.current_alert_types.append(ET.SOFT_DISABLE)
 | |
|           elif not (events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL)):
 | |
|             self.state = State.enabled
 | |
|           else:
 | |
|             self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
 | |
| 
 | |
|     # DISABLED
 | |
|     elif self.state == State.disabled:
 | |
|       if events.contains(ET.ENABLE):
 | |
|         if events.contains(ET.NO_ENTRY):
 | |
|           self.current_alert_types.append(ET.NO_ENTRY)
 | |
| 
 | |
|         else:
 | |
|           if events.contains(ET.PRE_ENABLE):
 | |
|             self.state = State.preEnabled
 | |
|           elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
 | |
|             self.state = State.overriding
 | |
|           else:
 | |
|             self.state = State.enabled
 | |
|           self.current_alert_types.append(ET.ENABLE)
 | |
| 
 | |
|     # Check if openpilot is engaged and actuators are enabled
 | |
|     enabled = self.state in ENABLED_STATES
 | |
|     active = self.state in ACTIVE_STATES
 | |
|     if active:
 | |
|       self.current_alert_types.append(ET.WARNING)
 | |
|     return enabled, active
 | |
| 
 | |
| 
 |