You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
170 lines
6.4 KiB
170 lines
6.4 KiB
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from typing import Literal
|
|
|
|
@dataclass
|
|
class Profile:
|
|
iccid: str
|
|
nickname: str
|
|
enabled: bool
|
|
provider: str
|
|
|
|
class LPAError(RuntimeError):
|
|
pass
|
|
|
|
class LPAProfileNotFoundError(LPAError):
|
|
pass
|
|
|
|
|
|
class LPA:
|
|
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
|
|
self.env = os.environ.copy()
|
|
self.env['LPAC_APDU'] = interface
|
|
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
|
|
self.env['AT_DEVICE'] = '/dev/ttyUSB2'
|
|
|
|
self.timeout_sec = 45
|
|
|
|
if shutil.which('lpac') is None:
|
|
raise LPAError('lpac not found, must be installed!')
|
|
|
|
def list_profiles(self) -> list[Profile]:
|
|
msgs = self._invoke('profile', 'list')
|
|
self._validate_successful(msgs)
|
|
return [Profile(
|
|
iccid=p['iccid'],
|
|
nickname=p['profileNickname'],
|
|
enabled=p['profileState'] == 'enabled',
|
|
provider=p['serviceProviderName']
|
|
) for p in msgs[-1]['payload']['data']]
|
|
|
|
def get_active_profile(self) -> Profile | None:
|
|
return next((p for p in self.list_profiles() if p.enabled), None)
|
|
|
|
def enable_profile(self, iccid: str) -> None:
|
|
self._validate_profile_exists(iccid)
|
|
latest = self.get_active_profile()
|
|
if latest:
|
|
if latest.iccid == iccid:
|
|
return
|
|
self.disable_profile(latest.iccid)
|
|
self._validate_successful(self._invoke('profile', 'enable', iccid))
|
|
self.process_notifications()
|
|
|
|
def disable_profile(self, iccid: str) -> None:
|
|
self._validate_profile_exists(iccid)
|
|
latest = self.get_active_profile()
|
|
if latest is not None and latest.iccid != iccid:
|
|
return
|
|
self._validate_successful(self._invoke('profile', 'disable', iccid))
|
|
self.process_notifications()
|
|
|
|
def delete_profile(self, iccid: str) -> None:
|
|
self._validate_profile_exists(iccid)
|
|
latest = self.get_active_profile()
|
|
if latest is not None and latest.iccid == iccid:
|
|
self.disable_profile(iccid)
|
|
self._validate_successful(self._invoke('profile', 'delete', iccid))
|
|
self.process_notifications()
|
|
|
|
def download_profile(self, qr: str, nickname: str | None = None) -> None:
|
|
msgs = self._invoke('profile', 'download', '-a', qr)
|
|
self._validate_successful(msgs)
|
|
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
|
|
if new_profile is None:
|
|
raise LPAError('no new profile found')
|
|
if nickname:
|
|
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
|
|
self.process_notifications()
|
|
|
|
def nickname_profile(self, iccid: str, nickname: str) -> None:
|
|
self._validate_profile_exists(iccid)
|
|
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
|
|
|
|
def process_notifications(self) -> None:
|
|
"""
|
|
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
|
|
"""
|
|
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
|
|
|
|
def _invoke(self, *cmd: str):
|
|
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
|
|
try:
|
|
out, err = proc.communicate(timeout=self.timeout_sec)
|
|
except subprocess.TimeoutExpired as e:
|
|
proc.kill()
|
|
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
|
|
|
|
messages = []
|
|
for line in out.decode().strip().splitlines():
|
|
if line.startswith('{'):
|
|
message = json.loads(line)
|
|
|
|
# lpac response format validations
|
|
assert 'type' in message, 'expected type in message'
|
|
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
|
|
assert 'payload' in message, 'expected payload in message'
|
|
assert 'code' in message['payload'], 'expected code in message payload'
|
|
assert 'data' in message['payload'], 'expected data in message payload'
|
|
|
|
msg_ret_code = message['payload']['code']
|
|
if msg_ret_code != 0:
|
|
raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}")
|
|
|
|
messages.append(message)
|
|
|
|
if len(messages) == 0:
|
|
raise LPAError(f"lpac {cmd} returned no messages")
|
|
|
|
return messages
|
|
|
|
def _validate_profile_exists(self, iccid: str) -> None:
|
|
if not any(p.iccid == iccid for p in self.list_profiles()):
|
|
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
|
|
|
|
def _validate_successful(self, msgs: list[dict]) -> None:
|
|
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
|
|
|
|
|
|
if __name__ == "__main__":
|
|
parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
|
|
parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi')
|
|
parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile')
|
|
parser.add_argument('--disable', metavar='iccid', help='disable profile')
|
|
parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
|
|
parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)')
|
|
parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile')
|
|
args = parser.parse_args()
|
|
|
|
lpa = LPA(interface=args.backend)
|
|
if args.enable:
|
|
lpa.enable_profile(args.enable)
|
|
print('enabled profile, please restart device to apply changes')
|
|
elif args.disable:
|
|
lpa.disable_profile(args.disable)
|
|
print('disabled profile, please restart device to apply changes')
|
|
elif args.delete:
|
|
confirm = input('are you sure you want to delete this profile? (y/N) ')
|
|
if confirm == 'y':
|
|
lpa.delete_profile(args.delete)
|
|
print('deleted profile, please restart device to apply changes')
|
|
else:
|
|
print('cancelled')
|
|
exit(0)
|
|
elif args.download:
|
|
lpa.download_profile(args.download[0], args.download[1])
|
|
elif args.nickname:
|
|
lpa.nickname_profile(args.nickname[0], args.nickname[1])
|
|
else:
|
|
parser.print_help()
|
|
|
|
profiles = lpa.list_profiles()
|
|
print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:')
|
|
for p in profiles:
|
|
print(f'- {p.iccid} (nickname: {p.nickname or "<none provided>"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')
|
|
|