You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							227 lines
						
					
					
						
							7.2 KiB
						
					
					
				
			
		
		
	
	
							227 lines
						
					
					
						
							7.2 KiB
						
					
					
				import pyray as rl
 | 
						|
import numpy as np
 | 
						|
import time
 | 
						|
import threading
 | 
						|
from collections.abc import Callable
 | 
						|
from enum import Enum
 | 
						|
from cereal import messaging, log
 | 
						|
from openpilot.common.filter_simple import FirstOrderFilter
 | 
						|
from openpilot.common.params import Params, UnknownKeyName
 | 
						|
from openpilot.common.swaglog import cloudlog
 | 
						|
from openpilot.selfdrive.ui.lib.prime_state import PrimeState
 | 
						|
from openpilot.system.ui.lib.application import DEFAULT_FPS
 | 
						|
from openpilot.system.hardware import HARDWARE
 | 
						|
from openpilot.system.ui.lib.application import gui_app
 | 
						|
 | 
						|
UI_BORDER_SIZE = 30
 | 
						|
BACKLIGHT_OFFROAD = 50
 | 
						|
 | 
						|
 | 
						|
class UIStatus(Enum):
 | 
						|
  DISENGAGED = "disengaged"
 | 
						|
  ENGAGED = "engaged"
 | 
						|
  OVERRIDE = "override"
 | 
						|
 | 
						|
 | 
						|
