#!/usr/bin/env python3
import json
import os
import math
import time
import binascii
import requests
import serial
import subprocess
from dataclasses import dataclass
@dataclass
class Profile:
    """One eSIM profile entry, as built by ``LPA2.list_profiles``."""

    # Filled from the 'iccid' field; the identifier passed to the
    # enable/disable/delete/nickname operations.
    iccid: str
    # Filled from the 'isdpAid' field.
    isdp_aid: str
    # Filled from the 'profileNickname' field.
    nickname: str
    # True when the reported 'profileState' equals 'enabled'.
    enabled: bool
    # Filled from the 'serviceProviderName' field.
    provider: str
class LPAError(Exception):
    """Raised when an LPA operation is rejected, fails or times out."""
class LPA2 :
def __init__ ( self ) :
self . env = os . environ . copy ( )
self . env [ ' LPAC_APDU ' ] = ' qmi '
self . env [ ' QMI_DEVICE ' ] = ' /dev/cdc-wdm0 '
self . timeout_sec = 45
def list_profiles ( self ) :
"""
List all profiles on the eUICC .
"""
raw = self . _invoke ( ' profile ' , ' list ' ) [ - 1 ] # only one message
profiles = [ ]
for profile in raw [ ' payload ' ] [ ' data ' ] :
profiles . append ( Profile (
iccid = profile [ ' iccid ' ] ,
isdp_aid = profile [ ' isdpAid ' ] ,
nickname = profile [ ' profileNickname ' ] ,
enabled = profile [ ' profileState ' ] == ' enabled ' ,
provider = profile [ ' serviceProviderName ' ] ,
) )
return profiles
def validate_successful ( self , msgs : list [ dict ] ) - > None :
"""
Validate that the last message is a success notification .
"""
assert msgs [ - 1 ] [ ' payload ' ] [ ' message ' ] == ' success ' , ' expected success notification '
def profile_exists ( self , iccid : str ) - > bool :
"""
Check if a profile exists on the eUICC .
"""
return any ( p . iccid == iccid for p in self . list_profiles ( ) )
def get_active_profile ( self ) :
"""
Get the active profile on the eUICC .
"""
profiles = self . list_profiles ( )
for profile in profiles :
if profile . enabled :
return profile
return None
def process_notifications ( self ) - > None :
"""
Process notifications from the LPA , typically to activate / deactivate the profile with the carrier .
"""
self . validate_successful ( self . _invoke ( ' notification ' , ' process ' , ' -a ' , ' -r ' ) )
def enable_profile ( self , iccid : str ) - > None :
"""
Enable the profile on the eUICC .
"""
if not self . profile_exists ( iccid ) :
raise LPAError ( f ' profile { iccid } does not exist ' )
latest = self . get_active_profile ( )
if latest is None :
raise LPAError ( ' no profile enabled ' )
if latest . iccid == iccid :
raise LPAError ( f ' profile { iccid } is already enabled ' )
else :
self . disable_profile ( latest . iccid )
self . validate_successful ( self . _invoke ( ' profile ' , ' enable ' , iccid ) )
self . process_notifications ( )
def disable_profile ( self , iccid : str ) - > None :
"""
Disable the profile on the eUICC .
"""
if not self . profile_exists ( iccid ) :
raise LPAError ( f ' profile { iccid } does not exist ' )
latest = self . get_active_profile ( )
if latest is None :
raise LPAError ( ' no profile enabled ' )
if latest . iccid != iccid :
raise LPAError ( f ' profile { iccid } is not enabled ' )
self . validate_successful ( self . _invoke ( ' profile ' , ' disable ' , iccid ) )
self . process_notifications ( )
def delete_profile ( self , iccid : str ) - > None :
"""
Delete the profile on the eUICC .
"""
if not self . profile_exists ( iccid ) :
raise LPAError ( f ' profile { iccid } does not exist ' )
self . validate_successful ( self . _invoke ( ' profile ' , ' delete ' , iccid ) )
self . process_notifications ( )
def nickname_profile ( self , iccid : str , nickname : str ) - > None :
"""
Set the nickname of the profile on the eUICC .
"""
if not self . profile_exists ( iccid ) :
raise LPAError ( f ' profile { iccid } does not exist ' )
self . validate_successful ( self . _invoke ( ' profile ' , ' nickname ' , iccid , nickname ) )
self . process_notifications ( )
def download_profile ( self , qr : str , nickname : str | None = None ) - > None :
"""
Download the profile from the eUICC .
"""
msgs = self . _invoke ( ' profile ' , ' download ' , ' -a ' , qr )
self . validate_successful ( msgs )
new_profile = next ( ( m for m in msgs if m [ ' payload ' ] [ ' message ' ] == ' es8p_meatadata_parse ' ) , None )
if new_profile is None :
raise LPAError ( ' no new profile found ' )
if nickname :
self . nickname_profile ( new_profile [ ' payload ' ] [ ' data ' ] [ ' iccid ' ] , nickname )
self . process_notifications ( )
def _invoke ( self , * cmd : str ) :
print ( f " invoking lpac { ' ' . join ( list ( cmd ) ) } " )
ret = subprocess . Popen ( [ ' sudo ' , ' -E ' , ' lpac ' ] + list ( cmd ) , shell = False , stdout = subprocess . PIPE , stderr = subprocess . PIPE , env = self . env )
try :
out , err = ret . communicate ( timeout = self . timeout_sec )
except subprocess . TimeoutExpired as e :
ret . kill ( )
raise LPAError ( f " lpac { cmd } timed out after { self . timeout_sec } seconds " ) from e
messages = [ ]
for line in out . decode ( ) . strip ( ) . splitlines ( ) :
if line . startswith ( ' { ' ) :
message = json . loads ( line )
# lpac response format validations
assert ' type ' in message , ' expected type in message '
assert message [ ' type ' ] == ' lpa ' or message [ ' type ' ] == ' progress ' , ' expected lpa or progress message type '
assert ' payload ' in message , ' expected payload in message '
assert ' code ' in message [ ' payload ' ] , ' expected code in message payload '
if message [ ' payload ' ] [ ' code ' ] != 0 :
raise LPAError ( f " lpac { ' ' . join ( cmd ) } failed with code { message [ ' payload ' ] [ ' code ' ] } : < { message [ ' payload ' ] [ ' message ' ] } > { message [ ' payload ' ] [ ' data ' ] } " )
assert ' data ' in message [ ' payload ' ] , ' expected data in message payload '
messages . append ( message )
if len ( messages ) == 0 :
raise LPAError ( f " lpac { cmd } returned no messages " )
return messages
def post(url, payload, timeout=30):
    """POST ``payload`` to ``url`` and return the reply flattened into bytes.

    The request is sent with the JSON content type and the GSMA RSP headers
    listed below. The request and response are echoed to stdout.

    Args:
        url: full URL to post to.
        payload: request body, passed to ``requests.post`` as ``data``.
        timeout: seconds handed to ``requests.post``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        ``b"HTTP/1.1 <status>"`` followed by every response header except
        ``Connection`` rendered as ``"name: value"``, followed by the raw
        response body.

    Raises:
        requests.HTTPError: for a 4xx/5xx response (via ``raise_for_status``).
    """
    request_headers = {
        "Content-Type": "application/json",
        "X-Admin-Protocol": "gsma/rsp/v2.2.0",
        "charset": "utf-8",
        "User-Agent": "gsma-rsp-lpad",
    }

    print()
    print("POST to", url)
    # SECURITY NOTE(review): verify=False disables TLS certificate validation.
    # Left unchanged because the server's certificate chain is not visible
    # from this code; confirm whether a CA bundle can be supplied.
    r = requests.post(
        url,
        data=payload,
        verify=False,
        headers=request_headers,
        timeout=timeout,
    )
    print("resp", r)
    print("resp text", repr(r.text))
    print()
    r.raise_for_status()

    # NOTE(review): header fields are concatenated with no separator; the
    # source this was reconstructed from is whitespace-mangled, so confirm the
    # join string against what the receiving side expects.
    header_text = ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection')
    status_line = f"HTTP/1.1 {r.status_code}"
    return (status_line + header_text).encode() + r.content
