import time import pyray as rl from dataclasses import dataclass from cereal import messaging, log from openpilot.system.hardware import TICI from openpilot.system.ui.lib.application import gui_app, FontWeight # Constants ALERT_COLORS = { log.SelfdriveState.AlertStatus.normal: rl.Color(0, 0, 0, 220), # Black log.SelfdriveState.AlertStatus.userPrompt: rl.Color(0xFE, 0x8C, 0x34, 220), # Orange log.SelfdriveState.AlertStatus.critical: rl.Color(0xC9, 0x22, 0x31, 220), # Red } ALERT_HEIGHTS = { log.SelfdriveState.AlertSize.small: 271, log.SelfdriveState.AlertSize.mid: 420, } ALERT_BORDER_RADIUS = 30 SELFDRIVE_STATE_TIMEOUT = 5 # Seconds SELFDRIVE_UNRESPONSIVE_TIMEOUT = 10 # Seconds @dataclass class Alert: text1: str = "" text2: str = "" alert_type: str = "" size: log.SelfdriveState.AlertSize = log.SelfdriveState.AlertSize.none status: log.SelfdriveState.AlertStatus = log.SelfdriveState.AlertStatus.normal # Pre-defined alert instances ALERT_STARTUP_PENDING = Alert( text1="openpilot Unavailable", text2="Waiting to start", alert_type="selfdriveWaiting", size=log.SelfdriveState.AlertSize.mid, status=log.SelfdriveState.AlertStatus.normal, ) ALERT_CRITICAL_TIMEOUT = Alert( text1="TAKE CONTROL IMMEDIATELY", text2="System Unresponsive", alert_type="selfdriveUnresponsive", size=log.SelfdriveState.AlertSize.full, status=log.SelfdriveState.AlertStatus.critical, ) ALERT_CRITICAL_REBOOT = Alert( text1="System Unresponsive", text2="Reboot Device", alert_type="selfdriveUnresponsivePermanent", size=log.SelfdriveState.AlertSize.full, status=log.SelfdriveState.AlertStatus.critical, ) class AlertRenderer: def __init__(self): """Initialize the alert renderer.""" self.alert: Alert = Alert() # TODO: use ui_state to determine when to start self.started_frame: int = 0 self.font_regular: rl.Font = gui_app.font(FontWeight.NORMAL) self.font_bold: rl.Font = gui_app.font(FontWeight.BOLD) self.font_metrics_cache: dict[tuple[str, int, str], rl.Vector2] = {} def update_state(self, sm: messaging.SubMaster) -> None: """Update alert state based on SubMaster data.""" self.alert = self.get_alert(sm) def get_alert(self, sm: messaging.SubMaster) -> Alert: """Generate the current alert based on selfdrive state.""" ss = sm['selfdriveState'] # Check if waiting to start if sm.recv_frame['selfdriveState'] < self.started_frame: return ALERT_STARTUP_PENDING # Handle selfdrive timeout ss_missing = time.monotonic() - sm.recv_time['selfdriveState'] if TICI: if ss_missing > SELFDRIVE_STATE_TIMEOUT: if ss.enabled and (ss_missing - SELFDRIVE_STATE_TIMEOUT) < SELFDRIVE_UNRESPONSIVE_TIMEOUT: return ALERT_CRITICAL_TIMEOUT return ALERT_CRITICAL_REBOOT # Return current alert from selfdrive state return Alert( text1=ss.alertText1, text2=ss.alertText2, alert_type=ss.alertType, size=self._get_enum_value(ss.alertSize, log.SelfdriveState.AlertSize), status=self._get_enum_value(ss.alertStatus, log.SelfdriveState.AlertStatus)) def draw(self, rect: rl.Rectangle, sm: messaging.SubMaster) -> None: """Render the alert within the specified rectangle.""" self.update_state(sm) alert_size = self._get_enum_value(self.alert.size, log.SelfdriveState.AlertSize) if alert_size == log.SelfdriveState.AlertSize.none: return # Calculate alert rectangle margin = 0 if alert_size == log.SelfdriveState.AlertSize.full else 40 height = ALERT_HEIGHTS.get(alert_size, rect.height) alert_rect = rl.Rectangle( rect.x + margin, rect.y + rect.height - height + margin, rect.width - margin * 2, height - margin * 2, ) # Draw background alert_status = self._get_enum_value(self.alert.status, log.SelfdriveState.AlertStatus) color = ALERT_COLORS.get(alert_status, ALERT_COLORS[log.SelfdriveState.AlertStatus.normal]) if alert_size != log.SelfdriveState.AlertSize.full: roundness = ALERT_BORDER_RADIUS / (min(alert_rect.width, alert_rect.height) / 2) rl.draw_rectangle_rounded(alert_rect, roundness, 10, color) else: rl.draw_rectangle_rec(alert_rect, color) # Draw text center_x = rect.x + rect.width / 2 center_y = alert_rect.y + alert_rect.height / 2 self._draw_text(alert_size, alert_rect, center_x, center_y) def _draw_text( self, alert_size: log.SelfdriveState.AlertSize, alert_rect: rl.Rectangle, center_x: float, center_y: float ) -> None: """Draw text based on alert size.""" if alert_size == log.SelfdriveState.AlertSize.small: font_size = 74 text_width = self._measure_text(self.font_bold, self.alert.text1, font_size, 'bold').x rl.draw_text_ex( self.font_bold, self.alert.text1, rl.Vector2(center_x - text_width / 2, center_y - font_size / 2), font_size, 0, rl.WHITE, ) elif alert_size == log.SelfdriveState.AlertSize.mid: font_size1 = 88 text1_width = self._measure_text(self.font_bold, self.alert.text1, font_size1, 'bold').x rl.draw_text_ex( self.font_bold, self.alert.text1, rl.Vector2(center_x - text1_width / 2, center_y - 125), font_size1, 0, rl.WHITE, ) font_size2 = 66 text2_width = self._measure_text(self.font_regular, self.alert.text2, font_size2, 'regular').x rl.draw_text_ex( self.font_regular, self.alert.text2, rl.Vector2(center_x - text2_width / 2, center_y + 21), font_size2, 0, rl.WHITE, ) elif alert_size == log.SelfdriveState.AlertSize.full: is_long = len(self.alert.text1) > 15 font_size1 = 132 if is_long else 177 text1_y = alert_rect.y + (240 if is_long else 270) wrapped_text1 = self._wrap_text(self.alert.text1, alert_rect.width - 100, font_size1, self.font_bold) for i, line in enumerate(wrapped_text1): line_width = self._measure_text(self.font_bold, line, font_size1, 'bold').x rl.draw_text_ex( self.font_bold, line, rl.Vector2(center_x - line_width / 2, text1_y + i * font_size1), font_size1, 0, rl.WHITE, ) font_size2 = 88 text2_y = alert_rect.y + alert_rect.height - (361 if is_long else 420) wrapped_text2 = self._wrap_text(self.alert.text2, alert_rect.width - 100, font_size2, self.font_regular) for i, line in enumerate(wrapped_text2): line_width = self._measure_text(self.font_regular, line, font_size2, 'regular').x rl.draw_text_ex( self.font_regular, line, rl.Vector2(center_x - line_width / 2, text2_y + i * font_size2), font_size2, 0, rl.WHITE, ) def _wrap_text(self, text: str, max_width: float, font_size: int, font: rl.Font) -> list[str]: """Wrap text to fit within max width.""" words = text.split() lines = [] current_line = "" for word in words: test_line = f"{current_line} {word}" if current_line else word if self._measure_text(font, test_line, font_size, 'bold' if font == self.font_bold else 'regular').x <= max_width: current_line = test_line else: if current_line: lines.append(current_line) current_line = word if current_line: lines.append(current_line) return lines def _measure_text(self, font: rl.Font, text: str, font_size: int, font_type: str) -> rl.Vector2: """Measure text dimensions with caching.""" key = (text, font_size, font_type) if key not in self.font_metrics_cache: self.font_metrics_cache[key] = rl.measure_text_ex(font, text, font_size, 0) return self.font_metrics_cache[key] @staticmethod def _get_enum_value(enum_value, enum_type: type): """Safely convert capnp enum to Python enum value.""" return enum_value.raw if hasattr(enum_value, 'raw') else enum_value