import os
import subprocess
from common . basedir import BASEDIR
class Spinner():
    """Best-effort wrapper around the external ``./spinner`` helper process.

    The helper is launched from ``<BASEDIR>/selfdrive/ui`` and is fed one
    newline-terminated UTF-8 line per update through its stdin pipe. If the
    helper cannot be started, every method silently becomes a no-op so that
    callers never have to care whether a spinner is actually shown.

    Usable as a context manager; the helper is terminated on exit.
    """

    def __init__(self) -> None:
        # Assign first so close()/__del__ stay safe even if Popen raises
        # something other than OSError and construction is abandoned.
        self.spinner_proc = None
        try:
            self.spinner_proc = subprocess.Popen(
                ["./spinner"],
                stdin=subprocess.PIPE,
                cwd=os.path.join(BASEDIR, "selfdrive", "ui"),
                close_fds=True,
            )
        except OSError:
            # Helper missing or not executable: run without a spinner.
            pass

    def __enter__(self):
        return self

    def update(self, spinner_text: str) -> None:
        """Send ``spinner_text`` to the helper as a single line.

        Does nothing when no helper is running. A broken pipe (the helper
        has already exited) is ignored.
        """
        proc = self.spinner_proc
        if proc is None:
            return
        try:
            # stdin is buffered, so BrokenPipeError can surface from either
            # the write or the flush; both must be inside the try.
            proc.stdin.write(spinner_text.encode("utf8") + b"\n")
            proc.stdin.flush()
        except BrokenPipeError:
            pass

    def update_progress(self, cur: float, total: float) -> None:
        """Send progress as a rounded integer percentage of ``cur / total``.

        A ``total`` of zero is reported as ``0`` rather than raising
        ``ZeroDivisionError``.
        """
        percent = round(100 * cur / total) if total else 0
        self.update(str(percent))

    def close(self) -> None:
        """Close the helper's stdin and terminate it. Safe to call repeatedly."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        proc = getattr(self, "spinner_proc", None)
        if proc is None:
            return
        # Drop the reference up front so repeated calls (for example
        # __exit__ followed by __del__) never touch the same process twice.
        self.spinner_proc = None
        try:
            proc.stdin.close()
        except BrokenPipeError:
            pass
        finally:
            # Always signal the child, even if closing the pipe failed.
            proc.terminate()

    def __del__(self):
        self.close()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
if __name__ == "__main__":
    # Manual smoke test: show the spinner with some text for five seconds,
    # then keep the process alive for five more seconds after the spinner
    # has been closed so its disappearance can be observed.
    import time

    with Spinner() as s:
        s.update(" Spinner text ")
        time.sleep(5.0)
    print(" gone ")
    time.sleep(5.0)