You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							129 lines
						
					
					
						
							3.8 KiB
						
					
					
				
			
		
		
	
	
							129 lines
						
					
					
						
							3.8 KiB
						
					
					
				import pyray as rl
 | 
						|
from enum import Enum
 | 
						|
from cereal import messaging, log
 | 
						|
from openpilot.common.params import Params, UnknownKeyName
 | 
						|
 | 
						|
 | 
						|
UI_BORDER_SIZE = 30
 | 
						|
 | 
						|
 | 
						|
class UIStatus(Enum):
 | 
						|
  DISENGAGED = "disengaged"
 | 
						|
  ENGAGED = "engaged"
 | 
						|
  OVERRIDE = "override"
 | 
						|
 | 
						|
 | 
						|
class UIState:
 | 
						|
  _instance: 'UIState | None' = None
 | 
						|
 | 
						|
  def __new__(cls):
 | 
						|
    if cls._instance is None:
 | 
						|
      cls._instance = super().__new__(cls)
 | 
						|
      cls._instance._initialize()
 | 
						|
    return cls._instance
 | 
						|
 | 
						|
  def _initialize(self):
 | 
						|
    self.params = Params()
 | 
						|
    self.sm = messaging.SubMaster(
 | 
						|
      [
 | 
						|
        "modelV2",
 | 
						|
        "controlsState",
 | 
						|
        "liveCalibration",
 | 
						|
        "radarState",
 | 
						|
        "deviceState",
 | 
						|
        "pandaStates",
 | 
						|
        "carParams",
 | 
						|
        "driverMonitoringState",
 | 
						|
        "carState",
 | 
						|
        "driverStateV2",
 | 
						|
        "roadCameraState",
 | 
						|
        "wideRoadCameraState",
 | 
						|
        "managerState",
 | 
						|
        "selfdriveState",
 | 
						|
        "longitudinalPlan",
 | 
						|
      ]
 | 
						|
    )
 | 
						|
 | 
						|
    # UI Status tracking
 | 
						|
    self.status: UIStatus = UIStatus.DISENGAGED
 | 
						|
    self.started_frame: int = 0
 | 
						|
    self._engaged_prev: bool = False
 | 
						|
    self._started_prev: bool = False
 | 
						|
 | 
						|
    # Core state variables
 | 
						|
    self.is_metric: bool = self.params.get_bool("IsMetric")
 | 
						|
    self.started: bool = False
 | 
						|
    self.ignition: bool = False
 | 
						|
    self.panda_type: log.PandaState.PandaType = log.PandaState.PandaType.unknown
 | 
						|
    self.personality: log.LongitudinalPersonality = log.LongitudinalPersonality.standard
 | 
						|
    self.light_sensor: float = -1.0
 | 
						|
 | 
						|
    self._update_params()
 | 
						|
 | 
						|
  @property
 | 
						|
  def engaged(self) -> bool:
 | 
						|
    return self.started and self.sm["selfdriveState"].enabled
 | 
						|
 | 
						|
  def update(self) -> None:
 | 
						|
    self.sm.update(0)
 | 
						|
    self._update_state()
 | 
						|
    self._update_status()
 | 
						|
 | 
						|
  def _update_state(self) -> None:
 | 
						|
    # Handle panda states updates
 | 
						|
    if self.sm.updated["pandaStates"]:
 | 
						|
      panda_states = self.sm["pandaStates"]
 | 
						|
 | 
						|
      if len(panda_states) > 0:
 | 
						|
        # Get panda type from first panda
 | 
						|
        self.panda_type = panda_states[0].pandaType
 | 
						|
        # Check ignition status across all pandas
 | 
						|
        if self.panda_type != log.PandaState.PandaType.unknown:
 | 
						|
          self.ignition = any(state.ignitionLine or state.ignitionCan for state in panda_states)
 | 
						|
    elif self.sm.frame - self.sm.recv_frame["pandaStates"] > 5 * rl.get_fps():
 | 
						|
      self.panda_type = log.PandaState.PandaType.unknown
 | 
						|
 | 
						|
    # Handle wide road camera state updates
 | 
						|
    if self.sm.updated["wideRoadCameraState"]:
 | 
						|
      cam_state = self.sm["wideRoadCameraState"]
 | 
						|
 | 
						|
      # Scale factor based on sensor type
 | 
						|
      scale = 6.0 if cam_state.sensor == 'ar0231' else 1.0
 | 
						|
      self.light_sensor = max(100.0 - scale * cam_state.exposureValPercent, 0.0)
 | 
						|
    elif not self.sm.alive["wideRoadCameraState"] or not self.sm.valid["wideRoadCameraState"]:
 | 
						|
      self.light_sensor = -1
 | 
						|
 | 
						|
    # Update started state
 | 
						|
    self.started = self.sm["deviceState"].started and self.ignition
 | 
						|
 | 
						|
  def _update_status(self) -> None:
 | 
						|
    if self.started and self.sm.updated["selfdriveState"]:
 | 
						|
      ss = self.sm["selfdriveState"]
 | 
						|
      state = ss.state
 | 
						|
 | 
						|
      if state in (log.SelfdriveState.OpenpilotState.preEnabled, log.SelfdriveState.OpenpilotState.overriding):
 | 
						|
        self.status = UIStatus.OVERRIDE
 | 
						|
      else:
 | 
						|
        self.status = UIStatus.ENGAGED if ss.enabled else UIStatus.DISENGAGED
 | 
						|
 | 
						|
    # Check for engagement state changes
 | 
						|
    if self.engaged != self._engaged_prev:
 | 
						|
      self._engaged_prev = self.engaged
 | 
						|
 | 
						|
    # Handle onroad/offroad transition
 | 
						|
    if self.started != self._started_prev or self.sm.frame == 1:
 | 
						|
      if self.started:
 | 
						|
        self.status = UIStatus.DISENGAGED
 | 
						|
        self.started_frame = self.sm.frame
 | 
						|
 | 
						|
      self._started_prev = self.started
 | 
						|
 | 
						|
  def _update_params(self) -> None:
 | 
						|
    try:
 | 
						|
      self.is_metric = self.params.get_bool("IsMetric")
 | 
						|
    except UnknownKeyName:
 | 
						|
      self.is_metric = False
 | 
						|
 | 
						|
 | 
						|
# Global instance
 | 
						|
ui_state = UIState()
 | 
						|
 |