@ -3,18 +3,22 @@ import cffi
import os
import time
import pyray as rl
import threading
from collections . abc import Callable
from collections import deque
from dataclasses import dataclass
from enum import IntEnum
from typing import NamedTuple
from importlib . resources import as_file , files
from openpilot . common . swaglog import cloudlog
from openpilot . system . hardware import HARDWARE
from openpilot . system . hardware import HARDWARE , PC
from openpilot . common . realtime import Ratekeeper
DEFAULT_FPS = int ( os . getenv ( " FPS " , " 60 " ) )
FPS_LOG_INTERVAL = 5 # Seconds between logging FPS drops
FPS_DROP_THRESHOLD = 0.9 # FPS drop threshold for triggering a warning
FPS_CRITICAL_THRESHOLD = 0.5 # Critical threshold for triggering strict actions
MOUSE_THREAD_RATE = 140 # touch controller runs at 140Hz
ENABLE_VSYNC = os . getenv ( " ENABLE_VSYNC " , " 0 " ) == " 1 "
SHOW_FPS = os . getenv ( " SHOW_FPS " ) == " 1 "
@ -47,6 +51,68 @@ class ModalOverlay:
callback : Callable | None = None
class MousePos ( NamedTuple ) :
x : float
y : float
class MouseEvent ( NamedTuple ) :
pos : MousePos
left_pressed : bool
left_released : bool
left_down : bool
t : float
class MouseState :
def __init__ ( self ) :
self . _events : deque [ MouseEvent ] = deque ( maxlen = MOUSE_THREAD_RATE ) # bound event list
self . _prev_mouse_event : MouseEvent | None = None
self . _rk = Ratekeeper ( MOUSE_THREAD_RATE )
self . _lock = threading . Lock ( )
self . _exit_event = threading . Event ( )
self . _thread = None
def get_events ( self ) - > list [ MouseEvent ] :
with self . _lock :
events = list ( self . _events )
self . _events . clear ( )
return events
def start ( self ) :
self . _exit_event . clear ( )
if self . _thread is None or not self . _thread . is_alive ( ) :
self . _thread = threading . Thread ( target = self . _run_thread , daemon = True )
self . _thread . start ( )
def stop ( self ) :
self . _exit_event . set ( )
if self . _thread is not None and self . _thread . is_alive ( ) :
self . _thread . join ( )
def _run_thread ( self ) :
while not self . _exit_event . is_set ( ) :
rl . poll_input_events ( )
self . _handle_mouse_event ( )
self . _rk . keep_time ( )
def _handle_mouse_event ( self ) :
mouse_pos = rl . get_mouse_position ( )
ev = MouseEvent (
MousePos ( mouse_pos . x , mouse_pos . y ) ,
rl . is_mouse_button_pressed ( rl . MouseButton . MOUSE_BUTTON_LEFT ) ,
rl . is_mouse_button_released ( rl . MouseButton . MOUSE_BUTTON_LEFT ) ,
rl . is_mouse_button_down ( rl . MouseButton . MOUSE_BUTTON_LEFT ) ,
time . monotonic ( ) ,
)
# Only add changes
if self . _prev_mouse_event is None or ev [ : - 1 ] != self . _prev_mouse_event [ : - 1 ] :
with self . _lock :
self . _events . append ( ev )
self . _prev_mouse_event = ev
class GuiApplication :
def __init__ ( self , width : int , height : int ) :
self . _fonts : dict [ FontWeight , rl . Font ] = { }
@ -63,8 +129,11 @@ class GuiApplication:
self . _trace_log_callback = None
self . _modal_overlay = ModalOverlay ( )
self . _mouse = MouseState ( )
self . _mouse_events : list [ MouseEvent ] = [ ]
# Debug variables
self . _mouse_history : deque [ rl . Vector2 ] = deque ( maxlen = DEFAULT_FPS * 2 )
self . _mouse_history : deque [ MousePos ] = deque ( maxlen = MOUSE_THREAD_RATE )
def request_close ( self ) :
self . _window_close_requested = True
@ -94,6 +163,9 @@ class GuiApplication:
self . _set_styles ( )
self . _load_fonts ( )
if not PC :
self . _mouse . start ( )
def set_modal_overlay ( self , overlay , callback : Callable | None = None ) :
self . _modal_overlay = ModalOverlay ( overlay = overlay , callback = callback )
@ -154,11 +226,25 @@ class GuiApplication:
rl . unload_render_texture ( self . _render_texture )
self . _render_texture = None
if not PC :
self . _mouse . stop ( )
rl . close_window ( )
@property
def mouse_events ( self ) - > list [ MouseEvent ] :
return self . _mouse_events
def render ( self ) :
try :
while not ( self . _window_close_requested or rl . window_should_close ( ) ) :
if PC :
# Thread is not used on PC, need to manually add mouse events
self . _mouse . _handle_mouse_event ( )
# Store all mouse events for the current frame
self . _mouse_events = self . _mouse . get_events ( )
if self . _render_texture :
rl . begin_texture_mode ( self . _render_texture )
rl . clear_background ( rl . BLACK )
@ -196,9 +282,10 @@ class GuiApplication:
rl . draw_fps ( 10 , 10 )
if SHOW_TOUCHES :
if rl . is_mouse_button_pressed ( rl . MouseButton . MOUSE_BUTTON_LEFT ) :
for mouse_event in self . _mouse_events :
if mouse_event . left_pressed :
self . _mouse_history . clear ( )
self . _mouse_history . append ( rl . get_mouse_position ( ) )
self . _mouse_history . append ( mouse_event . pos )
if self . _mouse_history :
mouse_pos = self . _mouse_history [ - 1 ]