class UIState:
 | 
						|
  _instance: 'UIState | None' = None
 | 
						|
 | 
						|
  def __new__(cls):
 | 
						|
    if cls._instance is None:
 | 
						|
      cls._instance = super().__new__(cls)
 | 
						|
      cls._instance._initialize()
 | 
						|
    return cls._instance
 | 
						|
 | 
						|
  def _initialize(self):
 | 
						|
    self.params = Params()
 | 
						|
    self.sm = messaging.SubMaster(
 | 
						|
      [
 | 
						|
        "modelV2",
 | 
						|
        "controlsState",
 | 
						|
        "liveCalibration",
 | 
						|
        "radarState",
 | 
						|
        "deviceState",
 | 
						|
        "pandaStates",
 | 
						|
        "carParams",
 | 
						|
        "driverMonitoringState",
 | 
						|
        "carState",
 | 
						|
        "driverStateV2",
 | 
						|
        "roadCameraState",
 | 
						|
        "wideRoadCameraState",
 | 
						|
        "managerState",
 | 
						|
        "selfdriveState",
 | 
						|
        "longitudinalPlan",
 | 
						|
      ]
 | 
						|
    )
 | 
						|
 | 
						|
    self.prime_state = PrimeState()
 | 
						|
 | 
						|
    # UI Status tracking
 | 
						|
    self.status: UIStatus = UIStatus.DISENGAGED
 | 
						|
    self.started_frame: int = 0
 | 
						|
    self._engaged_prev: bool = False
 | 
						|
    self._started_prev: bool = False
 | 
						|
 | 
						|
    # Core state variables
 | 
						|
    self.is_metric: bool = self.params.get_bool("IsMetric")
 | 
						|
    self.started: bool = False
 | 
						|
    self.ignition: bool = False
 | 
						|
    self.panda_type: log.PandaState.PandaType = log.PandaState.PandaType.unknown
 | 
						|
    self.personality: log.LongitudinalPersonality = log.LongitudinalPersonality.standard
 | 
						|
    self.light_sensor: float = -1.0
 | 
						|
 | 
						|
    self._update_params()
 | 
						|
 | 
						|
  @property
 | 
						|
  def engaged(self) -> bool:
 | 
						|
    return self.started and self.sm["selfdriveState"].enabled
 | 
						|
 | 
						|
  def is_onroad(self) -> bool:
 | 
						|
    return self.started
 | 
						|
 | 
						|
  def is_offroad(self) -> bool:
 | 
						|
    return not self.started
 | 
						|
 | 
						|
  def update(self) -> None:
 | 
						|
    self.sm.update(0)
 | 
						|
    self._update_state()
 | 
						|
    self._update_status()
 | 
						|
    device.update()
 | 
						|
 | 
						|
  def _update_state(self) -> None:
 | 
						|
    # Handle panda states updates
 | 
						|
    if self.sm.updated["pandaStates"]:
 | 
						|
      panda_states = self.sm["pandaStates"]
 | 
						|
 | 
						|
      if len(panda_states) > 0:
 | 
						|
        # Get panda type from first panda
 | 
						|
        self.panda_type = panda_states[0].pandaType
 | 
						|
        # Check ignition status across all pandas
 | 
						|
        if self.panda_type != log.PandaState.PandaType.unknown:
 | 
						|
          self.ignition = any(state.ignitionLine or state.ignitionCan for state in panda_states)
 | 
						|
    elif self.sm.frame - self.sm.recv_frame["pandaStates"] > 5 * rl.get_fps():
 | 
						|
      self.panda_type = log.PandaState.PandaType.unknown
 | 
						|
 | 
						|
    # Handle wide road camera state updates
 | 
						|
    if self.sm.updated["wideRoadCameraState"]:
 | 
						|
      cam_state = self.sm["wideRoadCameraState"]
 | 
						|
      self.light_sensor = max(100.0 - cam_state.exposureValPercent, 0.0)
 | 
						|
    elif not self.sm.alive["wideRoadCameraState"] or not self.sm.valid["wideRoadCameraState"]:
 | 
						|
      self.light_sensor = -1
 | 
						|
 | 
						|
    # Update started state
 | 
						|
    self.started = self.sm["deviceState"].started and self.ignition
 | 
						|
 | 
						|
  def _update_status(self) -> None:
 | 
						|
    if self.started and self.sm.updated["selfdriveState"]:
 | 
						|
      ss = self.sm["selfdriveState"]
 | 
						|
      state = ss.state
 | 
						|
 | 
						|
      if state in (log.SelfdriveState.OpenpilotState.preEnabled, log.SelfdriveState.OpenpilotState.overriding):
 | 
						|
        self.status = UIStatus.OVERRIDE
 | 
						|
      else:
 | 
						|
        self.status = UIStatus.ENGAGED if ss.enabled else UIStatus.DISENGAGED
 | 
						|
 | 
						|
    # Check for engagement state changes
 | 
						|
    if self.engaged != self._engaged_prev:
 | 
						|
      self._engaged_prev = self.engaged
 | 
						|
 | 
						|
    # Handle onroad/offroad transition
 | 
						|
    if self.started != self._started_prev or self.sm.frame == 1:
 | 
						|
      if self.started:
 | 
						|
        self.status = UIStatus.DISENGAGED
 | 
						|
        self.started_frame = self.sm.frame
 | 
						|
 | 
						|
      self._started_prev = self.started
 | 
						|
 | 
						|
  def _update_params(self) -> None:
 | 
						|
    try:
 | 
						|
      self.is_metric = self.params.get_bool("IsMetric")
 | 
						|
    except UnknownKeyName:
 | 
						|
      self.is_metric = False
 | 
						|
 | 
						|
 | 
						|
