openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

170 lines
6.2 KiB

#!/usr/bin/env python3
2 months ago
2 months ago
import argparse
2 months ago
import json
import os
import shutil
import subprocess
2 months ago
from dataclasses import dataclass
2 months ago
from typing import Literal
2 months ago
2 months ago
@dataclass
class Profile:
    """A single eSIM profile entry, as built by ``LPA.list_profiles``."""

    iccid: str  # taken from the 'iccid' field of the lpac profile listing
    nickname: str  # taken from 'profileNickname'; the CLI treats a falsy value as "<none provided>"
    enabled: bool  # True when the listing's 'profileState' equals 'enabled'
    provider: str  # taken from 'serviceProviderName'
2 months ago
class LPAError(RuntimeError):
    """Base error for failures while driving the lpac tool."""
2 months ago
class LPAProfileNotFoundError(LPAError):
    """Raised when an operation names an ICCID that is not in the profile list."""
2 months ago
class LPA:
2 months ago
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
2 months ago
self.env = os.environ.copy()
2 months ago
self.env['LPAC_APDU'] = interface
2 months ago
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
2 months ago
self.env['AT_DEVICE'] = '/dev/ttyUSB2'
2 months ago
self.timeout_sec = 45
2 months ago
if shutil.which('lpac') is None:
raise LPAError('lpac not found, must be installed!')
2 months ago
def list_profiles(self) -> list[Profile]:
msgs = self._invoke('profile', 'list')
2 months ago
self._validate_successful(msgs)
2 months ago
return [Profile(
iccid=p['iccid'],
nickname=p['profileNickname'],
enabled=p['profileState'] == 'enabled',
provider=p['serviceProviderName']
) for p in msgs[-1]['payload']['data']]
2 months ago
2 months ago
def get_active_profile(self) -> Profile | None:
return next((p for p in self.list_profiles() if p.enabled), None)
2 months ago
def enable_profile(self, iccid: str) -> None:
2 months ago
self._validate_profile_exists(iccid)
2 months ago
latest = self.get_active_profile()
2 months ago
if latest:
2 months ago
if latest.iccid == iccid:
return
2 months ago
self.disable_profile(latest.iccid)
2 months ago
self._validate_successful(self._invoke('profile', 'enable', iccid))
2 months ago
self.process_notifications()
def disable_profile(self, iccid: str) -> None:
2 months ago
self._validate_profile_exists(iccid)
2 months ago
latest = self.get_active_profile()
2 months ago
if latest is not None and latest.iccid != iccid:
2 months ago
return
self._validate_successful(self._invoke('profile', 'disable', iccid))
2 months ago
self.process_notifications()
def delete_profile(self, iccid: str) -> None:
2 months ago
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
2 months ago
if latest is not None and latest.iccid == iccid:
self.disable_profile(iccid)
2 months ago
self._validate_successful(self._invoke('profile', 'delete', iccid))
2 months ago
self.process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None:
msgs = self._invoke('profile', 'download', '-a', qr)
2 months ago
self._validate_successful(msgs)
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
if new_profile is None:
raise LPAError('no new profile found')
if nickname:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
2 months ago
self.process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
2 months ago
self._validate_profile_exists(iccid)
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
2 months ago
def process_notifications(self) -> None:
"""
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
"""
2 months ago
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
2 months ago
def _invoke(self, *cmd: str):
2 months ago
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
2 months ago
try:
2 months ago
out, err = proc.communicate(timeout=self.timeout_sec)
2 months ago
except subprocess.TimeoutExpired as e:
2 months ago
proc.kill()
2 months ago
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
messages = []
for line in out.decode().strip().splitlines():
if line.startswith('{'):
message = json.loads(line)
# lpac response format validations
assert 'type' in message, 'expected type in message'
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
assert 'payload' in message, 'expected payload in message'
assert 'code' in message['payload'], 'expected code in message payload'
2 months ago
assert 'data' in message['payload'], 'expected data in message payload'
2 months ago
2 months ago
msg_ret_code = message['payload']['code']
if msg_ret_code != 0:
raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}")
2 months ago
messages.append(message)
if len(messages) == 0:
raise LPAError(f"lpac {cmd} returned no messages")
return messages
2 months ago
def _validate_profile_exists(self, iccid: str) -> None:
2 months ago
if not any(p.iccid == iccid for p in self.list_profiles()):
2 months ago
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def _validate_successful(self, msgs: list[dict]) -> None:
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
if __name__ == "__main__":
    # Command-line entry point: run at most one profile operation (the first
    # matching flag wins), then print the resulting list of profiles.
    parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
    parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile')
    parser.add_argument('--disable', metavar='iccid', help='disable profile')
    parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
    parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code')
    parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='nickname for the downloaded profile')
    args = parser.parse_args()

    lpa = LPA()
    if args.enable:
        lpa.enable_profile(args.enable)
        print('enabled profile, please restart device to apply changes')
    elif args.disable:
        lpa.disable_profile(args.disable)
        print('disabled profile, please restart device to apply changes')
    elif args.delete:
        confirm = input('are you sure you want to delete this profile? (y/N) ')
        if confirm == 'y':
            lpa.delete_profile(args.delete)
            print('deleted profile, please restart device to apply changes')
        else:
            print('cancelled')
            # SystemExit instead of the site-provided exit(), which is not
            # guaranteed to exist (e.g. when running with -S).
            raise SystemExit(0)
    elif args.download:
        lpa.download_profile(args.download[0], args.download[1])
    elif args.nickname:
        lpa.nickname_profile(args.nickname[0], args.nickname[1])
    else:
        parser.print_help()

    profiles = lpa.list_profiles()
    # Plural for every count except exactly one, so zero reads "0 profiles".
    print(f'\n{len(profiles)} profile{"s" if len(profiles) != 1 else ""}:')
    for p in profiles:
        print(f'- {p.iccid} (nickname: {p.nickname or "<none provided>"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')