You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
3.6 KiB
98 lines
3.6 KiB
from cereal import log
|
|
from openpilot.selfdrive.selfdrived.events import Events, ET
|
|
from openpilot.common.realtime import DT_CTRL
|
|
|
|
State = log.SelfdriveState.OpenpilotState
|
|
|
|
SOFT_DISABLE_TIME = 3 # seconds
|
|
ACTIVE_STATES = (State.enabled, State.softDisabling, State.overriding)
|
|
ENABLED_STATES = (State.preEnabled, *ACTIVE_STATES)
|
|
|
|
class StateMachine:
|
|
def __init__(self):
|
|
self.current_alert_types = [ET.PERMANENT]
|
|
self.state = State.disabled
|
|
self.soft_disable_timer = 0
|
|
|
|
def update(self, events: Events):
|
|
# decrement the soft disable timer at every step, as it's reset on
|
|
# entrance in SOFT_DISABLING state
|
|
self.soft_disable_timer = max(0, self.soft_disable_timer - 1)
|
|
|
|
self.current_alert_types = [ET.PERMANENT]
|
|
|
|
# ENABLED, SOFT DISABLING, PRE ENABLING, OVERRIDING
|
|
if self.state != State.disabled:
|
|
# user and immediate disable always have priority in a non-disabled state
|
|
if events.contains(ET.USER_DISABLE):
|
|
self.state = State.disabled
|
|
self.current_alert_types.append(ET.USER_DISABLE)
|
|
|
|
elif events.contains(ET.IMMEDIATE_DISABLE):
|
|
self.state = State.disabled
|
|
self.current_alert_types.append(ET.IMMEDIATE_DISABLE)
|
|
|
|
else:
|
|
# ENABLED
|
|
if self.state == State.enabled:
|
|
if events.contains(ET.SOFT_DISABLE):
|
|
self.state = State.softDisabling
|
|
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
|
|
self.current_alert_types.append(ET.SOFT_DISABLE)
|
|
|
|
elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
|
|
self.state = State.overriding
|
|
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
|
|
|
|
# SOFT DISABLING
|
|
elif self.state == State.softDisabling:
|
|
if not events.contains(ET.SOFT_DISABLE):
|
|
# no more soft disabling condition, so go back to ENABLED
|
|
self.state = State.enabled
|
|
|
|
elif self.soft_disable_timer > 0:
|
|
self.current_alert_types.append(ET.SOFT_DISABLE)
|
|
|
|
elif self.soft_disable_timer <= 0:
|
|
self.state = State.disabled
|
|
|
|
# PRE ENABLING
|
|
elif self.state == State.preEnabled:
|
|
if not events.contains(ET.PRE_ENABLE):
|
|
self.state = State.enabled
|
|
else:
|
|
self.current_alert_types.append(ET.PRE_ENABLE)
|
|
|
|
# OVERRIDING
|
|
elif self.state == State.overriding:
|
|
if events.contains(ET.SOFT_DISABLE):
|
|
self.state = State.softDisabling
|
|
self.soft_disable_timer = int(SOFT_DISABLE_TIME / DT_CTRL)
|
|
self.current_alert_types.append(ET.SOFT_DISABLE)
|
|
elif not (events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL)):
|
|
self.state = State.enabled
|
|
else:
|
|
self.current_alert_types += [ET.OVERRIDE_LATERAL, ET.OVERRIDE_LONGITUDINAL]
|
|
|
|
# DISABLED
|
|
elif self.state == State.disabled:
|
|
if events.contains(ET.ENABLE):
|
|
if events.contains(ET.NO_ENTRY):
|
|
self.current_alert_types.append(ET.NO_ENTRY)
|
|
|
|
else:
|
|
if events.contains(ET.PRE_ENABLE):
|
|
self.state = State.preEnabled
|
|
elif events.contains(ET.OVERRIDE_LATERAL) or events.contains(ET.OVERRIDE_LONGITUDINAL):
|
|
self.state = State.overriding
|
|
else:
|
|
self.state = State.enabled
|
|
self.current_alert_types.append(ET.ENABLE)
|
|
|
|
# Check if openpilot is engaged and actuators are enabled
|
|
enabled = self.state in ENABLED_STATES
|
|
active = self.state in ACTIVE_STATES
|
|
if active:
|
|
self.current_alert_types.append(ET.WARNING)
|
|
return enabled, active
|
|
|
|
|