support ECDSA (#36555)

* keys

* remove

* remove

* too small
pull/36342/head^2
Maxime Desroches 4 days ago committed by GitHub
parent e8a11591a8
commit 5198b1b079
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      common/api.py
  2. 17
      system/athena/registration.py

@ -7,11 +7,14 @@ from openpilot.system.version import get_version
API_HOST = os.getenv('API_HOST', 'https://api.commadotai.com')
# name : jwt signature algorithm
KEYS = {"id_rsa" : "RS256",
"id_ecdsa" : "ES256"}
class Api:
def __init__(self, dongle_id):
self.dongle_id = dongle_id
with open(Paths.persist_root()+'/comma/id_rsa') as f:
self.private_key = f.read()
self.jwt_algorithm, self.private_key, _ = get_key_pair()
def get(self, *args, **kwargs):
return self.request('GET', *args, **kwargs)
@ -32,7 +35,7 @@ class Api:
}
if payload_extra is not None:
payload.update(payload_extra)
token = jwt.encode(payload, self.private_key, algorithm='RS256')
token = jwt.encode(payload, self.private_key, algorithm=self.jwt_algorithm)
if isinstance(token, bytes):
token = token.decode('utf8')
return token
@ -46,3 +49,10 @@ def api_get(endpoint, method='GET', timeout=None, access_token=None, **params):
headers['User-Agent'] = "openpilot-" + get_version()
return requests.request(method, API_HOST + "/" + endpoint, timeout=timeout, headers=headers, params=params)
def get_key_pair():
for key in KEYS:
if os.path.isfile(Paths.persist_root() + f'/comma/{key}') and os.path.isfile(Paths.persist_root() + f'/comma/{key}.pub'):
with open(Paths.persist_root() + f'/comma/{key}') as private, open(Paths.persist_root() + f'/comma/{key}.pub') as public:
return KEYS[key], private.read(), public.read()
return None, None, None

@ -5,7 +5,7 @@ import jwt
from pathlib import Path
from datetime import datetime, timedelta, UTC
from openpilot.common.api import api_get
from openpilot.common.api import api_get, get_key_pair
from openpilot.common.params import Params
from openpilot.common.spinner import Spinner
from openpilot.selfdrive.selfdrived.alertmanager import set_offroad_alert
@ -39,20 +39,17 @@ def register(show_spinner=False) -> str | None:
with open(Paths.persist_root()+"/comma/dongle_id") as f:
dongle_id = f.read().strip()
pubkey = Path(Paths.persist_root()+"/comma/id_rsa.pub")
if not pubkey.is_file():
# Create registration token, in the future, this key will make JWTs directly
jwt_algo, private_key, public_key = get_key_pair()
if not public_key:
dongle_id = UNREGISTERED_DONGLE_ID
cloudlog.warning(f"missing public key: {pubkey}")
cloudlog.warning("missing public key")
elif dongle_id is None:
if show_spinner:
spinner = Spinner()
spinner.update("registering device")
# Create registration token, in the future, this key will make JWTs directly
with open(Paths.persist_root()+"/comma/id_rsa.pub") as f1, open(Paths.persist_root()+"/comma/id_rsa") as f2:
public_key = f1.read()
private_key = f2.read()
# Block until we get the imei
serial = HARDWARE.get_serial()
start_time = time.monotonic()
@ -72,7 +69,7 @@ def register(show_spinner=False) -> str | None:
start_time = time.monotonic()
while True:
try:
register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm='RS256')
register_token = jwt.encode({'register': True, 'exp': datetime.now(UTC).replace(tzinfo=None) + timedelta(hours=1)}, private_key, algorithm=jwt_algo)
cloudlog.info("getting pilotauth")
resp = api_get("v2/pilotauth/", method='POST', timeout=15,
imei=imei1, imei2=imei2, serial=serial, public_key=public_key, register_token=register_token)

Loading…
Cancel
Save