@ -1,6 +1,9 @@
import os
import time
import json
import jwt
from datetime import datetime , timedelta
from common . api import api_get
from common . params import Params
@ -12,7 +15,7 @@ from selfdrive.version import version, terms_version, training_version, get_git_
get_git_branch , get_git_remote
def register ( ) :
def register ( spinner = None ) :
params = Params ( )
params . put ( " Version " , version )
params . put ( " TermsVersion " , terms_version )
@ -23,10 +26,13 @@ def register():
params . put ( " GitRemote " , get_git_remote ( default = " " ) )
params . put ( " SubscriberInfo " , HARDWARE . get_subscriber_info ( ) )
needs_registration = False
# create a key for auth
# your private key is kept on your device persist partition and never sent to our servers
# do not erase your persist partition
if not os . path . isfile ( PERSIST + " /comma/id_rsa.pub " ) :
needs_registration = True
cloudlog . warning ( " generating your personal RSA key " )
mkdirs_exists_ok ( PERSIST + " /comma " )
assert os . system ( " openssl genrsa -out " + PERSIST + " /comma/id_rsa.tmp 2048 " ) == 0
@ -39,31 +45,40 @@ def register():
os . chmod ( PERSIST + ' /comma/id_rsa ' , 0o744 )
dongle_id = params . get ( " DongleId " , encoding = ' utf8 ' )
public_key = open ( PERSIST + " /comma/id_rsa.pub " ) . read ( )
needs_registration = needs_registration or dongle_id is None
# create registration token
# in the future, this key will make JWTs directly
private_key = open ( PERSIST + " /comma/id_rsa " ) . read ( )
if needs_registration :
if spinner is not None :
spinner . update ( " registering device " )
# late import
import jwt
# Create registration token, in the future, this key will make JWTs directly
private_key = open ( PERSIST + " /comma/id_rsa " ) . read ( )
public_key = open ( PERSIST + " /comma/id_rsa.pub " ) . read ( )
register_token = jwt . encode ( { ' register ' : True , ' exp ' : datetime . utcnow ( ) + timedelta ( hours = 1 ) } , private_key , algorithm = ' RS256 ' )
# Block until we get the imei
imei1 , imei2 = None , None
while imei1 is None and imei2 is None :
try :
imei1 , imei2 = HARDWARE . get_imei ( 0 ) , HARDWARE . get_imei ( 1 )
except Exception :
cloudlog . exception ( " Error getting imei, trying again... " )
time . sleep ( 1 )
while True :
try :
cloudlog . info ( " getting pilotauth " )
resp = api_get ( " v2/pilotauth/ " , method = ' POST ' , timeout = 15 ,
imei = HARDWARE . get_imei ( 0 ) , imei2 = HARDWARE . get_imei ( 1 ) , serial = HARDWARE . get_serial ( ) , public_key = public_key , register_token = register_token )
imei = imei1 , imei2 = imei2 , serial = HARDWARE . get_serial ( ) , public_key = public_key , register_token = register_token )
dongleauth = json . loads ( resp . text )
dongle_id = dongleauth [ " dongle_id " ]
params . put ( " DongleId " , dongle_id )
return dongle_id
break
except Exception :
cloudlog . exception ( " failed to authenticate " )
if dongle_id is not None :
time . sleep ( 1 )
return dongle_id
else :
return None
if __name__ == " __main__ " :
print ( register ( ) )