import pyray as rl import time from collections.abc import Callable from enum import Enum from cereal import messaging, log from openpilot.common.params import Params, UnknownKeyName from openpilot.selfdrive.ui.lib.prime_state import PrimeState UI_BORDER_SIZE = 30 class UIStatus(Enum): DISENGAGED = "disengaged" ENGAGED = "engaged" OVERRIDE = "override" class UIState: _instance: 'UIState | None' = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._initialize() return cls._instance def _initialize(self): self.params = Params() self.sm = messaging.SubMaster( [ "modelV2", "controlsState", "liveCalibration", "radarState", "deviceState", "pandaStates", "carParams", "driverMonitoringState", "carState", "driverStateV2", "roadCameraState", "wideRoadCameraState", "managerState", "selfdriveState", "longitudinalPlan", ] ) self.prime_state = PrimeState() # UI Status tracking self.status: UIStatus = UIStatus.DISENGAGED self.started_frame: int = 0 self._engaged_prev: bool = False self._started_prev: bool = False # Core state variables self.is_metric: bool = self.params.get_bool("IsMetric") self.started: bool = False self.ignition: bool = False self.panda_type: log.PandaState.PandaType = log.PandaState.PandaType.unknown self.personality: log.LongitudinalPersonality = log.LongitudinalPersonality.standard self.light_sensor: float = -1.0 self._update_params() @property def engaged(self) -> bool: return self.started and self.sm["selfdriveState"].enabled def is_onroad(self) -> bool: return self.started def is_offroad(self) -> bool: return not self.started def update(self) -> None: self.sm.update(0) self._update_state() self._update_status() device.update() def _update_state(self) -> None: # Handle panda states updates if self.sm.updated["pandaStates"]: panda_states = self.sm["pandaStates"] if len(panda_states) > 0: # Get panda type from first panda self.panda_type = panda_states[0].pandaType # Check ignition status across all pandas if self.panda_type != log.PandaState.PandaType.unknown: self.ignition = any(state.ignitionLine or state.ignitionCan for state in panda_states) elif self.sm.frame - self.sm.recv_frame["pandaStates"] > 5 * rl.get_fps(): self.panda_type = log.PandaState.PandaType.unknown # Handle wide road camera state updates if self.sm.updated["wideRoadCameraState"]: cam_state = self.sm["wideRoadCameraState"] # Scale factor based on sensor type scale = 6.0 if cam_state.sensor == 'ar0231' else 1.0 self.light_sensor = max(100.0 - scale * cam_state.exposureValPercent, 0.0) elif not self.sm.alive["wideRoadCameraState"] or not self.sm.valid["wideRoadCameraState"]: self.light_sensor = -1 # Update started state self.started = self.sm["deviceState"].started and self.ignition def _update_status(self) -> None: if self.started and self.sm.updated["selfdriveState"]: ss = self.sm["selfdriveState"] state = ss.state if state in (log.SelfdriveState.OpenpilotState.preEnabled, log.SelfdriveState.OpenpilotState.overriding): self.status = UIStatus.OVERRIDE else: self.status = UIStatus.ENGAGED if ss.enabled else UIStatus.DISENGAGED # Check for engagement state changes if self.engaged != self._engaged_prev: self._engaged_prev = self.engaged # Handle onroad/offroad transition if self.started != self._started_prev or self.sm.frame == 1: if self.started: self.status = UIStatus.DISENGAGED self.started_frame = self.sm.frame self._started_prev = self.started def _update_params(self) -> None: try: self.is_metric = self.params.get_bool("IsMetric") except UnknownKeyName: self.is_metric = False class Device: def __init__(self): self._ignition = False self._interaction_time: float = 0.0 self._interactive_timeout_callbacks: list[Callable] = [] self._prev_timed_out = False self.reset_interactive_timeout() def reset_interactive_timeout(self, timeout: int = -1) -> None: if timeout == -1: timeout = 10 if ui_state.ignition else 30 self._interaction_time = time.monotonic() + timeout def add_interactive_timeout_callback(self, callback: Callable): self._interactive_timeout_callbacks.append(callback) def update(self): # Handle interactive timeout ignition_just_turned_off = not ui_state.ignition and self._ignition self._ignition = ui_state.ignition interaction_timeout = time.monotonic() > self._interaction_time if ignition_just_turned_off or rl.is_mouse_button_down(rl.MouseButton.MOUSE_BUTTON_LEFT): self.reset_interactive_timeout() elif interaction_timeout and not self._prev_timed_out: for callback in self._interactive_timeout_callbacks: callback() self._prev_timed_out = interaction_timeout # Global instance ui_state = UIState() device = Device()