from enum import IntEnum
import os
import threading
import time
from openpilot . common . api import Api , api_get
from openpilot . common . params import Params
from openpilot . common . swaglog import cloudlog
class PrimeType ( IntEnum ) :
UNKNOWN = - 2 ,
UNPAIRED = - 1 ,
NONE = 0 ,
MAGENTA = 1 ,
LITE = 2 ,
BLUE = 3 ,
MAGENTA_NEW = 4 ,
PURPLE = 5 ,
class PrimeState :
FETCH_INTERVAL = 5.0 # seconds between API calls
API_TIMEOUT = 10.0 # seconds for API requests
SLEEP_INTERVAL = 0.5 # seconds to sleep between checks in the worker thread
def __init__ ( self ) :
self . _params = Params ( )
self . _lock = threading . Lock ( )
self . prime_type : PrimeType = self . _load_initial_state ( )
self . _running = False
self . _thread = None
self . start ( )
def _load_initial_state ( self ) - > PrimeType :
prime_type_str = os . getenv ( " PRIME_TYPE " ) or self . _params . get ( " PrimeType " , encoding = ' utf8 ' )
try :
if prime_type_str is not None :
return PrimeType ( int ( prime_type_str ) )
except ( ValueError , TypeError ) :
pass
return PrimeType . UNKNOWN
def _fetch_prime_status ( self ) - > None :
dongle_id = self . _params . get ( " DongleId " , encoding = ' utf8 ' )
if not dongle_id :
return
try :
identity_token = Api ( dongle_id ) . get_token ( )
response = api_get ( f " v1.1/devices/ { dongle_id } " , timeout = self . API_TIMEOUT , access_token = identity_token )
if response . status_code == 200 :
data = response . json ( )
is_paired = data . get ( " is_paired " , False )
prime_type = data . get ( " prime_type " , 0 )
self . set_type ( PrimeType ( prime_type ) if is_paired else PrimeType . UNPAIRED )
except Exception as e :
cloudlog . error ( f " Failed to fetch prime status: { e } " )
def set_type ( self , prime_type : PrimeType ) - > None :
with self . _lock :
if prime_type != self . prime_type :
self . prime_type = prime_type
self . _params . put ( " PrimeType " , str ( int ( prime_type ) ) )
cloudlog . info ( f " Prime type updated to { prime_type } " )
def _worker_thread ( self ) - > None :
while self . _running :
self . _fetch_prime_status ( )
for _ in range ( int ( self . FETCH_INTERVAL / self . SLEEP_INTERVAL ) ) :
if not self . _running :
break
time . sleep ( self . SLEEP_INTERVAL )
def start ( self ) - > None :
if self . _thread and self . _thread . is_alive ( ) :
return
self . _running = True
self . _thread = threading . Thread ( target = self . _worker_thread , daemon = True )
self . _thread . start ( )
def stop ( self ) - > None :
self . _running = False
if self . _thread and self . _thread . is_alive ( ) :
self . _thread . join ( timeout = 1.0 )
def get_type ( self ) - > PrimeType :
with self . _lock :
return self . prime_type
def is_prime ( self ) - > bool :
with self . _lock :
return bool ( self . prime_type > PrimeType . NONE )
def __del__ ( self ) :
self . stop ( )