import os
import time
import json
import jwt
from datetime import datetime , timedelta
from common . api import api_get
from common . params import Params
from common . spinner import Spinner
from common . file_helpers import mkdirs_exists_ok
from common . basedir import PERSIST
from selfdrive . hardware import HARDWARE
from selfdrive . swaglog import cloudlog
def register(show_spinner: bool = False):
    """Make sure the device has an RSA key pair and a dongle id, registering if needed.

    The cached IMEI, hardware serial and dongle id are read from ``Params``.
    When any of them is missing, or the public key file is absent from the
    persist directory, a registration request is posted to ``v2/pilotauth/``
    and the IMEI, serial and (on success) dongle id are written back to
    ``Params``.

    Args:
        show_spinner: When true, a ``Spinner`` with a status message is shown
            for the duration of the registration and closed afterwards, even
            if the registration raises.

    Returns:
        The dongle id: the cached value when no registration was needed, the
        ``dongle_id`` field of the response after a registration, or ``None``
        when the response status code is 402.

    Raises:
        AssertionError: If one of the ``openssl`` key-generation commands
            exits with a non-zero status.
    """
    # NOTE(review): the string literals in this function carry leading/trailing
    # spaces. They are kept exactly as found; confirm whether the padding is
    # intended or an artifact of how this file was extracted.
    params = Params()
    params.put(" SubscriberInfo ", HARDWARE.get_subscriber_info())

    cached_imei = params.get(" IMEI ", encoding=' utf8 ')
    cached_serial = params.get(" HardwareSerial ", encoding=' utf8 ')
    dongle_id = params.get(" DongleId ", encoding=' utf8 ')
    needs_registration = None in (cached_imei, cached_serial, dongle_id)

    # create a key for auth
    # your private key is kept on your device persist partition and never sent to our servers
    # do not erase your persist partition
    if not os.path.isfile(PERSIST + " /comma/id_rsa.pub "):
        needs_registration = True
        cloudlog.warning(" generating your personal RSA key ")
        mkdirs_exists_ok(PERSIST + " /comma ")
        keygen_commands = (
            " openssl genrsa -out " + PERSIST + " /comma/id_rsa.tmp 2048 ",
            " openssl rsa -in " + PERSIST + " /comma/id_rsa.tmp -pubout -out " + PERSIST + " /comma/id_rsa.tmp.pub ",
        )
        for command in keygen_commands:
            # The command is run outside of an `assert` statement so that it
            # is still executed (and checked) when Python runs with -O, which
            # strips asserts. AssertionError is kept as the failure type.
            if os.system(command) != 0:
                raise AssertionError("command failed: " + command)
        # Both files are produced under temporary names and only renamed to
        # their final names once both commands have succeeded.
        os.rename(PERSIST + " /comma/id_rsa.tmp ", PERSIST + " /comma/id_rsa ")
        os.rename(PERSIST + " /comma/id_rsa.tmp.pub ", PERSIST + " /comma/id_rsa.pub ")

    if needs_registration:
        spinner = None
        if show_spinner:
            spinner = Spinner()
            spinner.update(" registering device ")

        try:
            # Create registration token, in the future, this key will make JWTs directly
            with open(PERSIST + " /comma/id_rsa ") as key_file:
                private_key = key_file.read()
            with open(PERSIST + " /comma/id_rsa.pub ") as key_file:
                public_key = key_file.read()
            register_token = jwt.encode(
                {' register ': True, ' exp ': datetime.utcnow() + timedelta(hours=1)},
                private_key,
                algorithm=' RS256 ',
            )

            # Block until we get the imei (at least one of the two slots)
            imei1, imei2 = None, None
            while True:
                try:
                    imei1, imei2 = HARDWARE.get_imei(0), HARDWARE.get_imei(1)
                except Exception:
                    cloudlog.exception(" Error getting imei, trying again... ")
                if imei1 is not None or imei2 is not None:
                    break
                # Sleep before every retry, including the case where both
                # lookups returned None without raising, to avoid a busy loop.
                time.sleep(1)

            serial = HARDWARE.get_serial()
            params.put(" IMEI ", imei1)
            params.put(" HardwareSerial ", serial)

            # Retry once per second until a request completes without raising.
            while True:
                try:
                    cloudlog.info(" getting pilotauth ")
                    resp = api_get(" v2/pilotauth/ ", method=' POST ', timeout=15,
                                   imei=imei1, imei2=imei2, serial=serial,
                                   public_key=public_key, register_token=register_token)

                    if resp.status_code == 402:
                        cloudlog.info(" Unknown serial number while trying to register device ")
                        dongle_id = None
                    else:
                        dongleauth = json.loads(resp.text)
                        dongle_id = dongleauth[" dongle_id "]
                        params.put(" DongleId ", dongle_id)
                    break
                except Exception:
                    cloudlog.exception(" failed to authenticate ")
                    time.sleep(1)
        finally:
            # Close the spinner on every exit path so it is not left open
            # when reading the keys or building the token raises.
            if spinner is not None:
                spinner.close()

    return dongle_id
# Script entry point: run the registration and print the value it returns.
# NOTE(review): the compared literal carries leading/trailing spaces and is kept
# exactly as found; confirm whether that padding is intended.
if __name__ == " __main__ ":
    registration_result = register()
    print(registration_result)