From 957bc7cdacda037beff240e3ac768d2f27df5a86 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 19 May 2025 21:16:37 -0700 Subject: [PATCH] keep --- system/hardware/tici/esim.py | 181 +++++++++++++++++++++++++++++++++-- system/hardware/tici/lpa.py | 161 ------------------------------- 2 files changed, 174 insertions(+), 168 deletions(-) delete mode 100644 system/hardware/tici/lpa.py diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index df76c1a5fd..31bacebc70 100755 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import json import os import math import time @@ -6,6 +7,164 @@ import binascii import requests import serial import subprocess +from dataclasses import dataclass + + +@dataclass +class Profile: + iccid: str + isdp_aid: str + nickname: str + enabled: bool + provider: str + + +class LPAError(Exception): + pass + + +class LPAC: + def __init__(self): + self.env = os.environ.copy() + self.env['LPAC_APDU'] = 'qmi' + self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' + + self.timeout_sec = 30 + + def list_profiles(self): + """ + List all profiles on the eUICC. + """ + raw = self._invoke('profile', 'list')[-1] # only one message + + profiles = [] + for profile in raw['payload']['data']: + profiles.append(Profile( + iccid=profile['iccid'], + isdp_aid=profile['isdpAid'], + nickname=profile['profileNickname'], + enabled=profile['profileState'] == 'enabled', + provider=profile['serviceProviderName'], + )) + + return profiles + + def validate_successful(self, msgs: list[dict]) -> None: + """ + Validate that the last message is a success notification. + """ + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + + def profile_exists(self, iccid: str) -> bool: + """ + Check if a profile exists on the eUICC. + """ + return any(p.iccid == iccid for p in self.list_profiles()) + + def get_active_profile(self): + """ + Get the active profile on the eUICC. + """ + profiles = self.list_profiles() + for profile in profiles: + if profile.enabled: + return profile + return None + + def process_notifications(self) -> None: + """ + Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. + """ + msgs = self._invoke('notification', 'process', '-a', '-r') + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + + def enable_profile(self, iccid: str) -> None: + """ + Enable the profile on the eUICC. + """ + if not self.profile_exists(iccid): + raise LPAError(f'profile {iccid} does not exist') + + latest = self.get_active_profile() + if latest is None: + raise LPAError('no profile enabled') + if latest.iccid == iccid: + raise LPAError(f'profile {iccid} is already enabled') + else: + self.disable_profile(latest.iccid) + + msgs = self._invoke('profile', 'enable', iccid) + self.validate_successful(msgs) + self.process_notifications() + + def disable_profile(self, iccid: str) -> None: + """ + Disable the profile on the eUICC. + """ + if not self.profile_exists(iccid): + raise LPAError(f'profile {iccid} does not exist') + + latest = self.get_active_profile() + if latest is None: + raise LPAError('no profile enabled') + if latest.iccid != iccid: + raise LPAError(f'profile {iccid} is not enabled') + + msgs = self._invoke('profile', 'disable', iccid) + self.validate_successful(msgs) + self.process_notifications() + + def delete_profile(self, iccid: str) -> None: + """ + Delete the profile on the eUICC. + """ + if not self.profile_exists(iccid): + raise LPAError(f'profile {iccid} does not exist') + + msgs = self._invoke('profile', 'delete', iccid) + self.validate_successful(msgs) + self.process_notifications() + + def download_profile(self, qr: str, nickname: str) -> None: + """ + Download the profile from the eUICC. + """ + msgs = self._invoke('profile', 'download', qr) + self.validate_successful(msgs) + self.process_notifications() + + + def _invoke(self, *cmd: str): + print(f"invoking lpac {' '.join(list(cmd))}") + ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) + try: + out, err = ret.communicate(timeout=self.timeout_sec) + except subprocess.TimeoutExpired as e: + ret.kill() + raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e + + messages = [] + for line in out.decode().strip().splitlines(): + if line.startswith('{'): + message = json.loads(line) + + # lpac response format validations + assert 'type' in message, 'expected type in message' + assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' + assert 'payload' in message, 'expected payload in message' + assert 'code' in message['payload'], 'expected code in message payload' + + if message['payload']['code'] != 0: + raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") + + assert 'data' in message['payload'], 'expected data in message payload' + + messages.append(message) + + if len(messages) == 0: + raise LPAError(f"lpac {cmd} returned no messages") + + return messages def post(url, payload): @@ -100,16 +259,24 @@ class LPA: if __name__ == "__main__": import sys + lpa = LPA2() + print(lpa.list_profiles()) + + if len(sys.argv) > 2: + if sys.argv[1] == 'enable': + lpa.enable_profile(sys.argv[2]) + elif sys.argv[1] == 'disable': + lpa.disable_profile(sys.argv[2]) + elif sys.argv[1] == 'delete': + lpa.delete_profile(sys.argv[2]) + elif sys.argv[1] == 'download': + lpa.download(sys.argv[2]) + else: + raise Exception(f"invalid command: {sys.argv[1]}") + if "RESTART" in os.environ: subprocess.check_call("sudo systemctl stop ModemManager", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) while not os.path.exists('/dev/ttyUSB2'): time.sleep(1) - time.sleep(3) - - lpa = LPA() - print(lpa.list_profiles()) - if len(sys.argv) > 1: - lpa.download(sys.argv[1]) - print(lpa.list_profiles()) diff --git a/system/hardware/tici/lpa.py b/system/hardware/tici/lpa.py deleted file mode 100644 index bcdc1a6a3d..0000000000 --- a/system/hardware/tici/lpa.py +++ /dev/null @@ -1,161 +0,0 @@ -#!/usr/bin/env python3 -import json -import os -import subprocess -import time -from dataclasses import dataclass - -@dataclass -class Profile: - iccid: str - isdp_aid: str - nickname: str - enabled: bool - provider: str - -class LPAError(Exception): - pass - - -class LPA: - def __init__(self): - self.env = os.environ.copy() - self.env['LPAC_APDU'] = 'qmi' - self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' - - self.timeout_sec = 30 - - def list_profiles(self): - """ - List all profiles on the eUICC. - """ - raw = self._invoke('profile', 'list')[-1] # only one message - - profiles = [] - for profile in raw['payload']['data']: - profiles.append(Profile( - iccid=profile['iccid'], - isdp_aid=profile['isdpAid'], - nickname=profile['profileNickname'], - enabled=profile['profileState'] == 'enabled', - provider=profile['serviceProviderName'], - )) - - return profiles - - def profile_exists(self, iccid: str) -> bool: - """ - Check if a profile exists on the eUICC. - """ - profiles = self.list_profiles() - return any(profile.iccid == iccid for profile in profiles) - - def get_active_profile(self): - """ - Get the active profile on the eUICC. - """ - profiles = self.list_profiles() - for profile in profiles: - if profile.enabled: - return profile - return None - - def process_notifications(self) -> None: - """ - Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. - """ - msgs = self._invoke('notification', 'process', '-a', '-r') - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - - def enable_profile(self, iccid: str) -> None: - """ - Enable the profile on the eUICC. - """ - self.validate_profile(iccid) - latest = self.get_active_profile() - if latest is not None and latest.iccid == iccid: - raise LPAError(f'profile {iccid} is already enabled') - elif latest is not None: - self.disable_profile(latest.iccid) - - msgs = self._invoke('profile', 'enable', iccid) - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - self.process_notifications() - - def disable_profile(self, iccid: str) -> None: - """ - Disable the profile on the eUICC. - """ - self.validate_profile(iccid) - latest = self.get_active_profile() - if latest is None: - return - if latest.iccid != iccid: - raise LPAError(f'profile {iccid} is not enabled') - - msgs = self._invoke('profile', 'disable', iccid) - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - self.process_notifications() - - - def validate_profile(self, iccid: str) -> None: - """ - Validate the profile on the eUICC. - """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - - - def _invoke(self, *cmd: str): - print(f"-> lpac {' '.join(list(cmd))}") - ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) - try: - out, err = ret.communicate(timeout=self.timeout_sec) - except subprocess.TimeoutExpired as e: - ret.kill() - raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e - - messages = [] - for line in out.decode().strip().splitlines(): - if line.startswith('{'): - message = json.loads(line) - - # lpac response format validations - assert 'type' in message, 'expected type in message' - assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' - assert 'payload' in message, 'expected payload in message' - assert 'code' in message['payload'], 'expected code in message payload' - - if message['payload']['code'] != 0: - raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}") - - assert 'data' in message['payload'], 'expected data in message payload' - - messages.append(message) - - if len(messages) == 0: - raise LPAError(f"lpac {cmd} returned no messages") - - return messages - - -if __name__ == "__main__": - import sys - - lpa = LPA() - print(lpa.list_profiles()) - - if len(sys.argv) > 2: - if sys.argv[1] == 'enable': - lpa.enable_profile(sys.argv[2]) - elif sys.argv[1] == 'disable': - lpa.disable_profile(sys.argv[2]) - else: - raise Exception(f"invalid command: {sys.argv[1]}") - - if "RESTART" in os.environ: - subprocess.check_call("sudo systemctl stop ModemManager", shell=True) - subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) - subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) - while not os.path.exists('/dev/ttyUSB2'): - time.sleep(1)