From 5198b1b079c37742c1050f02ce0aa6dd42b038b9 Mon Sep 17 00:00:00 2001 From: Maxime Desroches Date: Mon, 3 Nov 2025 22:45:00 -0800 Subject: [PATCH] support ECDSA (#36555) * keys * remove * remove * too small --- common/api.py | 16 +++++++++++++--- system/athena/registration.py | 17 +++++++---------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/common/api.py b/common/api.py index 005655b21d..656c05ed59 100644 --- a/common/api.py +++ b/common/api.py @@ -7,11 +7,14 @@ from openpilot.system.version import get_version API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com') + # name : jwt signature algorithm +KEYS = {"id_rsa" : "RS256", + "id_ecdsa" : "ES256"} + class Api: def __init__(self, dongle_id): self.dongle_id = dongle_id - with open(Paths.persist_root()+'/comma/id_rsa') as f: - self.private_key = f.read() + self.jwt_algorithm, self.private_key, _ = get_key_pair() def get(self, *args, **kwargs): return self.request('GET', *args, **kwargs) @@ -32,7 +35,7 @@ class Api: } if payload_extra is not None: payload.update(payload_extra) - token = jwt.encode(payload, self.private_key, algorithm='RS256') + token = jwt.encode(payload, self.private_key, algorithm=self.jwt_algorithm) if isinstance(token, bytes): token = token.decode('utf8') return token @@ -46,3 +49,10 @@ def api_get(endpoint, method='GET', timeout=None, access_token=None, **params): headers['User-Agent'] = "openpilot-" + get_version() return requests.request(method, API_HOST + "/" + endpoint, timeout=timeout, headers=headers, params=params) + +def get_key_pair(): + for key in KEYS: + if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'): + with open(Paths.persist_root() + f'/comma/{key}') as private, open(Paths.persist_root() + f'/comma/{key}.pub') as public: + return KEYS[key], private.read(), public.read() + return None, None, None diff --git a/system/athena/registration.py b/system/athena/registration.py index 80c087188d..81aee1ebf9 100755 --- a/system/athena/registration.py +++ b/system/athena/registration.py @@ -5,7 +5,7 @@ import jwt from pathlib import Path from datetime import datetime, timedelta, UTC -from openpilot.common.api import api_get +from openpilot.common.api import api_get, get_key_pair from openpilot.common.params import Params from openpilot.common.spinner import Spinner from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert @@ -39,20 +39,17 @@ def register(show_spinner=False) -> str | None: with open(Paths.persist_root()+"/comma/dongle_id") as f: dongle_id = f.read().strip() - pubkey = Path(Paths.persist_root()+"/comma/id_rsa.pub") - if not pubkey.is_file(): + # Create registration token, in the future, this key will make JWTs directly + jwt_algo, private_key, public_key = get_key_pair() + + if not public_key: dongle_id = UNREGISTERED_DONGLE_ID - cloudlog.warning(f"missing public key: {pubkey}") + cloudlog.warning("missing public key") elif dongle_id is None: if show_spinner: spinner = Spinner() spinner.update("registering device") - # Create registration token, in the future, this key will make JWTs directly - with open(Paths.persist_root()+"/comma/id_rsa.pub") as f1, open(Paths.persist_root()+"/comma/id_rsa") as f2: - public_key = f1.read() - private_key = f2.read() - # Block until we get the imei serial = HARDWARE.get_serial() start_time = time.monotonic() @@ -72,7 +69,7 @@ def register(show_spinner=False) -> str | None: start_time = time.monotonic() while True: try: - register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm='RS256') + register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm=jwt_algo) cloudlog.info("getting pilotauth") resp = api_get("v2/pilotauth/", method='POST', timeout=15, imei=imei1, imei2=imei2, serial=serial, public_key=public_key, register_token=register_token)