parent
58a3b0195e
commit
a5a9ebee45
2 changed files with 254 additions and 139 deletions
@ -1,161 +1,115 @@ |
||||
#!/usr/bin/env python3 |
||||
import json |
||||
import os |
||||
import subprocess |
||||
import math |
||||
import time |
||||
from dataclasses import dataclass |
||||
import binascii |
||||
import requests |
||||
import serial |
||||
import subprocess |
||||
|
||||
@dataclass |
||||
class Profile: |
||||
iccid: str |
||||
isdp_aid: str |
||||
nickname: str |
||||
enabled: bool |
||||
provider: str |
||||
|
||||
class LPAError(Exception): |
||||
pass |
||||
def post(url, payload): |
||||
print() |
||||
print("POST to", url) |
||||
r = requests.post( |
||||
url, |
||||
data=payload, |
||||
verify=False, |
||||
headers={ |
||||
"Content-Type": "application/json", |
||||
"X-Admin-Protocol": "gsma/rsp/v2.2.0", |
||||
"charset": "utf-8", |
||||
"User-Agent": "gsma-rsp-lpad", |
||||
}, |
||||
) |
||||
print("resp", r) |
||||
print("resp text", repr(r.text)) |
||||
print() |
||||
r.raise_for_status() |
||||
|
||||
ret = f"HTTP/1.1 {r.status_code}" |
||||
ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') |
||||
return ret.encode() + r.content |
||||
|
||||
|
||||
class LPA: |
||||
def __init__(self): |
||||
self.env = os.environ.copy() |
||||
self.env['LPAC_APDU'] = 'qmi' |
||||
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' |
||||
|
||||
self.timeout_sec = 30 |
||||
self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) |
||||
self.dev.reset_input_buffer() |
||||
self.dev.reset_output_buffer() |
||||
assert "OK" in self.at("AT") |
||||
|
||||
def at(self, cmd): |
||||
print(f"==> {cmd}") |
||||
self.dev.write(cmd.encode() + b'\r\n') |
||||
|
||||
r = b"" |
||||
cnt = 0 |
||||
while b"OK" not in r and b"ERROR" not in r and cnt < 20: |
||||
r += self.dev.read(8192).strip() |
||||
cnt += 1 |
||||
r = r.decode() |
||||
print(f"<== {repr(r)}") |
||||
return r |
||||
|
||||
def download_ota(self, qr): |
||||
return self.at(f'AT+QESIM="ota","{qr}"') |
||||
|
||||
def download(self, qr): |
||||
smdp = qr.split('$')[1] |
||||
out = self.at(f'AT+QESIM="download","{qr}"') |
||||
for _ in range(5): |
||||
line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] |
||||
|
||||
parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] |
||||
print(repr(parts)) |
||||
trans, ret, url, payloadlen, payload = parts |
||||
assert trans == "trans" and ret == "0" |
||||
assert len(payload) == int(payloadlen) |
||||
|
||||
r = post(f"https://{smdp}/{url}", payload) |
||||
to_send = binascii.hexlify(r).decode() |
||||
|
||||
chunk_len = 1400 |
||||
for i in range(math.ceil(len(to_send) / chunk_len)): |
||||
state = 1 if (i+1)*chunk_len < len(to_send) else 0 |
||||
data = to_send[i * chunk_len : (i+1)*chunk_len] |
||||
out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') |
||||
assert "OK" in out |
||||
|
||||
if '+QESIM:"download",1' in out: |
||||
raise Exception("profile install failed") |
||||
elif '+QESIM:"download",0' in out: |
||||
print("done, successfully loaded") |
||||
break |
||||
|
||||
def enable(self, iccid): |
||||
self.at(f'AT+QESIM="enable","{iccid}"') |
||||
|
||||
def disable(self, iccid): |
||||
self.at(f'AT+QESIM="disable","{iccid}"') |
||||
|
||||
def delete(self, iccid): |
||||
self.at(f'AT+QESIM="delete","{iccid}"') |
||||
|
||||
def list_profiles(self): |
||||
""" |
||||
List all profiles on the eUICC. |
||||
""" |
||||
raw = self._invoke('profile', 'list')[-1] # only one message |
||||
|
||||
profiles = [] |
||||
for profile in raw['payload']['data']: |
||||
profiles.append(Profile( |
||||
iccid=profile['iccid'], |
||||
isdp_aid=profile['isdpAid'], |
||||
nickname=profile['profileNickname'], |
||||
enabled=profile['profileState'] == 'enabled', |
||||
provider=profile['serviceProviderName'], |
||||
)) |
||||
|
||||
return profiles |
||||
|
||||
def profile_exists(self, iccid: str) -> bool: |
||||
""" |
||||
Check if a profile exists on the eUICC. |
||||
""" |
||||
profiles = self.list_profiles() |
||||
return any(profile.iccid == iccid for profile in profiles) |
||||
|
||||
def get_active_profile(self): |
||||
""" |
||||
Get the active profile on the eUICC. |
||||
""" |
||||
profiles = self.list_profiles() |
||||
for profile in profiles: |
||||
if profile.enabled: |
||||
return profile |
||||
return None |
||||
|
||||
def process_notifications(self) -> None: |
||||
""" |
||||
Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. |
||||
""" |
||||
msgs = self._invoke('notification', 'process', '-a', '-r') |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
|
||||
def enable_profile(self, iccid: str) -> None: |
||||
""" |
||||
Enable the profile on the eUICC. |
||||
""" |
||||
self.validate_profile(iccid) |
||||
latest = self.get_active_profile() |
||||
if latest is not None and latest.iccid == iccid: |
||||
raise LPAError(f'profile {iccid} is already enabled') |
||||
elif latest is not None: |
||||
self.disable_profile(latest.iccid) |
||||
|
||||
msgs = self._invoke('profile', 'enable', iccid) |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
self.process_notifications() |
||||
|
||||
def disable_profile(self, iccid: str) -> None: |
||||
""" |
||||
Disable the profile on the eUICC. |
||||
""" |
||||
self.validate_profile(iccid) |
||||
latest = self.get_active_profile() |
||||
if latest is None: |
||||
return |
||||
if latest.iccid != iccid: |
||||
raise LPAError(f'profile {iccid} is not enabled') |
||||
|
||||
msgs = self._invoke('profile', 'disable', iccid) |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
self.process_notifications() |
||||
|
||||
|
||||
def validate_profile(self, iccid: str) -> None: |
||||
""" |
||||
Validate the profile on the eUICC. |
||||
""" |
||||
if not self.profile_exists(iccid): |
||||
raise LPAError(f'profile {iccid} does not exist') |
||||
|
||||
|
||||
def _invoke(self, *cmd: str): |
||||
print(f"-> lpac {' '.join(list(cmd))}") |
||||
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) |
||||
try: |
||||
out, err = ret.communicate(timeout=self.timeout_sec) |
||||
except subprocess.TimeoutExpired as e: |
||||
ret.kill() |
||||
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e |
||||
|
||||
messages = [] |
||||
for line in out.decode().strip().splitlines(): |
||||
if line.startswith('{'): |
||||
message = json.loads(line) |
||||
|
||||
# lpac response format validations |
||||
assert 'type' in message, 'expected type in message' |
||||
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' |
||||
assert 'payload' in message, 'expected payload in message' |
||||
assert 'code' in message['payload'], 'expected code in message payload' |
||||
|
||||
if message['payload']['code'] != 0: |
||||
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") |
||||
|
||||
assert 'data' in message['payload'], 'expected data in message payload' |
||||
|
||||
messages.append(message) |
||||
|
||||
if len(messages) == 0: |
||||
raise LPAError(f"lpac {cmd} returned no messages") |
||||
|
||||
return messages |
||||
out = self.at('AT+QESIM="list"') |
||||
return out.strip().splitlines()[1:] |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
import sys |
||||
|
||||
lpa = LPA() |
||||
print(lpa.list_profiles()) |
||||
|
||||
if len(sys.argv) > 2: |
||||
if sys.argv[1] == 'enable': |
||||
lpa.enable_profile(sys.argv[2]) |
||||
elif sys.argv[1] == 'disable': |
||||
lpa.disable_profile(sys.argv[2]) |
||||
else: |
||||
raise Exception(f"invalid command: {sys.argv[1]}") |
||||
|
||||
if "RESTART" in os.environ: |
||||
subprocess.check_call("sudo systemctl stop ModemManager", shell=True) |
||||
subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) |
||||
subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) |
||||
while not os.path.exists('/dev/ttyUSB2'): |
||||
time.sleep(1) |
||||
time.sleep(3) |
||||
|
||||
lpa = LPA() |
||||
print(lpa.list_profiles()) |
||||
if len(sys.argv) > 1: |
||||
lpa.download(sys.argv[1]) |
||||
print(lpa.list_profiles()) |
||||
|
@ -0,0 +1,161 @@ |
||||
#!/usr/bin/env python3 |
||||
import json |
||||
import os |
||||
import subprocess |
||||
import time |
||||
from dataclasses import dataclass |
||||
|
||||
@dataclass |
||||
class Profile: |
||||
iccid: str |
||||
isdp_aid: str |
||||
nickname: str |
||||
enabled: bool |
||||
provider: str |
||||
|
||||
class LPAError(Exception): |
||||
pass |
||||
|
||||
|
||||
class LPA: |
||||
def __init__(self): |
||||
self.env = os.environ.copy() |
||||
self.env['LPAC_APDU'] = 'qmi' |
||||
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' |
||||
|
||||
self.timeout_sec = 30 |
||||
|
||||
def list_profiles(self): |
||||
""" |
||||
List all profiles on the eUICC. |
||||
""" |
||||
raw = self._invoke('profile', 'list')[-1] # only one message |
||||
|
||||
profiles = [] |
||||
for profile in raw['payload']['data']: |
||||
profiles.append(Profile( |
||||
iccid=profile['iccid'], |
||||
isdp_aid=profile['isdpAid'], |
||||
nickname=profile['profileNickname'], |
||||
enabled=profile['profileState'] == 'enabled', |
||||
provider=profile['serviceProviderName'], |
||||
)) |
||||
|
||||
return profiles |
||||
|
||||
def profile_exists(self, iccid: str) -> bool: |
||||
""" |
||||
Check if a profile exists on the eUICC. |
||||
""" |
||||
profiles = self.list_profiles() |
||||
return any(profile.iccid == iccid for profile in profiles) |
||||
|
||||
def get_active_profile(self): |
||||
""" |
||||
Get the active profile on the eUICC. |
||||
""" |
||||
profiles = self.list_profiles() |
||||
for profile in profiles: |
||||
if profile.enabled: |
||||
return profile |
||||
return None |
||||
|
||||
def process_notifications(self) -> None: |
||||
""" |
||||
Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. |
||||
""" |
||||
msgs = self._invoke('notification', 'process', '-a', '-r') |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
|
||||
def enable_profile(self, iccid: str) -> None: |
||||
""" |
||||
Enable the profile on the eUICC. |
||||
""" |
||||
self.validate_profile(iccid) |
||||
latest = self.get_active_profile() |
||||
if latest is not None and latest.iccid == iccid: |
||||
raise LPAError(f'profile {iccid} is already enabled') |
||||
elif latest is not None: |
||||
self.disable_profile(latest.iccid) |
||||
|
||||
msgs = self._invoke('profile', 'enable', iccid) |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
self.process_notifications() |
||||
|
||||
def disable_profile(self, iccid: str) -> None: |
||||
""" |
||||
Disable the profile on the eUICC. |
||||
""" |
||||
self.validate_profile(iccid) |
||||
latest = self.get_active_profile() |
||||
if latest is None: |
||||
return |
||||
if latest.iccid != iccid: |
||||
raise LPAError(f'profile {iccid} is not enabled') |
||||
|
||||
msgs = self._invoke('profile', 'disable', iccid) |
||||
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
||||
self.process_notifications() |
||||
|
||||
|
||||
def validate_profile(self, iccid: str) -> None: |
||||
""" |
||||
Validate the profile on the eUICC. |
||||
""" |
||||
if not self.profile_exists(iccid): |
||||
raise LPAError(f'profile {iccid} does not exist') |
||||
|
||||
|
||||
def _invoke(self, *cmd: str): |
||||
print(f"-> lpac {' '.join(list(cmd))}") |
||||
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) |
||||
try: |
||||
out, err = ret.communicate(timeout=self.timeout_sec) |
||||
except subprocess.TimeoutExpired as e: |
||||
ret.kill() |
||||
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e |
||||
|
||||
messages = [] |
||||
for line in out.decode().strip().splitlines(): |
||||
if line.startswith('{'): |
||||
message = json.loads(line) |
||||
|
||||
# lpac response format validations |
||||
assert 'type' in message, 'expected type in message' |
||||
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' |
||||
assert 'payload' in message, 'expected payload in message' |
||||
assert 'code' in message['payload'], 'expected code in message payload' |
||||
|
||||
if message['payload']['code'] != 0: |
||||
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") |
||||
|
||||
assert 'data' in message['payload'], 'expected data in message payload' |
||||
|
||||
messages.append(message) |
||||
|
||||
if len(messages) == 0: |
||||
raise LPAError(f"lpac {cmd} returned no messages") |
||||
|
||||
return messages |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
import sys |
||||
|
||||
lpa = LPA() |
||||
print(lpa.list_profiles()) |
||||
|
||||
if len(sys.argv) > 2: |
||||
if sys.argv[1] == 'enable': |
||||
lpa.enable_profile(sys.argv[2]) |
||||
elif sys.argv[1] == 'disable': |
||||
lpa.disable_profile(sys.argv[2]) |
||||
else: |
||||
raise Exception(f"invalid command: {sys.argv[1]}") |
||||
|
||||
if "RESTART" in os.environ: |
||||
subprocess.check_call("sudo systemctl stop ModemManager", shell=True) |
||||
subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) |
||||
subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) |
||||
while not os.path.exists('/dev/ttyUSB2'): |
||||
time.sleep(1) |
Loading…
Reference in new issue