class LPA:
    """Manage eSIM profiles with ``AT+QESIM`` commands sent over a serial port."""

    def __init__(self):
        """Open ``/dev/ttyUSB2``, flush both buffers and check that ``AT`` answers OK."""
        self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8)
        self.dev.reset_input_buffer()
        self.dev.reset_output_buffer()
        assert "OK" in self.at("AT")

    def at(self, cmd):
        """Send one AT command and return the decoded response text.

        Reads until the accumulated response contains ``OK`` or ``ERROR``, or
        until 20 reads have been attempted. The command and the response are
        echoed to stdout.

        Args:
            cmd: command string, without the trailing CR/LF.

        Returns:
            The response with leading/trailing whitespace removed.
        """
        print(f"==> {cmd}")
        self.dev.write(cmd.encode() + b'\r\n')

        r = b""
        cnt = 0
        while b"OK" not in r and b"ERROR" not in r and cnt < 20:
            # Accumulate the raw bytes. Stripping every chunk separately would
            # delete CR/LF at chunk boundaries, including the blank line before
            # "OK" that download() uses as a delimiter.
            r += self.dev.read(8192)
            cnt += 1

        r = r.strip().decode()
        print(f"<== {repr(r)}")
        return r

    def download_ota(self, qr):
        """Send the ``"ota"`` variant of ``AT+QESIM`` for ``qr`` and return the raw response."""
        return self.at(f'AT+QESIM="ota","{qr}"')

    def download(self, qr):
        """Download a profile by relaying messages between the modem and the server.

        For up to five rounds: parse the modem's ``"trans"`` request, POST its
        payload to the server named in ``qr`` (second ``$``-separated field),
        and send the hex-encoded reply back to the modem in chunks.

        Raises:
            Exception: when the modem reports ``"download",1``.
            AssertionError: when a modem response has an unexpected shape.
        """
        smdp = qr.split('$')[1]
        out = self.at(f'AT+QESIM="download","{qr}"')
        # NOTE(review): if neither completion marker shows up within five
        # rounds this returns silently - confirm that is intended.
        for _ in range(5):
            # Every field is stripped below, so whitespace right after the
            # "+QESIM:" prefix does not matter here.
            line = out.split("+QESIM:")[1].split("\r\n\r\nOK")[0]

            # maxsplit keeps commas inside the payload (the last field) intact.
            parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)]
            print(repr(parts))
            trans, ret, url, payloadlen, payload = parts
            assert trans == "trans" and ret == "0"
            assert len(payload) == int(payloadlen)

            r = post(f"https://{smdp}/{url}", payload)
            to_send = binascii.hexlify(r).decode()

            chunk_len = 1400
            for i in range(math.ceil(len(to_send) / chunk_len)):
                # state is 1 while more chunks follow and 0 on the last chunk.
                state = 1 if (i + 1) * chunk_len < len(to_send) else 0
                data = to_send[i * chunk_len:(i + 1) * chunk_len]
                out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"')
                assert "OK" in out

            if '+QESIM:"download",1' in out:
                raise Exception("profile install failed")
            elif '+QESIM:"download",0' in out:
                print("done, successfully loaded")
                break

    def enable(self, iccid):
        """Send the ``"enable"`` command for ``iccid``."""
        self.at(f'AT+QESIM="enable","{iccid}"')

    def disable(self, iccid):
        """Send the ``"disable"`` command for ``iccid``."""
        self.at(f'AT+QESIM="disable","{iccid}"')

    def delete(self, iccid):
        """Send the ``"delete"`` command for ``iccid``."""
        self.at(f'AT+QESIM="delete","{iccid}"')

    def list_profiles(self):
        """Send the ``"list"`` command and return every response line after the first."""
        out = self.at('AT+QESIM="list"')
        return out.strip().splitlines()[1:]
if __name__ == "__main__":
    import sys

    # Usage: <script> enable|disable|delete <iccid>
    #        <script> download <code> <nickname>
    # The profile list is always printed first. Setting RESTART in the
    # environment additionally restarts the LTE stack afterwards.
    lpa = LPA2()
    print(lpa.list_profiles())

    if len(sys.argv) > 2:
        action, target = sys.argv[1], sys.argv[2]
        # Commands that take nothing but an ICCID.
        iccid_actions = {
            'enable': lpa.enable_profile,
            'disable': lpa.disable_profile,
            'delete': lpa.delete_profile,
        }
        if action in iccid_actions:
            iccid_actions[action](target)
        elif action == 'download':
            assert len(sys.argv) == 4, 'expected profile nickname'
            lpa.download_profile(target, sys.argv[3])
        else:
            raise Exception(f"invalid command: {sys.argv[1]}")

    if "RESTART" in os.environ:
        restart_steps = (
            "sudo systemctl stop ModemManager",
            "/usr/comma/lte/lte.sh stop_blocking",
            "/usr/comma/lte/lte.sh start",
        )
        for step in restart_steps:
            subprocess.check_call(step, shell=True)
        # Block until the serial device node shows up again.
        while not os.path.exists('/dev/ttyUSB2'):
            time.sleep(1)