ui(raylib): create BaseWindow (#35074)

* ui(raylib): create BaseWindow

* test without typing

* revert

* Revert "test without typing"

This reverts commit c8a5e1b832.

* lines
pull/35078/head
Cameron Clough 1 month ago committed by GitHub
parent 41b34c6f43
commit 124198460b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 57
      system/ui/lib/window.py
  2. 42
      system/ui/spinner.py
  3. 45
      system/ui/text.py

@ -0,0 +1,57 @@
import threading
import time
import os
from typing import Generic, Protocol, TypeVar
from openpilot.system.ui.lib.application import gui_app
class RendererProtocol(Protocol):
def render(self): ...
R = TypeVar("R", bound=RendererProtocol)
class BaseWindow(Generic[R]):
def __init__(self, title: str):
self._title = title
self._renderer: R | None = None
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._run)
self._thread.start()
# wait for the renderer to be initialized
while self._renderer is None and self._thread.is_alive():
time.sleep(0.01)
def _create_renderer(self) -> R:
raise NotImplementedError("Subclasses of BaseWindow must implement _create_renderer()")
def _run(self):
if os.getenv("CI") is not None:
return
gui_app.init_window("Spinner")
self._renderer = self._create_renderer()
try:
for _ in gui_app.render():
if self._stop_event.is_set():
break
self._renderer.render()
finally:
gui_app.close()
def __enter__(self):
return self
def close(self):
if self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=2.0)
if self._thread.is_alive():
print(f"WARNING: failed to join {self._title} thread")
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

@ -6,6 +6,7 @@ import time
from openpilot.common.basedir import BASEDIR from openpilot.common.basedir import BASEDIR
from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.application import gui_app
from openpilot.system.ui.lib.window import BaseWindow
from openpilot.system.ui.text import wrap_text from openpilot.system.ui.text import wrap_text
# Constants # Constants
@ -85,16 +86,12 @@ class SpinnerRenderer:
FONT_SIZE, 0.0, rl.WHITE) FONT_SIZE, 0.0, rl.WHITE)
class Spinner: class Spinner(BaseWindow[SpinnerRenderer]):
def __init__(self): def __init__(self):
self._renderer: SpinnerRenderer | None = None super().__init__("Spinner")
self._stop_event = threading.Event()
self._thread = threading.Thread(target=self._run)
self._thread.start()
# wait for the renderer to be initialized def _create_renderer(self):
while self._renderer is None and self._thread.is_alive(): return SpinnerRenderer()
time.sleep(0.01)
def update(self, spinner_text: str): def update(self, spinner_text: str):
if self._renderer is not None: if self._renderer is not None:
@ -103,35 +100,6 @@ class Spinner:
def update_progress(self, cur: float, total: float): def update_progress(self, cur: float, total: float):
self.update(str(round(100 * cur / total))) self.update(str(round(100 * cur / total)))
def _run(self):
if os.getenv("CI") is not None:
return
gui_app.init_window("Spinner")
self._renderer = renderer = SpinnerRenderer()
try:
for _ in gui_app.render():
if self._stop_event.is_set():
break
renderer.render()
finally:
gui_app.close()
def __enter__(self):
return self
def close(self):
if self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=2.0)
if self._thread.is_alive():
print("WARNING: failed to join spinner thread")
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == "__main__": if __name__ == "__main__":
with Spinner() as s: with Spinner() as s:

@ -1,13 +1,12 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os
import re import re
import threading
import time import time
import pyray as rl import pyray as rl
from openpilot.system.hardware import HARDWARE, PC from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.ui.lib.button import gui_button, ButtonStyle from openpilot.system.ui.lib.button import gui_button, ButtonStyle
from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel from openpilot.system.ui.lib.scroll_panel import GuiScrollPanel
from openpilot.system.ui.lib.application import gui_app from openpilot.system.ui.lib.application import gui_app
from openpilot.system.ui.lib.window import BaseWindow
MARGIN = 50 MARGIN = 50
SPACING = 40 SPACING = 40
@ -74,52 +73,18 @@ class TextWindowRenderer:
return ret return ret
class TextWindow: class TextWindow(BaseWindow[TextWindowRenderer]):
def __init__(self, text: str): def __init__(self, text: str):
self._text = text self._text = text
super().__init__("Text")
self._renderer: TextWindowRenderer | None = None def _create_renderer(self):
self._stop_event = threading.Event() return TextWindowRenderer(self._text)
self._thread = threading.Thread(target=self._run)
self._thread.start()
# wait for the renderer to be initialized
while self._renderer is None and self._thread.is_alive():
time.sleep(0.01)
def wait_for_exit(self): def wait_for_exit(self):
while self._thread.is_alive(): while self._thread.is_alive():
time.sleep(0.01) time.sleep(0.01)
def _run(self):
if os.getenv("CI") is not None:
return
gui_app.init_window("Text")
self._renderer = renderer = TextWindowRenderer(self._text)
try:
for _ in gui_app.render():
if self._stop_event.is_set():
break
renderer.render()
finally:
gui_app.close()
def __enter__(self):
return self
def close(self):
if self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=2.0)
if self._thread.is_alive():
print("WARNING: failed to join text window thread")
def __del__(self):
self.close()
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
if __name__ == "__main__": if __name__ == "__main__":
with TextWindow(DEMO_TEXT): with TextWindow(DEMO_TEXT):

Loading…
Cancel
Save