From 7c38cfe85fd264e0eecbbb122f57622928170fe6 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 19 May 2025 21:06:39 -0700 Subject: [PATCH] keep old file the same for easier diff --- system/hardware/tici/esim.py | 232 ++++++++++++++--------------------- system/hardware/tici/lpa.py | 161 ++++++++++++++++++++++++ 2 files changed, 254 insertions(+), 139 deletions(-) create mode 100644 system/hardware/tici/lpa.py diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index bcdc1a6a3d..df76c1a5fd 100755 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -1,161 +1,115 @@ #!/usr/bin/env python3 -import json import os -import subprocess +import math import time -from dataclasses import dataclass +import binascii +import requests +import serial +import subprocess -@dataclass -class Profile: - iccid: str - isdp_aid: str - nickname: str - enabled: bool - provider: str -class LPAError(Exception): - pass +def post(url, payload): + print() + print("POST to", url) + r = requests.post( + url, + data=payload, + verify=False, + headers={ + "Content-Type": "application/json", + "X-Admin-Protocol": "gsma/rsp/v2.2.0", + "charset": "utf-8", + "User-Agent": "gsma-rsp-lpad", + }, + ) + print("resp", r) + print("resp text", repr(r.text)) + print() + r.raise_for_status() + + ret = f"HTTP/1.1 {r.status_code}" + ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') + return ret.encode() + r.content class LPA: def __init__(self): - self.env = os.environ.copy() - self.env['LPAC_APDU'] = 'qmi' - self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' - - self.timeout_sec = 30 + self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) + self.dev.reset_input_buffer() + self.dev.reset_output_buffer() + assert "OK" in self.at("AT") + + def at(self, cmd): + print(f"==> {cmd}") + self.dev.write(cmd.encode() + b'\r\n') + + r = b"" + cnt = 0 + while b"OK" not in r and b"ERROR" not in r and cnt < 20: + r += self.dev.read(8192).strip() + cnt += 1 + r = r.decode() + print(f"<== {repr(r)}") + return r + + def download_ota(self, qr): + return self.at(f'AT+QESIM="ota","{qr}"') + + def download(self, qr): + smdp = qr.split('$')[1] + out = self.at(f'AT+QESIM="download","{qr}"') + for _ in range(5): + line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] + + parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] + print(repr(parts)) + trans, ret, url, payloadlen, payload = parts + assert trans == "trans" and ret == "0" + assert len(payload) == int(payloadlen) + + r = post(f"https://{smdp}/{url}", payload) + to_send = binascii.hexlify(r).decode() + + chunk_len = 1400 + for i in range(math.ceil(len(to_send) / chunk_len)): + state = 1 if (i+1)*chunk_len < len(to_send) else 0 + data = to_send[i * chunk_len : (i+1)*chunk_len] + out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') + assert "OK" in out + + if '+QESIM:"download",1' in out: + raise Exception("profile install failed") + elif '+QESIM:"download",0' in out: + print("done, successfully loaded") + break + + def enable(self, iccid): + self.at(f'AT+QESIM="enable","{iccid}"') + + def disable(self, iccid): + self.at(f'AT+QESIM="disable","{iccid}"') + + def delete(self, iccid): + self.at(f'AT+QESIM="delete","{iccid}"') def list_profiles(self): - """ - List all profiles on the eUICC. - """ - raw = self._invoke('profile', 'list')[-1] # only one message - - profiles = [] - for profile in raw['payload']['data']: - profiles.append(Profile( - iccid=profile['iccid'], - isdp_aid=profile['isdpAid'], - nickname=profile['profileNickname'], - enabled=profile['profileState'] == 'enabled', - provider=profile['serviceProviderName'], - )) - - return profiles - - def profile_exists(self, iccid: str) -> bool: - """ - Check if a profile exists on the eUICC. - """ - profiles = self.list_profiles() - return any(profile.iccid == iccid for profile in profiles) - - def get_active_profile(self): - """ - Get the active profile on the eUICC. - """ - profiles = self.list_profiles() - for profile in profiles: - if profile.enabled: - return profile - return None - - def process_notifications(self) -> None: - """ - Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. - """ - msgs = self._invoke('notification', 'process', '-a', '-r') - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - - def enable_profile(self, iccid: str) -> None: - """ - Enable the profile on the eUICC. - """ - self.validate_profile(iccid) - latest = self.get_active_profile() - if latest is not None and latest.iccid == iccid: - raise LPAError(f'profile {iccid} is already enabled') - elif latest is not None: - self.disable_profile(latest.iccid) - - msgs = self._invoke('profile', 'enable', iccid) - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - self.process_notifications() - - def disable_profile(self, iccid: str) -> None: - """ - Disable the profile on the eUICC. - """ - self.validate_profile(iccid) - latest = self.get_active_profile() - if latest is None: - return - if latest.iccid != iccid: - raise LPAError(f'profile {iccid} is not enabled') - - msgs = self._invoke('profile', 'disable', iccid) - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - self.process_notifications() - - - def validate_profile(self, iccid: str) -> None: - """ - Validate the profile on the eUICC. - """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - - - def _invoke(self, *cmd: str): - print(f"-> lpac {' '.join(list(cmd))}") - ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) - try: - out, err = ret.communicate(timeout=self.timeout_sec) - except subprocess.TimeoutExpired as e: - ret.kill() - raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e - - messages = [] - for line in out.decode().strip().splitlines(): - if line.startswith('{'): - message = json.loads(line) - - # lpac response format validations - assert 'type' in message, 'expected type in message' - assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' - assert 'payload' in message, 'expected payload in message' - assert 'code' in message['payload'], 'expected code in message payload' - - if message['payload']['code'] != 0: - raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") - - assert 'data' in message['payload'], 'expected data in message payload' - - messages.append(message) - - if len(messages) == 0: - raise LPAError(f"lpac {cmd} returned no messages") - - return messages + out = self.at('AT+QESIM="list"') + return out.strip().splitlines()[1:] if __name__ == "__main__": import sys - lpa = LPA() - print(lpa.list_profiles()) - - if len(sys.argv) > 2: - if sys.argv[1] == 'enable': - lpa.enable_profile(sys.argv[2]) - elif sys.argv[1] == 'disable': - lpa.disable_profile(sys.argv[2]) - else: - raise Exception(f"invalid command: {sys.argv[1]}") - if "RESTART" in os.environ: subprocess.check_call("sudo systemctl stop ModemManager", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) while not os.path.exists('/dev/ttyUSB2'): time.sleep(1) + time.sleep(3) + + lpa = LPA() + print(lpa.list_profiles()) + if len(sys.argv) > 1: + lpa.download(sys.argv[1]) + print(lpa.list_profiles()) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py new file mode 100644 index 0000000000..bcdc1a6a3d --- /dev/null +++ b/system/hardware/tici/lpa.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +import json +import os +import subprocess +import time +from dataclasses import dataclass + +@dataclass +class Profile: + iccid: str + isdp_aid: str + nickname: str + enabled: bool + provider: str + +class LPAError(Exception): + pass + + +class LPA: + def __init__(self): + self.env = os.environ.copy() + self.env['LPAC_APDU'] = 'qmi' + self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' + + self.timeout_sec = 30 + + def list_profiles(self): + """ + List all profiles on the eUICC. + """ + raw = self._invoke('profile', 'list')[-1] # only one message + + profiles = [] + for profile in raw['payload']['data']: + profiles.append(Profile( + iccid=profile['iccid'], + isdp_aid=profile['isdpAid'], + nickname=profile['profileNickname'], + enabled=profile['profileState'] == 'enabled', + provider=profile['serviceProviderName'], + )) + + return profiles + + def profile_exists(self, iccid: str) -> bool: + """ + Check if a profile exists on the eUICC. + """ + profiles = self.list_profiles() + return any(profile.iccid == iccid for profile in profiles) + + def get_active_profile(self): + """ + Get the active profile on the eUICC. + """ + profiles = self.list_profiles() + for profile in profiles: + if profile.enabled: + return profile + return None + + def process_notifications(self) -> None: + """ + Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. + """ + msgs = self._invoke('notification', 'process', '-a', '-r') + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + + def enable_profile(self, iccid: str) -> None: + """ + Enable the profile on the eUICC. + """ + self.validate_profile(iccid) + latest = self.get_active_profile() + if latest is not None and latest.iccid == iccid: + raise LPAError(f'profile {iccid} is already enabled') + elif latest is not None: + self.disable_profile(latest.iccid) + + msgs = self._invoke('profile', 'enable', iccid) + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + self.process_notifications() + + def disable_profile(self, iccid: str) -> None: + """ + Disable the profile on the eUICC. + """ + self.validate_profile(iccid) + latest = self.get_active_profile() + if latest is None: + return + if latest.iccid != iccid: + raise LPAError(f'profile {iccid} is not enabled') + + msgs = self._invoke('profile', 'disable', iccid) + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + self.process_notifications() + + + def validate_profile(self, iccid: str) -> None: + """ + Validate the profile on the eUICC. + """ + if not self.profile_exists(iccid): + raise LPAError(f'profile {iccid} does not exist') + + + def _invoke(self, *cmd: str): + print(f"-> lpac {' '.join(list(cmd))}") + ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) + try: + out, err = ret.communicate(timeout=self.timeout_sec) + except subprocess.TimeoutExpired as e: + ret.kill() + raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e + + messages = [] + for line in out.decode().strip().splitlines(): + if line.startswith('{'): + message = json.loads(line) + + # lpac response format validations + assert 'type' in message, 'expected type in message' + assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' + assert 'payload' in message, 'expected payload in message' + assert 'code' in message['payload'], 'expected code in message payload' + + if message['payload']['code'] != 0: + raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") + + assert 'data' in message['payload'], 'expected data in message payload' + + messages.append(message) + + if len(messages) == 0: + raise LPAError(f"lpac {cmd} returned no messages") + + return messages + + +if __name__ == "__main__": + import sys + + lpa = LPA() + print(lpa.list_profiles()) + + if len(sys.argv) > 2: + if sys.argv[1] == 'enable': + lpa.enable_profile(sys.argv[2]) + elif sys.argv[1] == 'disable': + lpa.disable_profile(sys.argv[2]) + else: + raise Exception(f"invalid command: {sys.argv[1]}") + + if "RESTART" in os.environ: + subprocess.check_call("sudo systemctl stop ModemManager", shell=True) + subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) + subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) + while not os.path.exists('/dev/ttyUSB2'): + time.sleep(1)