system/ui: add centralized UIState singleton for global state management (#35397)
* add centralized UIState singleton for global state management * safely import ui_state * merge master * merge masterpull/35428/head
parent
b59841329b
commit
d46066225d
7 changed files with 183 additions and 60 deletions
@ -0,0 +1,129 @@ |
|||||||
|
import pyray as rl |
||||||
|
from enum import Enum |
||||||
|
from cereal import messaging, log |
||||||
|
from openpilot.common.params import Params, UnknownKeyName |
||||||
|
|
||||||
|
|
||||||
|
UI_BORDER_SIZE = 30 |
||||||
|
|
||||||
|
|
||||||
|
class UIStatus(Enum): |
||||||
|
DISENGAGED = "disengaged" |
||||||
|
ENGAGED = "engaged" |
||||||
|
OVERRIDE = "override" |
||||||
|
|
||||||
|
|
||||||
|
class UIState: |
||||||
|
_instance: 'UIState | None' = None |
||||||
|
|
||||||
|
def __new__(cls): |
||||||
|
if cls._instance is None: |
||||||
|
cls._instance = super().__new__(cls) |
||||||
|
cls._instance._initialize() |
||||||
|
return cls._instance |
||||||
|
|
||||||
|
def _initialize(self): |
||||||
|
self.params = Params() |
||||||
|
self.sm = messaging.SubMaster( |
||||||
|
[ |
||||||
|
"modelV2", |
||||||
|
"controlsState", |
||||||
|
"liveCalibration", |
||||||
|
"radarState", |
||||||
|
"deviceState", |
||||||
|
"pandaStates", |
||||||
|
"carParams", |
||||||
|
"driverMonitoringState", |
||||||
|
"carState", |
||||||
|
"driverStateV2", |
||||||
|
"roadCameraState", |
||||||
|
"wideRoadCameraState", |
||||||
|
"managerState", |
||||||
|
"selfdriveState", |
||||||
|
"longitudinalPlan", |
||||||
|
] |
||||||
|
) |
||||||
|
|
||||||
|
# UI Status tracking |
||||||
|
self.status: UIStatus = UIStatus.DISENGAGED |
||||||
|
self.started_frame: int = 0 |
||||||
|
self._engaged_prev: bool = False |
||||||
|
self._started_prev: bool = False |
||||||
|
|
||||||
|
# Core state variables |
||||||
|
self.is_metric: bool = self.params.get_bool("IsMetric") |
||||||
|
self.started: bool = False |
||||||
|
self.ignition: bool = False |
||||||
|
self.panda_type: log.PandaState.PandaType = log.PandaState.PandaType.unknown |
||||||
|
self.personality: log.LongitudinalPersonality = log.LongitudinalPersonality.standard |
||||||
|
self.light_sensor: float = -1.0 |
||||||
|
|
||||||
|
self._update_params() |
||||||
|
|
||||||
|
@property |
||||||
|
def engaged(self) -> bool: |
||||||
|
return self.started and self.sm["selfdriveState"].enabled |
||||||
|
|
||||||
|
def update(self) -> None: |
||||||
|
self.sm.update(0) |
||||||
|
self._update_state() |
||||||
|
self._update_status() |
||||||
|
|
||||||
|
def _update_state(self) -> None: |
||||||
|
# Handle panda states updates |
||||||
|
if self.sm.updated["pandaStates"]: |
||||||
|
panda_states = self.sm["pandaStates"] |
||||||
|
|
||||||
|
if len(panda_states) > 0: |
||||||
|
# Get panda type from first panda |
||||||
|
self.panda_type = panda_states[0].pandaType |
||||||
|
# Check ignition status across all pandas |
||||||
|
if self.panda_type != log.PandaState.PandaType.unknown: |
||||||
|
self.ignition = any(state.ignitionLine or state.ignitionCan for state in panda_states) |
||||||
|
elif self.sm.frame - self.sm.recv_frame["pandaStates"] > 5 * rl.get_fps(): |
||||||
|
self.panda_type = log.PandaState.PandaType.unknown |
||||||
|
|
||||||
|
# Handle wide road camera state updates |
||||||
|
if self.sm.updated["wideRoadCameraState"]: |
||||||
|
cam_state = self.sm["wideRoadCameraState"] |
||||||
|
|
||||||
|
# Scale factor based on sensor type |
||||||
|
scale = 6.0 if cam_state.sensor == 'ar0231' else 1.0 |
||||||
|
self.light_sensor = max(100.0 - scale * cam_state.exposureValPercent, 0.0) |
||||||
|
elif not self.sm.alive["wideRoadCameraState"] or not self.sm.valid["wideRoadCameraState"]: |
||||||
|
self.light_sensor = -1 |
||||||
|
|
||||||
|
# Update started state |
||||||
|
self.started = self.sm["deviceState"].started and self.ignition |
||||||
|
|
||||||
|
def _update_status(self) -> None: |
||||||
|
if self.started and self.sm.updated["selfdriveState"]: |
||||||
|
ss = self.sm["selfdriveState"] |
||||||
|
state = ss.state |
||||||
|
|
||||||
|
if state in (log.SelfdriveState.OpenpilotState.preEnabled, log.SelfdriveState.OpenpilotState.overriding): |
||||||
|
self.status = UIStatus.OVERRIDE |
||||||
|
else: |
||||||
|
self.status = UIStatus.ENGAGED if ss.enabled else UIStatus.DISENGAGED |
||||||
|
|
||||||
|
# Check for engagement state changes |
||||||
|
if self.engaged != self._engaged_prev: |
||||||
|
self._engaged_prev = self.engaged |
||||||
|
|
||||||
|
# Handle onroad/offroad transition |
||||||
|
if self.started != self._started_prev or self.sm.frame == 1: |
||||||
|
if self.started: |
||||||
|
self.status = UIStatus.DISENGAGED |
||||||
|
self.started_frame = self.sm.frame |
||||||
|
|
||||||
|
self._started_prev = self.started |
||||||
|
|
||||||
|
def _update_params(self) -> None: |
||||||
|
try: |
||||||
|
self.is_metric = self.params.get_bool("IsMetric") |
||||||
|
except UnknownKeyName: |
||||||
|
self.is_metric = False |
||||||
|
|
||||||
|
|
||||||
|
# Global instance |
||||||
|
ui_state = UIState() |
Loading…
Reference in new issue