#!/usr/bin/env python3
import pyray as rl
import os
import threading
import time
from openpilot . common . basedir import BASEDIR
from openpilot . system . ui . lib . application import gui_app
from openpilot . system . ui . text import wrap_text
# Layout and appearance constants for the spinner screen.

# Size of the progress bar rectangle drawn beneath the spinner.
PROGRESS_BAR_WIDTH = 1000
PROGRESS_BAR_HEIGHT = 20

# Spinner rotation speed: one full rotation per second.
DEGREES_PER_SECOND = 360.0

# Subtracted from the window width to get the text wrapping width.
MARGIN_H = 100

# Both spinner textures are loaded and drawn as squares of this size.
TEXTURE_SIZE = 360

# Font size used when measuring and drawing the text lines.
FONT_SIZE = 88

# Vertical distance between the tops of consecutive text lines.
LINE_HEIGHT = 96

# Color of the unfilled part of the progress bar.
DARKGRAY = (55, 55, 55, 255)
def clamp(value, min_value, max_value):
    """Limit ``value`` to the range ``[min_value, max_value]``.

    The upper bound is applied first and the lower bound second, so if the
    bounds are inverted (``min_value > max_value``) the result is ``min_value``.
    """
    capped = max_value if max_value < value else value
    return min_value if min_value > capped else capped
class SpinnerRenderer:
    """Draws a rotating spinner with either a progress bar or text beneath it.

    The displayed state (progress value or wrapped text lines) is written by
    ``set_text`` and read by ``render``; both access it under ``self._lock``
    so they can be called from different threads.
    """

    def __init__(self):
        """Load the spinner textures and start with nothing shown below them."""
        self._comma_texture = gui_app.load_texture_from_image(
            os.path.join(BASEDIR, "selfdrive/assets/img_spinner_comma.png"),
            TEXTURE_SIZE, TEXTURE_SIZE)
        self._spinner_texture = gui_app.load_texture_from_image(
            os.path.join(BASEDIR, "selfdrive/assets/img_spinner_track.png"),
            TEXTURE_SIZE, TEXTURE_SIZE, alpha_premultiply=True)
        self._rotation = 0.0  # current spinner angle in degrees, kept in [0, 360)
        self._progress: int | None = None  # 0..100 when a bar is shown, else None
        self._wrapped_lines: list[str] = []  # text lines shown when no bar is shown
        self._lock = threading.Lock()

    def set_text(self, text: str) -> None:
        """Choose what is displayed below the spinner.

        A string consisting only of decimal digits is treated as a progress
        percentage, clamped to 0..100 and shown as a bar. Any other string
        (including the empty string) is wrapped to the window width minus
        ``MARGIN_H`` and shown as text.
        """
        with self._lock:
            # isdecimal() instead of isdigit(): isdigit() is also true for
            # characters such as superscript digits, which int() rejects
            # with ValueError.
            if text.isdecimal():
                self._progress = clamp(int(text), 0, 100)
                self._wrapped_lines = []
            else:
                self._progress = None
                self._wrapped_lines = wrap_text(text, FONT_SIZE, gui_app.width - MARGIN_H)

    def render(self):
        """Draw one frame: the spinner, the logo, then the bar or the text."""
        # Snapshot the shared state so the lock is not held while drawing.
        with self._lock:
            progress = self._progress
            wrapped_lines = self._wrapped_lines

        if wrapped_lines:
            # Center the spinner-plus-text group vertically in the window.
            spacing = 50
            total_height = TEXTURE_SIZE + spacing + len(wrapped_lines) * LINE_HEIGHT
            center_y = (gui_app.height - total_height) / 2.0 + TEXTURE_SIZE / 2.0
        else:
            # Center the spinner itself vertically.
            spacing = 150
            center_y = gui_app.height / 2.0

        # Top edge of whatever is drawn beneath the spinner.
        y_pos = center_y + TEXTURE_SIZE / 2.0 + spacing

        half_size = TEXTURE_SIZE / 2.0
        center = rl.Vector2(gui_app.width / 2.0, center_y)
        spinner_origin = rl.Vector2(half_size, half_size)
        comma_position = rl.Vector2(center.x - half_size, center.y - half_size)

        # Advance the angle by the time elapsed since the previous frame.
        delta_time = rl.get_frame_time()
        self._rotation = (self._rotation + DEGREES_PER_SECOND * delta_time) % 360.0

        # Draw rotating spinner and static comma logo. The destination
        # rectangle is placed at the center point with the origin at half the
        # texture size, so the track is drawn (and rotated) around `center`.
        rl.draw_texture_pro(self._spinner_texture,
                            rl.Rectangle(0, 0, TEXTURE_SIZE, TEXTURE_SIZE),
                            rl.Rectangle(center.x, center.y, TEXTURE_SIZE, TEXTURE_SIZE),
                            spinner_origin, self._rotation, rl.WHITE)
        rl.draw_texture_v(self._comma_texture, comma_position, rl.WHITE)

        # Display the progress bar or text based on the last set_text() call.
        if progress is not None:
            bar = rl.Rectangle(center.x - PROGRESS_BAR_WIDTH / 2.0, y_pos,
                               PROGRESS_BAR_WIDTH, PROGRESS_BAR_HEIGHT)
            # Full-width background first, then the filled portion on top.
            rl.draw_rectangle_rounded(bar, 1, 10, DARKGRAY)
            bar.width *= progress / 100.0
            rl.draw_rectangle_rounded(bar, 1, 10, rl.WHITE)
        elif wrapped_lines:
            font = gui_app.font()
            for i, line in enumerate(wrapped_lines):
                # Measure each line so it can be centered horizontally.
                text_size = rl.measure_text_ex(font, line, FONT_SIZE, 0.0)
                rl.draw_text_ex(font, line,
                                rl.Vector2(center.x - text_size.x / 2, y_pos + i * LINE_HEIGHT),
                                FONT_SIZE, 0.0, rl.WHITE)
