From 122182176161bff56e51b68afcfec07264925093 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sat, 31 May 2025 02:03:38 +0800 Subject: [PATCH] system/ui: migrate c++ alert renderer to python (#35386) * rebase * cache metrics * measure text * type hint * improve * fix roundness * rebase --- system/ui/onroad/alert_renderer.py | 234 ++++++++++++++++++++++++ system/ui/onroad/augmented_road_view.py | 5 +- 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 system/ui/onroad/alert_renderer.py diff --git a/system/ui/onroad/alert_renderer.py b/system/ui/onroad/alert_renderer.py new file mode 100644 index 0000000000..600c721680 --- /dev/null +++ b/system/ui/onroad/alert_renderer.py @@ -0,0 +1,234 @@ +import numpy as np +import pyray as rl +from dataclasses import dataclass +from cereal import messaging, log +from openpilot.system.ui.lib.application import gui_app, FontWeight + +# Constants +ALERT_COLORS = { + log.SelfdriveState.AlertStatus.normal: rl.Color(0, 0, 0, 150), # Black + log.SelfdriveState.AlertStatus.userPrompt: rl.Color(0xFE, 0x8C, 0x34, 100), # Orange + log.SelfdriveState.AlertStatus.critical: rl.Color(0xC9, 0x22, 0x31, 150), # Red +} + +ALERT_HEIGHTS = { + log.SelfdriveState.AlertSize.small: 271, + log.SelfdriveState.AlertSize.mid: 420, +} + +SELFDRIVE_STATE_TIMEOUT = 5 # Seconds +SELFDRIVE_UNRESPONSIVE_TIMEOUT = 10 # Seconds + + +@dataclass +class Alert: + text1: str = "" + text2: str = "" + alert_type: str = "" + size: log.SelfdriveState.AlertSize = log.SelfdriveState.AlertSize.none + status: log.SelfdriveState.AlertStatus = log.SelfdriveState.AlertStatus.normal + + def is_equal(self, other: 'Alert') -> bool: + """Check if two alerts are equal.""" + return ( + self.text1 == other.text1 + and self.text2 == other.text2 + and self.alert_type == other.alert_type + and self.size == other.size + and self.status == other.status + ) + + +class AlertRenderer: + def __init__(self): + """Initialize the alert renderer.""" + self.alert: Alert = Alert() + self.started_frame: int = 0 + self.font_regular: rl.Font = gui_app.font(FontWeight.NORMAL) + self.font_bold: rl.Font = gui_app.font(FontWeight.BOLD) + self.font_metrics_cache: dict[tuple[str, int, str], rl.Vector2] = {} + + def clear(self) -> None: + """Reset the alert to its default state.""" + self.alert = Alert() + + def update_state(self, sm: messaging.SubMaster, started_frame: int) -> None: + """Update alert state based on SubMaster data.""" + self.started_frame = started_frame + new_alert = self.get_alert(sm) + if not self.alert.is_equal(new_alert): + self.alert = new_alert + + def get_alert(self, sm: messaging.SubMaster) -> Alert: + """Generate the current alert based on selfdrive state.""" + if not sm.valid['selfdriveState']: + return Alert() + + ss = sm['selfdriveState'] + selfdrive_frame = sm.recv_frame['selfdriveState'] + alert_status = self._get_enum_value(ss.alertStatus, log.SelfdriveState.AlertStatus) + + # Return current alert if selfdrive state is recent + if selfdrive_frame >= self.started_frame: + return Alert( + text1=ss.alertText1, + text2=ss.alertText2, + alert_type=ss.alertType, + size=self._get_enum_value(ss.alertSize, log.SelfdriveState.AlertSize), + status=alert_status, + ) + + # Handle selfdrive timeout + ss_missing = (np.uint64(rl.get_time() * 1e9) - sm.recv_time['selfdriveState']) / 1e9 + if selfdrive_frame < self.started_frame: + return Alert( + text1="openpilot Unavailable", + text2="Waiting to start", + alert_type="selfdriveWaiting", + size=log.SelfdriveState.AlertSize.mid, + status=log.SelfdriveState.AlertStatus.normal, + ) + elif ss_missing > SELFDRIVE_STATE_TIMEOUT: + if ss.enabled and (ss_missing - SELFDRIVE_STATE_TIMEOUT) < SELFDRIVE_UNRESPONSIVE_TIMEOUT: + return Alert( + text1="TAKE CONTROL IMMEDIATELY", + text2="System Unresponsive", + alert_type="selfdriveUnresponsive", + size=log.SelfdriveState.AlertSize.full, + status=log.SelfdriveState.AlertStatus.critical, + ) + return Alert( + text1="System Unresponsive", + text2="Reboot Device", + alert_type="selfdriveUnresponsivePermanent", + size=log.SelfdriveState.AlertSize.mid, + status=log.SelfdriveState.AlertStatus.normal, + ) + + return Alert() + + def draw(self, rect: rl.Rectangle, sm: messaging.SubMaster) -> None: + """Render the alert within the specified rectangle.""" + self.update_state(sm, sm.recv_frame['selfdriveState']) + alert_size = self._get_enum_value(self.alert.size, log.SelfdriveState.AlertSize) + if alert_size == log.SelfdriveState.AlertSize.none: + return + + # Calculate alert rectangle + margin = 0 if alert_size == log.SelfdriveState.AlertSize.full else 40 + radius = 0 if alert_size == log.SelfdriveState.AlertSize.full else 30 + height = ALERT_HEIGHTS.get(alert_size, rect.height) + alert_rect = rl.Rectangle( + rect.x + margin, + rect.y + rect.height - height + margin, + rect.width - margin * 2, + height - margin * 2, + ) + + # Draw background + alert_status = self._get_enum_value(self.alert.status, log.SelfdriveState.AlertStatus) + color = ALERT_COLORS.get(alert_status, ALERT_COLORS[log.SelfdriveState.AlertStatus.normal]) + if alert_size != log.SelfdriveState.AlertSize.full: + roundness = radius / (min(alert_rect.width, alert_rect.height) / 2) + rl.draw_rectangle_rounded(alert_rect, roundness, 10, color) + else: + rl.draw_rectangle_rec(alert_rect, color) + + # Draw text + center_x = rect.x + rect.width / 2 + center_y = alert_rect.y + alert_rect.height / 2 + self._draw_text(alert_size, alert_rect, center_x, center_y) + + def _draw_text( + self, alert_size: log.SelfdriveState.AlertSize, alert_rect: rl.Rectangle, center_x: float, center_y: float + ) -> None: + """Draw text based on alert size.""" + if alert_size == log.SelfdriveState.AlertSize.small: + font_size = 74 + text_width = self._measure_text(self.font_bold, self.alert.text1, font_size, 'bold').x + rl.draw_text_ex( + self.font_bold, + self.alert.text1, + rl.Vector2(center_x - text_width / 2, center_y - font_size / 2), + font_size, + 0, + rl.WHITE, + ) + elif alert_size == log.SelfdriveState.AlertSize.mid: + font_size1 = 88 + text1_width = self._measure_text(self.font_bold, self.alert.text1, font_size1, 'bold').x + rl.draw_text_ex( + self.font_bold, + self.alert.text1, + rl.Vector2(center_x - text1_width / 2, center_y - 125), + font_size1, + 0, + rl.WHITE, + ) + font_size2 = 66 + text2_width = self._measure_text(self.font_regular, self.alert.text2, font_size2, 'regular').x + rl.draw_text_ex( + self.font_regular, + self.alert.text2, + rl.Vector2(center_x - text2_width / 2, center_y + 21), + font_size2, + 0, + rl.WHITE, + ) + elif alert_size == log.SelfdriveState.AlertSize.full: + is_long = len(self.alert.text1) > 15 + font_size1 = 132 if is_long else 177 + text1_y = alert_rect.y + (240 if is_long else 270) + wrapped_text1 = self._wrap_text(self.alert.text1, alert_rect.width - 100, font_size1, self.font_bold) + for i, line in enumerate(wrapped_text1): + line_width = self._measure_text(self.font_bold, line, font_size1, 'bold').x + rl.draw_text_ex( + self.font_bold, + line, + rl.Vector2(center_x - line_width / 2, text1_y + i * font_size1), + font_size1, + 0, + rl.WHITE, + ) + font_size2 = 88 + text2_y = alert_rect.y + alert_rect.height - (361 if is_long else 420) + wrapped_text2 = self._wrap_text(self.alert.text2, alert_rect.width - 100, font_size2, self.font_regular) + for i, line in enumerate(wrapped_text2): + line_width = self._measure_text(self.font_regular, line, font_size2, 'regular').x + rl.draw_text_ex( + self.font_regular, + line, + rl.Vector2(center_x - line_width / 2, text2_y + i * font_size2), + font_size2, + 0, + rl.WHITE, + ) + + def _wrap_text(self, text: str, max_width: float, font_size: int, font: rl.Font) -> list[str]: + """Wrap text to fit within max width.""" + words = text.split() + lines = [] + current_line = "" + for word in words: + test_line = f"{current_line} {word}" if current_line else word + if self._measure_text(font, test_line, font_size, 'bold' if font == self.font_bold else 'regular').x <= max_width: + current_line = test_line + else: + if current_line: + lines.append(current_line) + current_line = word + if current_line: + lines.append(current_line) + return lines + + def _measure_text(self, font: rl.Font, text: str, font_size: int, font_type: str) -> rl.Vector2: + """Measure text dimensions with caching.""" + key = (text, font_size, font_type) + if key not in self.font_metrics_cache: + self.font_metrics_cache[key] = rl.measure_text_ex(font, text, font_size, 0) + return self.font_metrics_cache[key] + + @staticmethod + def _get_enum_value(enum_value, enum_type: type): + """Safely convert capnp enum to Python enum value.""" + return enum_value.raw if hasattr(enum_value, 'raw') else enum_value diff --git a/system/ui/onroad/augmented_road_view.py b/system/ui/onroad/augmented_road_view.py index 83d2271028..18f94325f4 100644 --- a/system/ui/onroad/augmented_road_view.py +++ b/system/ui/onroad/augmented_road_view.py @@ -4,6 +4,7 @@ from enum import Enum from cereal import messaging, log from msgq.visionipc import VisionStreamType +from openpilot.system.ui.onroad.alert_renderer import AlertRenderer from openpilot.system.ui.onroad.driver_state import DriverStateRenderer from openpilot.system.ui.onroad.hud_renderer import HudRenderer from openpilot.system.ui.onroad.model_renderer import ModelRenderer @@ -43,6 +44,7 @@ class AugmentedRoadView(CameraView): self.model_renderer = ModelRenderer() self._hud_renderer = HudRenderer() + self.alert_renderer = AlertRenderer() self.driver_state_renderer = DriverStateRenderer() def render(self, rect): @@ -75,6 +77,7 @@ class AugmentedRoadView(CameraView): # Draw all UI overlays self.model_renderer.draw(self._content_rect, self.sm) self._hud_renderer.draw(self._content_rect, self.sm) + self.alert_renderer.draw(self._content_rect, self.sm) self.driver_state_renderer.draw(self._content_rect, self.sm) # Custom UI extension point - add custom overlays here @@ -118,7 +121,7 @@ class AugmentedRoadView(CameraView): def _calc_frame_matrix(self, rect: rl.Rectangle) -> np.ndarray: # Check if we can use cached matrix - calib_time = self.sm.recv_frame.get('liveCalibration', 0) + calib_time = self.sm.recv_frame['liveCalibration'] current_dims = (self._content_rect.width, self._content_rect.height) if (self._last_calib_time == calib_time and self._last_rect_dims == current_dims and