"""Background tracking of the device's prime type."""
from enum import IntEnum
import os
import threading
import time
from functools import lru_cache

from openpilot.common.api import Api, api_get
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.system.athena.registration import UNREGISTERED_DONGLE_ID

TOKEN_EXPIRY_HOURS = 2


class PrimeType(IntEnum):
    """Integer codes for the prime type.

    The same integers are stored in the ``PrimeType`` param and read from the
    ``prime_type`` field of the device API response. Values greater than
    ``NONE`` are what ``PrimeState.is_prime`` treats as prime.
    """

    # Plain integers (no trailing commas): a trailing comma would turn each
    # value into a one-element tuple that only works because IntEnum unpacks
    # it into int().
    UNKNOWN = -2
    UNPAIRED = -1
    NONE = 0
    MAGENTA = 1
    LITE = 2
    BLUE = 3
    MAGENTA_NEW = 4
    PURPLE = 5


@lru_cache(maxsize=1)
def get_token(dongle_id: str, t: int):
    """Return an API token for ``dongle_id``, cached per ``(dongle_id, t)``.

    ``t`` is not used in the body; it exists only as part of the cache key.
    The caller passes a coarse time bucket, so a new token is requested once
    the bucket changes. With ``maxsize=1`` only the latest token is kept.
    """
    return Api(dongle_id).get_token(expiry_hours=TOKEN_EXPIRY_HOURS)


class PrimeState:
    """Holds the current ``PrimeType`` and refreshes it on a daemon thread.

    The initial value comes from the ``PRIME_TYPE`` environment variable or
    the ``PrimeType`` param. Constructing an instance starts the worker
    thread, which polls the device API and persists changes back to the
    ``PrimeType`` param.
    """

    FETCH_INTERVAL = 5.0  # seconds between API calls
    API_TIMEOUT = 10.0  # seconds for API requests
    SLEEP_INTERVAL = 0.5  # seconds to sleep between checks in the worker thread

    def __init__(self):
        # Set the thread bookkeeping first: __del__ calls stop(), which reads
        # these attributes, and __del__ still runs if a later line in this
        # constructor raises.
        self._running = False
        self._thread = None

        self._params = Params()
        self._lock = threading.Lock()
        self.prime_type: PrimeType = self._load_initial_state()

        self.start()

    def _load_initial_state(self) -> PrimeType:
        """Return the starting prime type.

        The ``PRIME_TYPE`` environment variable takes precedence when it is
        set to a non-empty string; otherwise the ``PrimeType`` param is used.
        A missing or unparsable value yields ``PrimeType.UNKNOWN``.
        """
        raw_value = os.getenv("PRIME_TYPE") or self._params.get("PrimeType")
        if raw_value is None:
            return PrimeType.UNKNOWN
        try:
            return PrimeType(int(raw_value))
        except (ValueError, TypeError):
            # Not an integer, or an integer with no matching member.
            return PrimeType.UNKNOWN

    def _fetch_prime_status(self) -> None:
        """Query the device API once and update the stored prime type.

        Does nothing when the device has no dongle id or is unregistered.
        Any exception raised while fetching or parsing is logged and
        swallowed so the worker thread keeps running.
        """
        dongle_id = self._params.get("DongleId")
        if not dongle_id or dongle_id == UNREGISTERED_DONGLE_ID:
            return

        try:
            # The bucket index changes every TOKEN_EXPIRY_HOURS / 2 hours,
            # which invalidates the cached token before it expires.
            token_bucket = int(time.monotonic() / (TOKEN_EXPIRY_HOURS / 2 * 60 * 60))
            identity_token = get_token(dongle_id, token_bucket)
            response = api_get(f"v1.1/devices/{dongle_id}", timeout=self.API_TIMEOUT, access_token=identity_token)
            if response.status_code == 200:
                data = response.json()
                is_paired = data.get("is_paired", False)
                prime_type = data.get("prime_type", 0)
                self.set_type(PrimeType(prime_type) if is_paired else PrimeType.UNPAIRED)
            elif response.status_code == 401:
                # Drop the cached token so the next poll requests a new one.
                get_token.cache_clear()
        except Exception as e:
            cloudlog.error(f"Failed to fetch prime status: {e}")

    def set_type(self, prime_type: PrimeType) -> None:
        """Store ``prime_type`` and persist it to params if it changed."""
        with self._lock:
            if prime_type != self.prime_type:
                self.prime_type = prime_type
                self._params.put("PrimeType", int(prime_type))
                cloudlog.info(f"Prime type updated to {prime_type}")

    def _worker_thread(self) -> None:
        """Poll the API until ``_running`` is cleared.

        The wait between polls is split into ``SLEEP_INTERVAL`` slices so a
        stop request is noticed without waiting the full ``FETCH_INTERVAL``.
        """
        while self._running:
            self._fetch_prime_status()

            for _ in range(int(self.FETCH_INTERVAL / self.SLEEP_INTERVAL)):
                if not self._running:
                    break
                time.sleep(self.SLEEP_INTERVAL)

    def start(self) -> None:
        """Start the worker thread unless one is already alive."""
        if self._thread and self._thread.is_alive():
            return
        self._running = True
        self._thread = threading.Thread(target=self._worker_thread, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the worker to exit and wait up to one second for it.

        The join can time out while the worker is inside an API request,
        since ``API_TIMEOUT`` is longer than the join timeout; the thread is
        a daemon, so it does not block interpreter exit.
        """
        self._running = False
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=1.0)

    def get_type(self) -> PrimeType:
        """Return the current prime type."""
        with self._lock:
            return self.prime_type

    def is_prime(self) -> bool:
        """Return True when the current type is greater than ``PrimeType.NONE``."""
        with self._lock:
            return bool(self.prime_type > PrimeType.NONE)

    def __del__(self):
        self.stop()