#!/usr/bin/env python3
"""Minimal eSIM LPA helper that talks to a modem through ``AT+QESIM`` commands.

The modem is driven over a serial port.  During a profile download the modem
hands this script HTTP request bodies, the script POSTs them to the SM-DP+
host taken from the activation code, and feeds the hex-encoded responses back
to the modem in chunks.
"""
import binascii
import math
import os
import subprocess
import sys
import time

import requests
import serial

# Serial device node used for the modem's AT command channel.
MODEM_DEVICE = '/dev/ttyUSB2'


def _require(condition, message):
    """Raise ``AssertionError(message)`` unless *condition* is truthy.

    Used instead of bare ``assert`` statements so the checks are not compiled
    away under ``python -O``, while keeping the exception type callers of the
    original code would have seen.
    """
    if not condition:
        raise AssertionError(message)


def post(url, payload, timeout=60):
    """POST *payload* to *url* and return the response serialised as bytes.

    Args:
        url: Full URL to post to.
        payload: Request body, sent as-is with a JSON content type.
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout a stalled server would block forever.

    Returns:
        ``bytes`` made of a status line, the response headers (except
        ``Connection``) and the raw response body.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.RequestException: on connection problems or timeout.
    """
    print()
    print("POST to", url)
    # NOTE(review): certificate verification is disabled here; presumably the
    # server certificates do not chain to the default CA bundle -- confirm
    # before relying on this over an untrusted network.
    r = requests.post(
        url,
        data=payload,
        verify=False,
        timeout=timeout,
        headers={
            "Content-Type": "application/json",
            "X-Admin-Protocol": "gsma/rsp/v2.2.0",
            "charset": "utf-8",
            "User-Agent": "gsma-rsp-lpad",
        },
    )
    print("resp", r)
    print("resp text", repr(r.text))
    print()
    r.raise_for_status()

    # NOTE(review): the status line and headers are concatenated without any
    # CRLF separators.  This is kept byte-for-byte because the consumer on the
    # modem side cannot be checked from here -- confirm the expected format.
    ret = f"HTTP/1.1 {r.status_code}"
    ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection')
    return ret.encode() + r.content


class LPA:
    """Thin wrapper around the modem's ``AT+QESIM`` command set."""

    def __init__(self, device=MODEM_DEVICE):
        """Open the serial port and verify the modem answers a plain ``AT``.

        Args:
            device: Path of the serial device to open.

        Raises:
            AssertionError: if the modem does not reply with ``OK``.
        """
        self.dev = serial.Serial(device, baudrate=57600, timeout=1, bytesize=8)
        self.dev.reset_input_buffer()
        self.dev.reset_output_buffer()
        # The probe command is issued outside of any assert so that it is
        # still sent when Python runs with assertions disabled.
        probe = self.at("AT")
        _require("OK" in probe, f"modem did not answer AT probe: {probe!r}")

    def at(self, cmd):
        """Send one AT command and return the decoded response text.

        Reads from the port until ``OK`` or ``ERROR`` shows up in the data or
        20 reads have been attempted, whichever comes first.

        Args:
            cmd: Command string without line terminator.

        Returns:
            The response with surrounding whitespace removed.
        """
        print(f"==> {cmd}")
        self.dev.write(cmd.encode() + b'\r\n')

        raw = b""
        for _ in range(20):
            # Accumulate the raw bytes: stripping every chunk separately would
            # drop whitespace that happens to sit on a read boundary, e.g. the
            # "\r\n\r\n" in front of the final OK that download() splits on.
            raw += self.dev.read(8192)
            if b"OK" in raw or b"ERROR" in raw:
                break

        r = raw.strip().decode()
        print(f"<== {repr(r)}")
        return r

    def download_ota(self, qr):
        """Issue the ``ota`` variant of ``AT+QESIM`` for *qr*; return the response."""
        return self.at(f'AT+QESIM="ota","{qr}"')

    def download(self, qr):
        """Download a profile by relaying requests between modem and server.

        The server host is the second ``$``-separated field of *qr*.  Each
        round parses a ``+QESIM: "trans",...`` line from the modem, POSTs the
        contained payload to the server and sends the hex-encoded response
        back in chunks of at most 1400 characters.

        Args:
            qr: Activation code string.

        Raises:
            AssertionError: if a modem response has an unexpected shape or a
                chunk transfer is not acknowledged with ``OK``.
            Exception: if the modem reports a failed install, or no result is
                reported within 5 rounds.
        """
        smdp = qr.split('$')[1]
        out = self.at(f'AT+QESIM="download","{qr}"')
        for _ in range(5):
            line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0]
            # maxsplit keeps commas inside the payload (5th field) intact.
            parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)]
            print(repr(parts))
            trans, ret, url, payloadlen, payload = parts
            _require(trans == "trans" and ret == "0",
                     f"unexpected modem response: {parts[:2]!r}")
            _require(len(payload) == int(payloadlen),
                     f"payload length mismatch: got {len(payload)}, expected {payloadlen}")

            r = post(f"https://{smdp}/{url}", payload)
            to_send = binascii.hexlify(r).decode()
            total = len(to_send)

            chunk_len = 1400
            for i in range(math.ceil(total / chunk_len)):
                start, end = i * chunk_len, (i + 1) * chunk_len
                # state is 1 while more chunks follow and 0 on the last chunk.
                state = 1 if end < total else 0
                data = to_send[start:end]
                out = self.at(f'AT+QESIM="trans",{total},{state},{i},{len(data)},"{data}"')
                _require("OK" in out, f"chunk {i} was not acknowledged: {out!r}")

            # The response to the last chunk carries either the final result
            # or the next "trans" request handled by the following round.
            if '+QESIM:"download",1' in out:
                raise Exception("profile install failed")
            if '+QESIM:"download",0' in out:
                print("done, successfully loaded")
                break
        else:
            # Previously the method returned silently here, which made an
            # unfinished download indistinguishable from a successful one.
            raise Exception("profile download did not complete after 5 rounds")

    def enable(self, iccid):
        """Send the ``enable`` command for the profile identified by *iccid*."""
        self.at(f'AT+QESIM="enable","{iccid}"')

    def disable(self, iccid):
        """Send the ``disable`` command for the profile identified by *iccid*."""
        self.at(f'AT+QESIM="disable","{iccid}"')

    def delete(self, iccid):
        """Send the ``delete`` command for the profile identified by *iccid*."""
        self.at(f'AT+QESIM="delete","{iccid}"')

    def list_profiles(self):
        """Return the lines of the ``list`` response, minus the first line."""
        out = self.at('AT+QESIM="list"')
        return out.strip().splitlines()[1:]


if __name__ == "__main__":
    if "RESTART" in os.environ:
        # Stop ModemManager, restart the LTE stack, then wait for the serial
        # device node to show up again before talking to the modem.
        subprocess.check_call("sudo systemctl stop ModemManager", shell=True)
        subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True)
        subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True)
        while not os.path.exists(MODEM_DEVICE):
            time.sleep(1)
        time.sleep(3)

    lpa = LPA()
    print(lpa.list_profiles())
    if len(sys.argv) > 1:
        lpa.download(sys.argv[1])
        print(lpa.list_profiles())