#!/usr/bin/env python3 import json import os import math import time import binascii import requests import serial import subprocess class LPAError(Exception): pass class LPA: def __init__(self): self.env = os.environ.copy() self.env['LPAC_APDU'] = 'qmi' self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' self.timeout_sec = 30 def _invoke(self, cmd): ret = subprocess.Popen(['sudo', 'lpac', *cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) try: out, err = ret.communicate(timeout=self.timeout_sec) except subprocess.TimeoutExpired as e: ret.kill() raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e if ret.returncode != 0: raise Exception(f"lpac {cmd} failed: {err}") messages = [] for line in out.decode().strip().splitlines(): if line.startswith('{'): message = json.loads(line) # lpac response format validations assert 'type' in message assert message['type'] == 'lpa' assert 'payload' in message assert 'code' in message['payload'] if message['payload']['code'] != 0: raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: {message['payload']['message']}") messages.append(message['payload']['data']) # TODO: confirm we should always get a message if len(messages) == 0: raise LPAError(f"lpac {cmd} returned no messages") return messages def list_profiles(self): raw = self._invoke(['profile', 'list'])[-1] # only one message profiles = [] for profile in raw: profiles.append({ 'iccid': profile['iccid'], 'isdp_aid': profile['isdpAid'], 'nickname': profile['profileNickname'], 'enabled': profile['profileState'] == 'enabled', 'provider': profile['serviceProviderName'], }) return profiles def post(url, payload): print() print("POST to", url) r = requests.post( url, data=payload, verify=False, headers={ "Content-Type": "application/json", "X-Admin-Protocol": "gsma/rsp/v2.2.0", "charset": "utf-8", "User-Agent": "gsma-rsp-lpad", }, ) print("resp", r) print("resp text", repr(r.text)) print() r.raise_for_status() ret = f"HTTP/1.1 {r.status_code}" ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') return ret.encode() + r.content class LPA: def __init__(self): self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) self.dev.reset_input_buffer() self.dev.reset_output_buffer() assert "OK" in self.at("AT") def at(self, cmd): print(f"==> {cmd}") self.dev.write(cmd.encode() + b'\r\n') r = b"" cnt = 0 while b"OK" not in r and b"ERROR" not in r and cnt < 20: r += self.dev.read(8192).strip() cnt += 1 r = r.decode() print(f"<== {repr(r)}") return r def download_ota(self, qr): return self.at(f'AT+QESIM="ota","{qr}"') def download(self, qr): smdp = qr.split('$')[1] out = self.at(f'AT+QESIM="download","{qr}"') for _ in range(5): line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] print(repr(parts)) trans, ret, url, payloadlen, payload = parts assert trans == "trans" and ret == "0" assert len(payload) == int(payloadlen) r = post(f"https://{smdp}/{url}", payload) to_send = binascii.hexlify(r).decode() chunk_len = 1400 for i in range(math.ceil(len(to_send) / chunk_len)): state = 1 if (i+1)*chunk_len < len(to_send) else 0 data = to_send[i * chunk_len : (i+1)*chunk_len] out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') assert "OK" in out if '+QESIM:"download",1' in out: raise Exception("profile install failed") elif '+QESIM:"download",0' in out: print("done, successfully loaded") break def enable(self, iccid): self.at(f'AT+QESIM="enable","{iccid}"') def disable(self, iccid): self.at(f'AT+QESIM="disable","{iccid}"') def delete(self, iccid): self.at(f'AT+QESIM="delete","{iccid}"') def list_profiles(self): out = self.at('AT+QESIM="list"') return out.strip().splitlines()[1:] if __name__ == "__main__": import sys if "RESTART" in os.environ: subprocess.check_call("sudo systemctl stop ModemManager", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) while not os.path.exists('/dev/ttyUSB2'): time.sleep(1) time.sleep(3) lpa = LPA() print(lpa.list_profiles()) if len(sys.argv) > 1: lpa.download(sys.argv[1]) print(lpa.list_profiles())