from cereal import log from openpilot.selfdrive.selfdrived.events import Events, ET from openpilot.common.realtime import DT_CTRL State = log.SelfdriveState.OpenpilotState SOFT_DISABLE_TIME = 3 # seconds ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding) ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES) class StateMachine: def __init__(self): self.current_alert_types = [ET.PERMANENT] self.state = State.disabled self.soft_disable_timer = 0 def update(self, events: Events): # decrement the soft disable timer at every step, as it's reset on # entrance in SOFT_DISABLING state self.soft_disable_timer = max(0, self.soft_disable_timer - 1) self.current_alert_types = [ET.PERMANENT] # ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING if self.state != State.disabled: # user and immediate disable always have priority in a non-disabled state if events.contains(ET.USER_DISABLE): self.state = State.disabled self.current_alert_types.append(ET.USER_DISABLE) elif events.contains(ET.IMMEDIATE_DISABLE): self.state = State.disabled self.current_alert_types.append(ET.IMMEDIATE_DISABLE) else: # ENABLED if self.state == State.enabled: if events.contains(ET.SOFT_DISABLE): self.state = State.softDisabling self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) self.current_alert_types.append(ET.SOFT_DISABLE) elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL): self.state = State.overriding self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL] # SOFT DISABLING elif self.state == State.softDisabling: if not events.contains(ET.SOFT_DISABLE): # no more soft disabling condition, so go back to ENABLED self.state = State.enabled elif self.soft_disable_timer > 0: self.current_alert_types.append(ET.SOFT_DISABLE) elif self.soft_disable_timer <= 0: self.state = State.disabled # PRE ENABLING elif self.state == State.preEnabled: if not events.contains(ET.PRE_ENABLE): self.state = State.enabled else: self.current_alert_types.append(ET.PRE_ENABLE) # OVERRIDING elif self.state == State.overriding: if events.contains(ET.SOFT_DISABLE): self.state = State.softDisabling self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL) self.current_alert_types.append(ET.SOFT_DISABLE) elif not (events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL)): self.state = State.enabled else: self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL] # DISABLED elif self.state == State.disabled: if events.contains(ET.ENABLE): if events.contains(ET.NO_ENTRY): self.current_alert_types.append(ET.NO_ENTRY) else: if events.contains(ET.PRE_ENABLE): self.state = State.preEnabled elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL): self.state = State.overriding else: self.state = State.enabled self.current_alert_types.append(ET.ENABLE) # Check if openpilot is engaged and actuators are enabled enabled = self.state in ENABLED_STATES active = self.state in ACTIVE_STATES if active: self.current_alert_types.append(ET.WARNING) return enabled, active