openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

98 lines
2.7 KiB

from enum import IntEnum
import os
import threading
import time
from openpilot.common.api import Api, api_get
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
class PrimeType(IntEnum):
    """Integer-valued prime subscription type of the device.

    Members compare as plain integers, so ``prime_type > PrimeType.NONE``
    is a valid "has some prime tier" check.

    Values are written as plain integers (no trailing comma). A trailing
    comma would turn each value into a 1-tuple, which only works because
    Enum unpacks tuple values into ``int(...)``.
    """

    UNKNOWN = -2   # fallback when no stored/overridden value can be parsed
    UNPAIRED = -1  # used when the device API reports ``is_paired`` as false
    NONE = 0
    MAGENTA = 1
    LITE = 2
    BLUE = 3
    MAGENTA_NEW = 4
    PURPLE = 5
class PrimeState:
    """Caches the device's prime type and refreshes it from a background thread.

    On construction the cached value is seeded from the ``PRIME_TYPE``
    environment variable or the ``PrimeType`` param, and a daemon worker
    thread is started that periodically queries the device API and stores
    any change back into params.
    """

    FETCH_INTERVAL = 5.0  # seconds between API calls
    API_TIMEOUT = 10.0  # seconds for API requests
    SLEEP_INTERVAL = 0.5  # seconds to sleep between checks in the worker thread

    def __init__(self):
        # Thread bookkeeping is set up first so that stop() / __del__ stay
        # safe even if one of the calls below raises part-way through.
        self._running = False
        self._thread = None

        self._params = Params()
        self._lock = threading.Lock()  # guards reads/writes of prime_type
        self.prime_type: PrimeType = self._load_initial_state()

        self.start()

    def _load_initial_state(self) -> PrimeType:
        """Return the initial prime type, or ``PrimeType.UNKNOWN``.

        A non-empty ``PRIME_TYPE`` environment variable takes precedence
        over the stored ``PrimeType`` param. A value that is missing, not
        an integer, or not a valid ``PrimeType`` yields ``UNKNOWN``.
        """
        prime_type_str = os.getenv("PRIME_TYPE") or self._params.get("PrimeType", encoding='utf8')
        try:
            if prime_type_str is not None:
                return PrimeType(int(prime_type_str))
        except (ValueError, TypeError):
            # Unparseable or out-of-range value: fall through to UNKNOWN.
            pass
        return PrimeType.UNKNOWN

    def _fetch_prime_status(self) -> None:
        """Query the device API once and update the cached prime type.

        Does nothing when no ``DongleId`` param is set or when the response
        status is not 200. Any exception (token creation, request, JSON
        decoding, unexpected ``prime_type`` value) is logged and swallowed
        so the worker loop keeps running.
        """
        dongle_id = self._params.get("DongleId", encoding='utf8')
        if not dongle_id:
            return

        try:
            identity_token = Api(dongle_id).get_token()
            response = api_get(
                f"v1.1/devices/{dongle_id}",
                timeout=self.API_TIMEOUT,
                access_token=identity_token,
            )
            if response.status_code == 200:
                data = response.json()
                is_paired = data.get("is_paired", False)
                prime_type = data.get("prime_type", 0)
                self.set_type(PrimeType(prime_type) if is_paired else PrimeType.UNPAIRED)
        except Exception as e:
            # Best-effort background refresh: report through the same logger
            # the rest of the class uses instead of printing to stdout.
            cloudlog.error(f"Failed to fetch prime status: {e}")

    def set_type(self, prime_type: PrimeType) -> None:
        """Store ``prime_type`` if it differs from the cached value.

        On change, the new value is also written to the ``PrimeType`` param
        (as its integer string) and logged; an unchanged value is a no-op.
        """
        with self._lock:
            if prime_type != self.prime_type:
                self.prime_type = prime_type
                self._params.put("PrimeType", str(int(prime_type)))
                cloudlog.info(f"Prime type updated to {prime_type}")

    def _worker_thread(self) -> None:
        """Worker loop: fetch, then wait ``FETCH_INTERVAL`` in small slices.

        The wait is split into ``SLEEP_INTERVAL`` sleeps so that clearing
        ``_running`` is noticed without waiting out the whole interval.
        """
        while self._running:
            self._fetch_prime_status()

            for _ in range(int(self.FETCH_INTERVAL / self.SLEEP_INTERVAL)):
                if not self._running:
                    break
                time.sleep(self.SLEEP_INTERVAL)

    def start(self) -> None:
        """Start the daemon worker thread unless one is already alive."""
        if self._thread and self._thread.is_alive():
            return

        self._running = True
        self._thread = threading.Thread(target=self._worker_thread, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        """Ask the worker to exit and wait up to one second for it.

        Safe to call repeatedly, on a partially constructed instance, and
        from the worker thread itself (in which case no join is attempted).
        """
        self._running = False

        # getattr: __del__ may run on an object whose __init__ never got as
        # far as creating the attribute.
        thread = getattr(self, "_thread", None)
        if thread is None or thread is threading.current_thread():
            # Joining the current thread would raise RuntimeError.
            return
        if thread.is_alive():
            thread.join(timeout=1.0)

    def get_type(self) -> PrimeType:
        """Return the currently cached prime type."""
        with self._lock:
            return self.prime_type

    def is_prime(self) -> bool:
        """Return True when the cached prime type is greater than ``PrimeType.NONE``."""
        with self._lock:
            return bool(self.prime_type > PrimeType.NONE)

    def __del__(self):
        # NOTE(review): the worker thread's target is a bound method and so
        # references this object; the finalizer therefore presumably only
        # runs once the worker has exited. Prefer calling stop() explicitly.
        self.stop()