|
|
@ -1,4 +1,5 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
#!/usr/bin/env python3 |
|
|
|
|
|
|
|
|
|
|
|
import json |
|
|
|
import json |
|
|
|
import os |
|
|
|
import os |
|
|
|
import subprocess |
|
|
|
import subprocess |
|
|
@ -22,57 +23,46 @@ class LPA: |
|
|
|
|
|
|
|
|
|
|
|
def list_profiles(self) -> list[dict[str, str]]: |
|
|
|
def list_profiles(self) -> list[dict[str, str]]: |
|
|
|
msgs = self._invoke('profile', 'list') |
|
|
|
msgs = self._invoke('profile', 'list') |
|
|
|
self.validate_successful(msgs) |
|
|
|
self._validate_successful(msgs) |
|
|
|
|
|
|
|
return [{ |
|
|
|
profiles = [] |
|
|
|
'iccid': p['iccid'], |
|
|
|
for profile in msgs[-1]['payload']['data']: |
|
|
|
'nickname': p['profileNickname'], |
|
|
|
profiles.append({ |
|
|
|
'enabled': p['profileState'] == 'enabled', |
|
|
|
'iccid': profile['iccid'], |
|
|
|
'provider': p['serviceProviderName'] |
|
|
|
'nickname': profile['profileNickname'], |
|
|
|
} for p in msgs[-1]['payload']['data']] |
|
|
|
'enabled': profile['profileState'] == 'enabled', |
|
|
|
|
|
|
|
'provider': profile['serviceProviderName'], |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return profiles |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_active_profile(self) -> dict[str, str] | None: |
|
|
|
def get_active_profile(self) -> dict[str, str] | None: |
|
|
|
return next((p for p in self.list_profiles() if p['enabled']), None) |
|
|
|
return next((p for p in self.list_profiles() if p['enabled']), None) |
|
|
|
|
|
|
|
|
|
|
|
def enable_profile(self, iccid: str) -> None: |
|
|
|
def enable_profile(self, iccid: str) -> None: |
|
|
|
self.validate_profile_exists(iccid) |
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
latest = self.get_active_profile() |
|
|
|
latest = self.get_active_profile() |
|
|
|
if latest is None: |
|
|
|
if latest: |
|
|
|
raise LPAError('no profile enabled') |
|
|
|
if latest['iccid'] == iccid: |
|
|
|
if latest['iccid'] == iccid: |
|
|
|
raise LPAError(f'profile {iccid} is already enabled') |
|
|
|
raise LPAError(f'profile {iccid} is already enabled') |
|
|
|
|
|
|
|
else: |
|
|
|
|
|
|
|
self.disable_profile(latest['iccid']) |
|
|
|
self.disable_profile(latest['iccid']) |
|
|
|
|
|
|
|
self._validate_successful(self._invoke('profile', 'enable', iccid)) |
|
|
|
self.validate_successful(self._invoke('profile', 'enable', iccid)) |
|
|
|
|
|
|
|
self.process_notifications() |
|
|
|
self.process_notifications() |
|
|
|
|
|
|
|
|
|
|
|
def disable_profile(self, iccid: str) -> None: |
|
|
|
def disable_profile(self, iccid: str) -> None: |
|
|
|
self.validate_profile_exists(iccid) |
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
latest = self.get_active_profile() |
|
|
|
latest = self.get_active_profile() |
|
|
|
if latest is None: |
|
|
|
if latest is not None and latest['iccid'] != iccid: |
|
|
|
raise LPAError('no profile enabled') |
|
|
|
return |
|
|
|
if latest['iccid'] != iccid: |
|
|
|
self._validate_successful(self._invoke('profile', 'disable', iccid)) |
|
|
|
raise LPAError(f'profile {iccid} is not enabled') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self.validate_successful(self._invoke('profile', 'disable', iccid)) |
|
|
|
|
|
|
|
self.process_notifications() |
|
|
|
self.process_notifications() |
|
|
|
|
|
|
|
|
|
|
|
def delete_profile(self, iccid: str) -> None: |
|
|
|
def delete_profile(self, iccid: str) -> None: |
|
|
|
self.validate_profile_exists(iccid) |
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
latest = self.get_active_profile() |
|
|
|
latest = self.get_active_profile() |
|
|
|
if latest is not None and latest['iccid'] == iccid: |
|
|
|
if latest is not None and latest['iccid'] == iccid: |
|
|
|
self.disable_profile(iccid) |
|
|
|
self.disable_profile(iccid) |
|
|
|
self.validate_successful(self._invoke('profile', 'delete', iccid)) |
|
|
|
self._validate_successful(self._invoke('profile', 'delete', iccid)) |
|
|
|
self.process_notifications() |
|
|
|
self.process_notifications() |
|
|
|
|
|
|
|
|
|
|
|
def download_profile(self, qr: str, nickname: str | None = None) -> None: |
|
|
|
def download_profile(self, qr: str, nickname: str | None = None) -> None: |
|
|
|
msgs = self._invoke('profile', 'download', '-a', qr) |
|
|
|
msgs = self._invoke('profile', 'download', '-a', qr) |
|
|
|
self.validate_successful(msgs) |
|
|
|
self._validate_successful(msgs) |
|
|
|
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) |
|
|
|
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) |
|
|
|
if new_profile is None: |
|
|
|
if new_profile is None: |
|
|
|
raise LPAError('no new profile found') |
|
|
|
raise LPAError('no new profile found') |
|
|
@ -81,21 +71,14 @@ class LPA: |
|
|
|
self.process_notifications() |
|
|
|
self.process_notifications() |
|
|
|
|
|
|
|
|
|
|
|
def nickname_profile(self, iccid: str, nickname: str) -> None: |
|
|
|
def nickname_profile(self, iccid: str, nickname: str) -> None: |
|
|
|
self.validate_profile_exists(iccid) |
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) |
|
|
|
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) |
|
|
|
|
|
|
|
|
|
|
|
def process_notifications(self) -> None: |
|
|
|
def process_notifications(self) -> None: |
|
|
|
""" |
|
|
|
""" |
|
|
|
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. |
|
|
|
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. |
|
|
|
""" |
|
|
|
""" |
|
|
|
self.validate_successful(self._invoke('notification', 'process', '-a', '-r')) |
|
|
|
self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) |
|
|
|
|
|
|
|
|
|
|
|
def validate_profile_exists(self, iccid: str) -> None: |
|
|
|
|
|
|
|
if not any(p['iccid'] == iccid for p in self.list_profiles()): |
|
|
|
|
|
|
|
raise LPAProfileNotFoundError(f'profile {iccid} does not exist') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_successful(self, msgs: list[dict]) -> None: |
|
|
|
|
|
|
|
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _invoke(self, *cmd: str): |
|
|
|
def _invoke(self, *cmd: str): |
|
|
|
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) |
|
|
|
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) |
|
|
@ -128,6 +111,13 @@ class LPA: |
|
|
|
|
|
|
|
|
|
|
|
return messages |
|
|
|
return messages |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_profile_exists(self, iccid: str) -> None: |
|
|
|
|
|
|
|
if not any(p['iccid'] == iccid for p in self.list_profiles()): |
|
|
|
|
|
|
|
raise LPAProfileNotFoundError(f'profile {iccid} does not exist') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _validate_successful(self, msgs: list[dict]) -> None: |
|
|
|
|
|
|
|
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
if __name__ == "__main__": |
|
|
|
import sys |
|
|
|
import sys |
|
|
|