openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

124 lines
4.2 KiB

#!/usr/bin/env python3
import time
import json
import jwt
from pathlib import Path
import threading
import pyray as rl
from datetime import datetime, timedelta, UTC
from openpilot.common.api import api_get
from openpilot.common.params import Params
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog
from openpilot.system.ui.spinner import Spinner
from openpilot.system.ui.lib.application import gui_app
# Placeholder dongle ID stored when the device cannot be registered
# (public key file missing, or the backend answered 402/403).
UNREGISTERED_DONGLE_ID = "UnregisteredDevice"
# Spinner currently shown by _show_spinner_window(); None until that function
# creates one. register() reads it to update the on-screen text.
spinner: Spinner | None = None
def is_registered_device() -> bool:
    """Return True when the "DongleId" param holds a real dongle ID.

    A missing param and the UNREGISTERED_DONGLE_ID placeholder both count
    as "not registered".
    """
    stored_id = Params().get("DongleId", encoding='utf-8')
    if stored_id is None:
        return False
    return stored_id != UNREGISTERED_DONGLE_ID
def _show_spinner_window(end_evt: threading.Event):
    """Open the "registering device" window and draw a spinner until `end_evt` is set.

    The Spinner instance is published through the module-level `spinner`
    variable so its text can be updated while this loop is running.
    The window is closed once the event has been observed.
    """
    global spinner
    gui_app.init_window("registering device")
    spinner = Spinner()
    while True:
        if end_evt.is_set():
            break
        # One frame: clear to black, then draw the spinner on top.
        rl.begin_drawing()
        rl.clear_background(rl.BLACK)
        spinner.render()
        rl.end_drawing()
    gui_app.close()
def register(show_spinner=False) -> str | None:
    """
    Resolve this device's dongle ID, registering with the backend if needed.

    All devices built since March 2024 come with all
    info stored in /persist/. This is kept around
    only for devices built before then.

    With a backend update to take serial number instead
    of dongle ID to some endpoints, this can be removed
    entirely.

    The ID is looked up in this order:
      1. the "DongleId" param;
      2. the <persist_root>/comma/dongle_id file;
      3. a POST to v2/pilotauth/, retried with a growing delay (capped at
         15 seconds) until a response has been processed.

    If the public key file is missing, or the backend answers 402/403, the
    ID becomes UNREGISTERED_DONGLE_ID.

    Args:
      show_spinner: when True, a spinner window is shown on a separate
        thread for the duration of the backend registration.

    Returns:
      The resolved dongle ID. A non-empty ID is also written to the
      "DongleId" param and the "Offroad_UnofficialHardware" alert is updated.

    Raises:
      Whatever reading the key files or HARDWARE.get_serial() raises; the
      spinner thread is stopped before the exception propagates.
    """
    params = Params()
    comma_dir = Path(Paths.persist_root() + "/comma")

    dongle_id: str | None = params.get("DongleId", encoding='utf8')
    dongle_id_file = comma_dir / "dongle_id"
    if dongle_id is None and dongle_id_file.is_file():
        # not all devices will have this; added early in comma 3X production (2/28/24)
        with open(dongle_id_file) as f:
            dongle_id = f.read().strip()

    pubkey = comma_dir / "id_rsa.pub"
    if not pubkey.is_file():
        dongle_id = UNREGISTERED_DONGLE_ID
        cloudlog.warning(f"missing public key: {pubkey}")
    elif dongle_id is None:
        end_evt: threading.Event | None = None
        spinner_thread: threading.Thread | None = None
        if show_spinner:
            end_evt = threading.Event()
            spinner_thread = threading.Thread(target=_show_spinner_window, args=(end_evt,))
            spinner_thread.start()

        # Everything below runs while the spinner thread may be alive. The
        # finally clause guarantees the thread is told to stop even when an
        # exception escapes; otherwise the non-daemon thread would keep
        # drawing forever and prevent the process from exiting.
        try:
            # Create registration token, in the future, this key will make JWTs directly
            with open(pubkey) as f1, open(comma_dir / "id_rsa") as f2:
                public_key = f1.read()
                private_key = f2.read()

            # Block until we get the imei
            serial = HARDWARE.get_serial()
            start_time = time.monotonic()
            imei1: str | None = None
            imei2: str | None = None
            while imei1 is None and imei2 is None:
                try:
                    imei1, imei2 = HARDWARE.get_imei(0), HARDWARE.get_imei(1)
                except Exception:
                    cloudlog.exception("Error getting imei, trying again...")
                    time.sleep(1)

                # After a minute of waiting, show identifying info on screen.
                if time.monotonic() - start_time > 60 and spinner:
                    spinner.set_text(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})")

            backoff = 0
            start_time = time.monotonic()
            while True:
                try:
                    # The token is rebuilt on every attempt so its one-hour expiry
                    # is always relative to the current try.
                    register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm='RS256')
                    cloudlog.info("getting pilotauth")
                    resp = api_get("v2/pilotauth/", method='POST', timeout=15,
                                   imei=imei1, imei2=imei2, serial=serial, public_key=public_key, register_token=register_token)

                    if resp.status_code in (402, 403):
                        cloudlog.info(f"Unable to register device, got {resp.status_code}")
                        dongle_id = UNREGISTERED_DONGLE_ID
                    else:
                        dongleauth = json.loads(resp.text)
                        dongle_id = dongleauth["dongle_id"]
                    break
                except Exception:
                    # Request, JSON and missing-key errors all land here and are retried.
                    cloudlog.exception("failed to authenticate")
                    backoff = min(backoff + 1, 15)
                    time.sleep(backoff)

                # After a minute of failed attempts, show identifying info on screen.
                if time.monotonic() - start_time > 60 and spinner:
                    spinner.set_text(f"registering device - serial: {serial}, IMEI: ({imei1}, {imei2})")
        finally:
            if spinner_thread is not None and end_evt is not None:
                end_evt.set()
                spinner_thread.join()

    if dongle_id:
        params.put("DongleId", dongle_id)
        set_offroad_alert("Offroad_UnofficialHardware", (dongle_id == UNREGISTERED_DONGLE_ID) and not PC)
    return dongle_id
if __name__ == "__main__":
    # Script entry point: run registration (no spinner) and print the resulting ID.
    resolved_id = register()
    print(resolved_id)