openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

146 lines
5.1 KiB

#!/usr/bin/env python3
4 weeks ago
4 weeks ago
import json
import os
import subprocess
4 weeks ago
import time
4 weeks ago
class LPAError(RuntimeError):
  """Base error raised when an lpac operation fails or returns an unexpected response."""
4 weeks ago
class LPAProfileNotFoundError(LPAError):
  """Raised when an operation names an ICCID that is not among the listed profiles."""
4 weeks ago
class LPA:
4 weeks ago
def __init__(self):
self.env = os.environ.copy()
self.env['LPAC_APDU'] = 'qmi'
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
self.timeout_sec = 45
4 weeks ago
4 weeks ago
def list_profiles(self) -> list[dict[str, str]]:
msgs = self._invoke('profile', 'list')
4 weeks ago
self._validate_successful(msgs)
return [{
'iccid': p['iccid'],
'nickname': p['profileNickname'],
'enabled': p['profileState'] == 'enabled',
'provider': p['serviceProviderName']
} for p in msgs[-1]['payload']['data']]
4 weeks ago
4 weeks ago
def get_active_profile(self) -> dict[str, str] | None:
4 weeks ago
return next((p for p in self.list_profiles() if p['enabled']), None)
4 weeks ago
def enable_profile(self, iccid: str) -> None:
4 weeks ago
self._validate_profile_exists(iccid)
4 weeks ago
latest = self.get_active_profile()
4 weeks ago
if latest:
if latest['iccid'] == iccid:
raise LPAError(f'profile {iccid} is already enabled')
self.disable_profile(latest['iccid'])
4 weeks ago
self._validate_successful(self._invoke('profile', 'enable', iccid))
4 weeks ago
self.process_notifications()
def disable_profile(self, iccid: str) -> None:
4 weeks ago
self._validate_profile_exists(iccid)
4 weeks ago
latest = self.get_active_profile()
4 weeks ago
if latest is not None and latest['iccid'] != iccid:
return
self._validate_successful(self._invoke('profile', 'disable', iccid))
4 weeks ago
self.process_notifications()
def delete_profile(self, iccid: str) -> None:
4 weeks ago
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest is not None and latest['iccid'] == iccid:
self.disable_profile(iccid)
4 weeks ago
self._validate_successful(self._invoke('profile', 'delete', iccid))
4 weeks ago
self.process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None:
msgs = self._invoke('profile', 'download', '-a', qr)
4 weeks ago
self._validate_successful(msgs)
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
if new_profile is None:
raise LPAError('no new profile found')
if nickname:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
4 weeks ago
self.process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
4 weeks ago
self._validate_profile_exists(iccid)
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
4 weeks ago
def process_notifications(self) -> None:
"""
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
"""
4 weeks ago
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
4 weeks ago
def _invoke(self, *cmd: str):
4 weeks ago
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
4 weeks ago
try:
4 weeks ago
out, err = proc.communicate(timeout=self.timeout_sec)
4 weeks ago
except subprocess.TimeoutExpired as e:
4 weeks ago
proc.kill()
4 weeks ago
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
messages = []
for line in out.decode().strip().splitlines():
if line.startswith('{'):
message = json.loads(line)
# lpac response format validations
assert 'type' in message, 'expected type in message'
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
assert 'payload' in message, 'expected payload in message'
assert 'code' in message['payload'], 'expected code in message payload'
4 weeks ago
assert 'data' in message['payload'], 'expected data in message payload'
4 weeks ago
if message['payload']['code'] != 0:
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}")
messages.append(message)
if len(messages) == 0:
raise LPAError(f"lpac {cmd} returned no messages")
return messages
4 weeks ago
def _validate_profile_exists(self, iccid: str) -> None:
if not any(p['iccid'] == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def _validate_successful(self, msgs: list[dict]) -> None:
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
if __name__ == "__main__":
  # Small CLI: always prints the profile list, then optionally runs one command of the form
  #   <enable|disable|delete> <iccid>   or   download <qr> <nickname>
  import sys

  lpa = LPA()
  print(lpa.list_profiles())

  if len(sys.argv) > 2:
    if sys.argv[1] == 'enable':
      lpa.enable_profile(sys.argv[2])
    elif sys.argv[1] == 'disable':
      lpa.disable_profile(sys.argv[2])
    elif sys.argv[1] == 'delete':
      lpa.delete_profile(sys.argv[2])
    elif sys.argv[1] == 'download':
      # explicit check instead of an assert so it still runs under `python -O`
      if len(sys.argv) != 4:
        raise Exception('expected profile nickname')
      lpa.download_profile(sys.argv[2], sys.argv[3])
    else:
      raise Exception(f"invalid command: {sys.argv[1]}")

  # When RESTART is present in the environment: stop ModemManager, restart LTE through lte.sh,
  # then block until /dev/ttyUSB2 shows up.
  if "RESTART" in os.environ:
    subprocess.check_call("sudo systemctl stop ModemManager", shell=True)
    subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True)
    subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True)
    while not os.path.exists('/dev/ttyUSB2'):
      time.sleep(1)