class Device:
 | 
						|
  def __init__(self):
 | 
						|
    self._ignition = False
 | 
						|
    self._interaction_time: float = -1
 | 
						|
    self._interactive_timeout_callbacks: list[Callable] = []
 | 
						|
    self._prev_timed_out = False
 | 
						|
    self._awake = False
 | 
						|
 | 
						|
    self._offroad_brightness: int = BACKLIGHT_OFFROAD
 | 
						|
    self._last_brightness: int = 0
 | 
						|
    self._brightness_filter = FirstOrderFilter(BACKLIGHT_OFFROAD, 10.00, 1 / DEFAULT_FPS)
 | 
						|
    self._brightness_thread: threading.Thread | None = None
 | 
						|
 | 
						|
  def reset_interactive_timeout(self, timeout: int = -1) -> None:
 | 
						|
    if timeout == -1:
 | 
						|
      timeout = 10 if ui_state.ignition else 30
 | 
						|
    self._interaction_time = time.monotonic() + timeout
 | 
						|
 | 
						|
  def add_interactive_timeout_callback(self, callback: Callable):
 | 
						|
    self._interactive_timeout_callbacks.append(callback)
 | 
						|
 | 
						|
  def update(self):
 | 
						|
    # do initial reset
 | 
						|
    if self._interaction_time <= 0:
 | 
						|
      self.reset_interactive_timeout()
 | 
						|
 | 
						|
    self._update_brightness()
 | 
						|
    self._update_wakefulness()
 | 
						|
 | 
						|
  def set_offroad_brightness(self, brightness: int):
 | 
						|
    # TODO: not yet used, should be used in prime widget for QR code, etc.
 | 
						|
    self._offroad_brightness = min(max(brightness, 0), 100)
 | 
						|
 | 
						|
  def _update_brightness(self):
 | 
						|
    clipped_brightness = self._offroad_brightness
 | 
						|
 | 
						|
    if ui_state.started and ui_state.light_sensor >= 0:
 | 
						|
      clipped_brightness = ui_state.light_sensor
 | 
						|
 | 
						|
      # CIE 1931 - https://www.photonstophotos.net/GeneralTopics/Exposure/Psychometric_Lightness_and_Gamma.htm
 | 
						|
      if clipped_brightness <= 8:
 | 
						|
        clipped_brightness = clipped_brightness / 903.3
 | 
						|
      else:
 | 
						|
        clipped_brightness = ((clipped_brightness + 16.0) / 116.0) ** 3.0
 | 
						|
 | 
						|
      clipped_brightness = float(np.clip(100 * clipped_brightness, 10, 100))
 | 
						|
 | 
						|
    brightness = round(self._brightness_filter.update(clipped_brightness))
 | 
						|
    if not self._awake:
 | 
						|
      brightness = 0
 | 
						|
 | 
						|
    if brightness != self._last_brightness:
 | 
						|
      if self._brightness_thread is None or not self._brightness_thread.is_alive():
 | 
						|
        cloudlog.debug(f"setting display brightness {brightness}")
 | 
						|
        self._brightness_thread = threading.Thread(target=HARDWARE.set_screen_brightness, args=(brightness,))
 | 
						|
        self._brightness_thread.start()
 | 
						|
        self._last_brightness = brightness
 | 
						|
 | 
						|
  def _update_wakefulness(self):
 | 
						|
    # Handle interactive timeout
 | 
						|
    ignition_just_turned_off = not ui_state.ignition and self._ignition
 | 
						|
    self._ignition = ui_state.ignition
 | 
						|
 | 
						|
    if ignition_just_turned_off or any(ev.left_down for ev in gui_app.mouse_events):
 | 
						|
      self.reset_interactive_timeout()
 | 
						|
 | 
						|
    interaction_timeout = time.monotonic() > self._interaction_time
 | 
						|
    if interaction_timeout and not self._prev_timed_out:
 | 
						|
      for callback in self._interactive_timeout_callbacks:
 | 
						|
        callback()
 | 
						|
    self._prev_timed_out = interaction_timeout
 | 
						|
 | 
						|
    self._set_awake(ui_state.ignition or not interaction_timeout)
 | 
						|
 | 
						|
  def _set_awake(self, on: bool):
 | 
						|
    if on != self._awake:
 | 
						|
      self._awake = on
 | 
						|
      cloudlog.debug(f"setting display power {int(on)}")
 | 
						|
      HARDWARE.set_display_power(on)
 | 
						|
 | 
						|
 | 
						|
# Global instance
 | 
						|
ui_state = UIState()
 | 
						|
device = Device()
 | 
						|
 |