|
|
|
@ -5,7 +5,14 @@ import os |
|
|
|
|
import shutil |
|
|
|
|
import subprocess |
|
|
|
|
import time |
|
|
|
|
from dataclasses import dataclass |
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
|
class Profile: |
|
|
|
|
iccid: str |
|
|
|
|
nickname: str |
|
|
|
|
enabled: bool |
|
|
|
|
provider: str |
|
|
|
|
|
|
|
|
|
class LPAError(RuntimeError): |
|
|
|
|
pass |
|
|
|
@ -25,33 +32,33 @@ class LPA: |
|
|
|
|
if shutil.which('lpac') is None: |
|
|
|
|
raise LPAError('lpac not found, must be installed!') |
|
|
|
|
|
|
|
|
|
def list_profiles(self) -> list[dict[str, str]]: |
|
|
|
|
def list_profiles(self) -> list[Profile]: |
|
|
|
|
msgs = self._invoke('profile', 'list') |
|
|
|
|
self._validate_successful(msgs) |
|
|
|
|
return [{ |
|
|
|
|
'iccid': p['iccid'], |
|
|
|
|
'nickname': p['profileNickname'], |
|
|
|
|
'enabled': p['profileState'] == 'enabled', |
|
|
|
|
'provider': p['serviceProviderName'] |
|
|
|
|
} for p in msgs[-1]['payload']['data']] |
|
|
|
|
return [Profile( |
|
|
|
|
iccid=p['iccid'], |
|
|
|
|
nickname=p['profileNickname'], |
|
|
|
|
enabled=p['profileState'] == 'enabled', |
|
|
|
|
provider=p['serviceProviderName'] |
|
|
|
|
) for p in msgs[-1]['payload']['data']] |
|
|
|
|
|
|
|
|
|
def get_active_profile(self) -> dict[str, str] | None: |
|
|
|
|
return next((p for p in self.list_profiles() if p['enabled']), None) |
|
|
|
|
def get_active_profile(self) -> Profile | None: |
|
|
|
|
return next((p for p in self.list_profiles() if p.enabled), None) |
|
|
|
|
|
|
|
|
|
def enable_profile(self, iccid: str) -> None: |
|
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
|
latest = self.get_active_profile() |
|
|
|
|
if latest: |
|
|
|
|
if latest['iccid'] == iccid: |
|
|
|
|
if latest.iccid == iccid: |
|
|
|
|
raise LPAError(f'profile {iccid} is already enabled') |
|
|
|
|
self.disable_profile(latest['iccid']) |
|
|
|
|
self.disable_profile(latest.iccid) |
|
|
|
|
self._validate_successful(self._invoke('profile', 'enable', iccid)) |
|
|
|
|
self.process_notifications() |
|
|
|
|
|
|
|
|
|
def disable_profile(self, iccid: str) -> None: |
|
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
|
latest = self.get_active_profile() |
|
|
|
|
if latest is not None and latest['iccid'] != iccid: |
|
|
|
|
if latest is not None and latest.iccid != iccid: |
|
|
|
|
return |
|
|
|
|
self._validate_successful(self._invoke('profile', 'disable', iccid)) |
|
|
|
|
self.process_notifications() |
|
|
|
@ -59,7 +66,7 @@ class LPA: |
|
|
|
|
def delete_profile(self, iccid: str) -> None: |
|
|
|
|
self._validate_profile_exists(iccid) |
|
|
|
|
latest = self.get_active_profile() |
|
|
|
|
if latest is not None and latest['iccid'] == iccid: |
|
|
|
|
if latest is not None and latest.iccid == iccid: |
|
|
|
|
self.disable_profile(iccid) |
|
|
|
|
self._validate_successful(self._invoke('profile', 'delete', iccid)) |
|
|
|
|
self.process_notifications() |
|
|
|
@ -115,7 +122,7 @@ class LPA: |
|
|
|
|
return messages |
|
|
|
|
|
|
|
|
|
def _validate_profile_exists(self, iccid: str) -> None: |
|
|
|
|
if not any(p['iccid'] == iccid for p in self.list_profiles()): |
|
|
|
|
if not any(p.iccid == iccid for p in self.list_profiles()): |
|
|
|
|
raise LPAProfileNotFoundError(f'profile {iccid} does not exist') |
|
|
|
|
|
|
|
|
|
def _validate_successful(self, msgs: list[dict]) -> None: |
|
|
|
|