openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

170 lines
6.4 KiB

#!/usr/bin/env python3
import argparse
import json
import os
import shutil
import subprocess
from dataclasses import dataclass
from typing import Literal
@dataclass
class Profile:
iccid: str
nickname: str
enabled: bool
provider: str
class LPAError(RuntimeError):
    """Base error for failures while invoking lpac or interpreting its output."""
class LPAProfileNotFoundError(LPAError):
    """Raised when an operation names an ICCID that is not among the listed profiles."""
class LPA:
    """Manage eSIM profiles by shelling out to the ``lpac`` command-line tool.

    Every operation runs ``sudo -E lpac <args>`` with a prepared environment
    and parses the JSON messages lpac prints on stdout, one per line.
    """

    def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
        """Prepare the environment handed to lpac and check that lpac is installed.

        Args:
            interface: value exported as ``LPAC_APDU`` for lpac.

        Raises:
            LPAError: if no ``lpac`` executable is found on ``PATH``.
        """
        self.env = os.environ.copy()
        self.env['LPAC_APDU'] = interface
        self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
        self.env['AT_DEVICE'] = '/dev/ttyUSB2'

        # Upper bound, in seconds, for a single lpac invocation.
        self.timeout_sec = 45

        if shutil.which('lpac') is None:
            raise LPAError('lpac not found, must be installed!')

    def list_profiles(self) -> list[Profile]:
        """Return every profile reported by ``lpac profile list``."""
        msgs = self._invoke('profile', 'list')
        self._validate_successful(msgs)
        return [
            Profile(
                iccid=p['iccid'],
                nickname=p['profileNickname'],
                enabled=p['profileState'] == 'enabled',
                provider=p['serviceProviderName'],
            )
            for p in msgs[-1]['payload']['data']
        ]

    def get_active_profile(self) -> Profile | None:
        """Return the first enabled profile, or None when no profile is enabled."""
        return next((p for p in self.list_profiles() if p.enabled), None)

    def enable_profile(self, iccid: str) -> None:
        """Enable the profile ``iccid``, disabling the currently active one first.

        Does nothing when ``iccid`` is already the active profile.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not a known profile.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        if latest is not None:
            if latest.iccid == iccid:
                return
            self.disable_profile(latest.iccid)
        self._validate_successful(self._invoke('profile', 'enable', iccid))
        self.process_notifications()

    def disable_profile(self, iccid: str) -> None:
        """Disable the profile ``iccid`` if it is the active one.

        Does nothing when ``iccid`` is not currently enabled -- including when
        no profile is enabled at all -- mirroring how ``enable_profile`` is a
        no-op for a profile that is already active.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not a known profile.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        if latest is None or latest.iccid != iccid:
            # The target is not enabled, so there is nothing to disable.
            return
        self._validate_successful(self._invoke('profile', 'disable', iccid))
        self.process_notifications()

    def delete_profile(self, iccid: str) -> None:
        """Delete the profile ``iccid``, disabling it first when it is active.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not a known profile.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        if latest is not None and latest.iccid == iccid:
            self.disable_profile(iccid)
        self._validate_successful(self._invoke('profile', 'delete', iccid))
        self.process_notifications()

    def download_profile(self, qr: str, nickname: str | None = None) -> None:
        """Download a profile from the activation code ``qr`` and optionally nickname it.

        Args:
            qr: activation code passed to ``lpac profile download -a``.
            nickname: when given (and non-empty), applied to the downloaded profile.

        Raises:
            LPAError: if lpac fails or the output carries no profile metadata message.
        """
        msgs = self._invoke('profile', 'download', '-a', qr)
        self._validate_successful(msgs)
        # NOTE(review): the 'meatadata' spelling is kept byte-for-byte; presumably it
        # matches the step name lpac emits -- confirm before "correcting" it.
        new_profile = next((m for m in msgs if m['payload'].get('message') == 'es8p_meatadata_parse'), None)
        if new_profile is None:
            raise LPAError('no new profile found')
        if nickname:
            self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
        self.process_notifications()

    def nickname_profile(self, iccid: str, nickname: str) -> None:
        """Set the nickname of the profile ``iccid``.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not a known profile.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))

    def process_notifications(self) -> None:
        """
        Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
        """
        self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))

    def _invoke(self, *cmd: str) -> list[dict]:
        """Run ``sudo -E lpac <cmd>`` and return the JSON messages printed on stdout.

        Only stdout lines starting with ``{`` are parsed; everything else is ignored.

        Raises:
            LPAError: on timeout, on a malformed message, on a message with a
                non-zero ``code``, or when lpac printed no messages at all.
        """
        cmd_str = ' '.join(cmd)
        proc = subprocess.Popen(
            ['sudo', '-E', 'lpac', *cmd],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self.env,
        )
        try:
            out, err = proc.communicate(timeout=self.timeout_sec)
        except subprocess.TimeoutExpired as e:
            proc.kill()
            # Finish communication so the child is reaped and its pipes are closed.
            # Bounded, because a grandchild may still hold the pipes open.
            try:
                proc.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                pass
            raise LPAError(f"lpac {cmd_str} timed out after {self.timeout_sec} seconds") from e

        messages = []
        for line in out.decode().strip().splitlines():
            if not line.startswith('{'):
                continue
            message = json.loads(line)
            self._validate_message_format(message)

            payload = message['payload']
            msg_ret_code = payload['code']
            if msg_ret_code != 0:
                raise LPAError(
                    f"lpac {cmd_str} failed with code {msg_ret_code}: <{payload.get('message')}> {payload['data']}"
                )
            messages.append(message)

        if not messages:
            # Surface stderr: without any JSON output it is the only diagnostic available.
            stderr_text = err.decode(errors='replace').strip()
            detail = f": {stderr_text}" if stderr_text else ""
            raise LPAError(f"lpac {cmd_str} returned no messages{detail}")
        return messages

    @staticmethod
    def _validate_message_format(message) -> None:
        """Raise LPAError unless ``message`` has the fields the rest of the class reads.

        Raises explicitly rather than using ``assert`` so the checks survive ``python -O``.
        """
        if not isinstance(message, dict) or 'type' not in message:
            raise LPAError('expected type in message')
        if message['type'] not in ('lpa', 'progress'):
            raise LPAError('expected lpa or progress message type')
        if 'payload' not in message:
            raise LPAError('expected payload in message')
        payload = message['payload']
        if not isinstance(payload, dict) or 'code' not in payload:
            raise LPAError('expected code in message payload')
        if 'data' not in payload:
            raise LPAError('expected data in message payload')

    def _validate_profile_exists(self, iccid: str) -> None:
        """Raise LPAProfileNotFoundError unless ``iccid`` is among the listed profiles."""
        if not any(p.iccid == iccid for p in self.list_profiles()):
            raise LPAProfileNotFoundError(f'profile {iccid} does not exist')

    def _validate_successful(self, msgs: list[dict]) -> None:
        """Raise LPAError unless the last message in ``msgs`` reports ``success``."""
        if not msgs or msgs[-1]['payload'].get('message') != 'success':
            raise LPAError('expected success notification')
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi')
parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile')
parser.add_argument('--disable', metavar='iccid', help='disable profile')
parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
parser.add_argument('--download', nargs=2, metavar=['qr', 'name'], help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)')
parser.add_argument('--nickname', nargs=2, metavar=['iccid', 'name'], help='update the nickname for a profile')
args = parser.parse_args()
lpa = LPA(interface=args.backend)
if args.enable:
lpa.enable_profile(args.enable)
print('enabled profile, please restart device to apply changes')
elif args.disable:
lpa.disable_profile(args.disable)
print('disabled profile, please restart device to apply changes')
elif args.delete:
confirm = input('are you sure you want to delete this profile? (y/N) ')
if confirm == 'y':
lpa.delete_profile(args.delete)
print('deleted profile, please restart device to apply changes')
else:
print('cancelled')
exit(0)
elif args.download:
lpa.download_profile(args.download[0], args.download[1])
elif args.nickname:
lpa.nickname_profile(args.nickname[0], args.nickname[1])
else:
parser.print_help()
profiles = lpa.list_profiles()
print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:')
for p in profiles:
print(f'- {p.iccid} (nickname: {p.nickname or "<none provided>"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')