class Spinner:
    """Shows the spinner window from a background thread.

    Constructing a ``Spinner`` starts the render thread; ``close`` (also
    called by ``__exit__`` and ``__del__``) asks it to stop. Can be used as a
    context manager.
    """

    def __init__(self):
        """Start the render thread and wait until its renderer exists."""
        self._renderer: SpinnerRenderer | None = None
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run)
        self._thread.start()

        # Wait for the renderer to be initialized. Also stop waiting if the
        # thread has exited, since _run returns without creating a renderer
        # when the CI environment variable is set.
        while self._renderer is None and self._thread.is_alive():
            time.sleep(0.01)

    def update(self, spinner_text: str):
        """Forward ``spinner_text`` to the renderer; no-op if there is none."""
        if self._renderer is not None:
            self._renderer.set_text(spinner_text)

    def update_progress(self, cur: float, total: float):
        """Show ``cur / total`` as a whole-number percentage in 0..100.

        A ``total`` of zero is reported as 0 instead of raising
        ``ZeroDivisionError``. The result is clamped so a negative ratio is
        sent as "0" rather than a string like "-5".
        """
        percent = round(100 * cur / total) if total else 0
        self.update(str(max(0, min(100, percent))))

    def _run(self):
        """Thread body: open the window and render until asked to stop."""
        # No window is created when the CI environment variable is set.
        if os.getenv("CI") is not None:
            return

        gui_app.init_window(" Spinner ")
        # Keep a local reference so the loop does not re-read the attribute.
        self._renderer = renderer = SpinnerRenderer()
        try:
            for _ in gui_app.render():
                if self._stop_event.is_set():
                    break
                renderer.render()
        finally:
            # Always close the window, even if rendering raised.
            gui_app.close()

    def __enter__(self):
        return self

    def close(self):
        """Signal the render thread to stop and wait up to 2 seconds for it."""
        if self._thread.is_alive():
            self._stop_event.set()
            self._thread.join(timeout=2.0)
            if self._thread.is_alive():
                print(" WARNING: failed to join spinner thread ")

    def __del__(self):
        self.close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
# Manual demo: show the spinner with a sample message for five seconds.
if __name__ == "__main__":
    with Spinner() as s:
        s.update(" Spinner text ")
        time.sleep(5)