openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

162 lines
5.6 KiB

#!/usr/bin/env python3
1 month ago
1 month ago
import json
import os
import shutil
import subprocess
1 month ago
import time
from dataclasses import dataclass
1 month ago
@dataclass
class Profile:
    """One eSIM profile, as built by ``LPA.list_profiles`` from lpac's output."""

    # Identifier used to address the profile in enable/disable/delete/nickname calls.
    iccid: str
    # Taken from lpac's 'profileNickname' field.
    nickname: str
    # True when lpac reports 'profileState' == 'enabled'.
    enabled: bool
    # Taken from lpac's 'serviceProviderName' field.
    provider: str
1 month ago
class LPAError(RuntimeError):
    """Base error for failures while driving the ``lpac`` tool."""
1 month ago
class LPAProfileNotFoundError(LPAError):
    """Raised when an ICCID does not match any profile returned by the profile listing."""
1 month ago
class LPA:
    """Manage eSIM profiles by shelling out to the ``lpac`` command-line tool.

    Each public method runs one or more ``sudo -E lpac ...`` commands and
    interprets the JSON messages lpac prints on stdout (one object per line).
    Failures are reported as ``LPAError`` (or its subclass
    ``LPAProfileNotFoundError``).
    """

    def __init__(self):
        """Build the environment used for lpac calls and check lpac is installed.

        Raises:
            LPAError: if ``lpac`` cannot be found on ``PATH``.
        """
        # Work on a copy so the overrides below never touch this process's own environment.
        self.env = os.environ.copy()
        self.env['LPAC_APDU'] = 'qmi'
        self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
        # Upper bound, in seconds, for a single lpac invocation.
        self.timeout_sec = 45

        if shutil.which('lpac') is None:
            raise LPAError('lpac not found, must be installed!')

    def list_profiles(self) -> list[Profile]:
        """Return every profile reported by ``lpac profile list``.

        Raises:
            LPAError: if lpac fails or does not end with a success message.
        """
        msgs = self._invoke('profile', 'list')
        self._validate_successful(msgs)
        return [
            Profile(
                iccid=p['iccid'],
                nickname=p['profileNickname'],
                enabled=p['profileState'] == 'enabled',
                provider=p['serviceProviderName'],
            )
            for p in msgs[-1]['payload']['data']
        ]

    def get_active_profile(self) -> Profile | None:
        """Return the first profile marked enabled, or ``None`` if there is none."""
        return next((p for p in self.list_profiles() if p.enabled), None)

    def enable_profile(self, iccid: str) -> None:
        """Enable the profile ``iccid``, disabling the currently active one first.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not among the listed profiles.
            LPAError: if the profile is already enabled or lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        if latest is not None:
            if latest.iccid == iccid:
                raise LPAError(f'profile {iccid} is already enabled')
            self.disable_profile(latest.iccid)
        self._validate_successful(self._invoke('profile', 'enable', iccid))
        self.process_notifications()

    def disable_profile(self, iccid: str) -> None:
        """Disable the profile ``iccid`` if it is the active one; otherwise do nothing.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not among the listed profiles.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        # Nothing to do unless *this* profile is the active one. That includes the
        # case where no profile is active at all: the target is then already disabled,
        # so asking lpac to disable it again would only produce an error.
        if latest is None or latest.iccid != iccid:
            return
        self._validate_successful(self._invoke('profile', 'disable', iccid))
        self.process_notifications()

    def delete_profile(self, iccid: str) -> None:
        """Delete the profile ``iccid``, disabling it first when it is active.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not among the listed profiles.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        latest = self.get_active_profile()
        if latest is not None and latest.iccid == iccid:
            self.disable_profile(iccid)
        self._validate_successful(self._invoke('profile', 'delete', iccid))
        self.process_notifications()

    def download_profile(self, qr: str, nickname: str | None = None) -> None:
        """Download a profile and optionally give it a nickname.

        Args:
            qr: value handed to ``lpac profile download`` as the ``-a`` argument.
            nickname: when non-empty, applied to the downloaded profile.

        Raises:
            LPAError: if lpac fails or no downloaded-profile message is found.
        """
        msgs = self._invoke('profile', 'download', '-a', qr)
        self._validate_successful(msgs)
        # NOTE(review): 'es8p_meatadata_parse' (sic) is matched verbatim; presumably this is
        # the exact spelling lpac emits -- confirm against lpac before "correcting" it.
        new_profile = next((m for m in msgs if m['payload'].get('message') == 'es8p_meatadata_parse'), None)
        if new_profile is None:
            raise LPAError('no new profile found')
        if nickname:
            self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
        self.process_notifications()

    def nickname_profile(self, iccid: str, nickname: str) -> None:
        """Set the nickname of the profile ``iccid``.

        Raises:
            LPAProfileNotFoundError: if ``iccid`` is not among the listed profiles.
            LPAError: if lpac fails.
        """
        self._validate_profile_exists(iccid)
        self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))

    def process_notifications(self) -> None:
        """
        Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
        """
        self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))

    def _invoke(self, *cmd: str) -> list[dict]:
        """Run ``sudo -E lpac <cmd...>`` and return the JSON messages it printed.

        Only stdout lines starting with ``{`` are parsed; everything else is ignored.

        Raises:
            LPAError: on timeout, on a malformed message, on a message with a
                non-zero ``payload.code``, or when no message was produced.
        """
        cmd_str = ' '.join(cmd)
        proc = subprocess.Popen(['sudo', '-E', 'lpac', *cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
        try:
            out, err = proc.communicate(timeout=self.timeout_sec)
        except subprocess.TimeoutExpired as e:
            proc.kill()
            # communicate() does not clean up after a timeout: finish the communication so
            # the killed child is reaped and its pipes are closed. The wait is bounded so a
            # lingering descendant that still holds the pipes cannot block us indefinitely.
            try:
                proc.communicate(timeout=1)
            except subprocess.TimeoutExpired:
                pass
            raise LPAError(f"lpac {cmd_str} timed out after {self.timeout_sec} seconds") from e

        messages = []
        for line in out.decode().strip().splitlines():
            if not line.startswith('{'):
                continue
            message = json.loads(line)
            self._check_message_format(message)

            msg_ret_code = message['payload']['code']
            if msg_ret_code != 0:
                raise LPAError(f"lpac {cmd_str} failed with code {msg_ret_code}: <{message['payload'].get('message')}> {message['payload']['data']}")
            messages.append(message)

        if not messages:
            # Surface stderr here: when lpac (or sudo) fails before emitting any JSON,
            # stderr is the only place the reason shows up.
            detail = err.decode(errors='replace').strip()
            raise LPAError(f"lpac {cmd_str} returned no messages" + (f": {detail}" if detail else ""))
        return messages

    @staticmethod
    def _check_message_format(message: dict) -> None:
        """Raise ``LPAError`` unless ``message`` has the fields the rest of the class reads."""
        # Explicit raises instead of assert: assert statements are removed under `python -O`.
        if 'type' not in message:
            raise LPAError('expected type in message')
        if message['type'] not in ('lpa', 'progress'):
            raise LPAError('expected lpa or progress message type')
        if 'payload' not in message:
            raise LPAError('expected payload in message')
        if 'code' not in message['payload']:
            raise LPAError('expected code in message payload')
        if 'data' not in message['payload']:
            raise LPAError('expected data in message payload')

    def _validate_profile_exists(self, iccid: str) -> None:
        """Raise ``LPAProfileNotFoundError`` unless ``iccid`` is among the listed profiles."""
        if not any(p.iccid == iccid for p in self.list_profiles()):
            raise LPAProfileNotFoundError(f'profile {iccid} does not exist')

    def _validate_successful(self, msgs: list[dict]) -> None:
        """Raise ``LPAError`` unless the last message in ``msgs`` reports 'success'."""
        if not msgs or msgs[-1]['payload'].get('message') != 'success':
            raise LPAError('expected success notification')
if __name__ == "__main__":
    # Command-line entry point: always lists the profiles, then optionally runs
    #   <script> enable|disable|delete <iccid>
    #   <script> download <qr> <nickname>
    # Setting RESTART in the environment additionally restarts the LTE scripts afterwards.
    import sys

    lpa = LPA()

    profiles = lpa.list_profiles()
    # Only a count of exactly one is singular ("0 profiles", "1 profile", "2 profiles").
    print(f'{len(profiles)} profile{"s" if len(profiles) != 1 else ""}:')
    for p in profiles:
        print(f'- {p.iccid} (nickname: {p.nickname or "no nickname"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')
    print()

    if len(sys.argv) > 2:
        if sys.argv[1] == 'enable':
            lpa.enable_profile(sys.argv[2])
        elif sys.argv[1] == 'disable':
            lpa.disable_profile(sys.argv[2])
        elif sys.argv[1] == 'delete':
            lpa.delete_profile(sys.argv[2])
        elif sys.argv[1] == 'download':
            # Explicit check rather than assert, which is removed under `python -O`.
            if len(sys.argv) != 4:
                raise Exception('expected profile nickname')
            lpa.download_profile(sys.argv[2], sys.argv[3])
        else:
            raise Exception(f"invalid command: {sys.argv[1]}")

    if "RESTART" in os.environ:
        subprocess.check_call("sudo systemctl stop ModemManager", shell=True)
        subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True)
        subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True)
        # Block until /dev/ttyUSB2 shows up again (presumably the modem's serial port -- confirm).
        while not os.path.exists('/dev/ttyUSB2'):
            time.sleep(1)