diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index cbfeade7bf..059558654f 100755 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -1,12 +1,8 @@ #!/usr/bin/env python3 import json import os -import math -import time -import binascii -import requests -import serial import subprocess +import time class LPAError(RuntimeError): @@ -16,7 +12,7 @@ class LPAProfileNotFoundError(LPAError): pass -class LPA2: +class LPA: def __init__(self): self.env = os.environ.copy() self.env['LPAC_APDU'] = 'qmi' @@ -133,99 +129,10 @@ class LPA2: return messages -def post(url, payload): - print() - print("POST to", url) - r = requests.post( - url, - data=payload, - verify=False, - headers={ - "Content-Type": "application/json", - "X-Admin-Protocol": "gsma/rsp/v2.2.0", - "charset": "utf-8", - "User-Agent": "gsma-rsp-lpad", - }, - ) - print("resp", r) - print("resp text", repr(r.text)) - print() - r.raise_for_status() - - ret = f"HTTP/1.1 {r.status_code}" - ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') - return ret.encode() + r.content - - -class LPA: - def __init__(self): - self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) - self.dev.reset_input_buffer() - self.dev.reset_output_buffer() - assert "OK" in self.at("AT") - - def at(self, cmd): - print(f"==> {cmd}") - self.dev.write(cmd.encode() + b'\r\n') - - r = b"" - cnt = 0 - while b"OK" not in r and b"ERROR" not in r and cnt < 20: - r += self.dev.read(8192).strip() - cnt += 1 - r = r.decode() - print(f"<== {repr(r)}") - return r - - def download_ota(self, qr): - return self.at(f'AT+QESIM="ota","{qr}"') - - def download(self, qr): - smdp = qr.split('$')[1] - out = self.at(f'AT+QESIM="download","{qr}"') - for _ in range(5): - line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] - - parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] - print(repr(parts)) - trans, ret, url, payloadlen, payload = parts - assert trans == "trans" and ret == "0" - assert len(payload) == int(payloadlen) - - r = post(f"https://{smdp}/{url}", payload) - to_send = binascii.hexlify(r).decode() - - chunk_len = 1400 - for i in range(math.ceil(len(to_send) / chunk_len)): - state = 1 if (i+1)*chunk_len < len(to_send) else 0 - data = to_send[i * chunk_len : (i+1)*chunk_len] - out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') - assert "OK" in out - - if '+QESIM:"download",1' in out: - raise Exception("profile install failed") - elif '+QESIM:"download",0' in out: - print("done, successfully loaded") - break - - def enable(self, iccid): - self.at(f'AT+QESIM="enable","{iccid}"') - - def disable(self, iccid): - self.at(f'AT+QESIM="disable","{iccid}"') - - def delete(self, iccid): - self.at(f'AT+QESIM="delete","{iccid}"') - - def list_profiles(self): - out = self.at('AT+QESIM="list"') - return out.strip().splitlines()[1:] - - if __name__ == "__main__": import sys - lpa = LPA2() + lpa = LPA() print(lpa.list_profiles()) if len(sys.argv) > 2: