|
|
|
@ -2,12 +2,15 @@ from enum import IntEnum |
|
|
|
|
import os |
|
|
|
|
import threading |
|
|
|
|
import time |
|
|
|
|
from functools import lru_cache |
|
|
|
|
|
|
|
|
|
from openpilot.common.api import Api, api_get |
|
|
|
|
from openpilot.common.params import Params |
|
|
|
|
from openpilot.common.swaglog import cloudlog |
|
|
|
|
from openpilot.system.athena.registration import UNREGISTERED_DONGLE_ID |
|
|
|
|
|
|
|
|
|
TOKEN_EXPIRY_HOURS = 2 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PrimeType(IntEnum): |
|
|
|
|
UNKNOWN = -2, |
|
|
|
@ -20,6 +23,12 @@ class PrimeType(IntEnum): |
|
|
|
|
PURPLE = 5, |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@lru_cache(maxsize=1) |
|
|
|
|
def get_token(dongle_id: str, t: int): |
|
|
|
|
print('getting token') |
|
|
|
|
return Api(dongle_id).get_token(expiry_hours=TOKEN_EXPIRY_HOURS) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class PrimeState: |
|
|
|
|
FETCH_INTERVAL = 5.0 # seconds between API calls |
|
|
|
|
API_TIMEOUT = 10.0 # seconds for API requests |
|
|
|
@ -49,13 +58,15 @@ class PrimeState: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
try: |
|
|
|
|
identity_token = Api(dongle_id).get_token() |
|
|
|
|
identity_token = get_token(dongle_id, int(time.monotonic() / (TOKEN_EXPIRY_HOURS / 2 * 60 * 60))) |
|
|
|
|
response = api_get(f"v1.1/devices/{dongle_id}", timeout=self.API_TIMEOUT, access_token=identity_token) |
|
|
|
|
if response.status_code == 200: |
|
|
|
|
data = response.json() |
|
|
|
|
is_paired = data.get("is_paired", False) |
|
|
|
|
prime_type = data.get("prime_type", 0) |
|
|
|
|
self.set_type(PrimeType(prime_type) if is_paired else PrimeType.UNPAIRED) |
|
|
|
|
elif response.status_code == 401: |
|
|
|
|
get_token.cache_clear() |
|
|
|
|
except Exception as e: |
|
|
|
|
cloudlog.error(f"Failed to fetch prime status: {e}") |
|
|
|
|
|
|
|
|
|