openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
5.4 KiB

#!/usr/bin/env python3
1 month ago
import json
import os
import subprocess
1 month ago
import time
1 month ago
class LPAError(RuntimeError):
    """Base error raised by the LPA wrapper when an lpac operation fails."""
1 month ago
class LPAProfileNotFoundError(LPAError):
    """Raised when a requested ICCID does not match any listed profile."""
1 month ago
class LPA:
1 month ago
def __init__(self):
self.env = os.environ.copy()
self.env['LPAC_APDU'] = 'qmi'
self.env['QMI_DEVICE'] = '/dev/cdc-wdm0'
self.timeout_sec = 45
1 month ago
1 month ago
def list_profiles(self) -> list[dict[str, str]]:
msgs = self._invoke('profile', 'list')
self.validate_successful(msgs)
1 month ago
profiles = []
for profile in msgs[-1]['payload']['data']:
profiles.append({
'iccid': profile['iccid'],
'nickname': profile['profileNickname'],
'enabled': profile['profileState'] == 'enabled',
'provider': profile['serviceProviderName'],
})
1 month ago
return profiles
1 month ago
def get_active_profile(self) -> dict[str, str] | None:
1 month ago
return next((p for p in self.list_profiles() if p['enabled']), None)
1 month ago
def enable_profile(self, iccid: str) -> None:
self.validate_profile_exists(iccid)
1 month ago
latest = self.get_active_profile()
if latest is None:
raise LPAError('no profile enabled')
if latest['iccid'] == iccid:
1 month ago
raise LPAError(f'profile {iccid} is already enabled')
else:
self.disable_profile(latest['iccid'])
1 month ago
1 month ago
self.validate_successful(self._invoke('profile', 'enable', iccid))
1 month ago
self.process_notifications()
def disable_profile(self, iccid: str) -> None:
self.validate_profile_exists(iccid)
1 month ago
latest = self.get_active_profile()
if latest is None:
raise LPAError('no profile enabled')
if latest['iccid'] != iccid:
1 month ago
raise LPAError(f'profile {iccid} is not enabled')
1 month ago
self.validate_successful(self._invoke('profile', 'disable', iccid))
1 month ago
self.process_notifications()
def delete_profile(self, iccid: str) -> None:
self.validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest is not None and latest['iccid'] == iccid:
self.disable_profile(iccid)
1 month ago
self.validate_successful(self._invoke('profile', 'delete', iccid))
1 month ago
self.process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None:
msgs = self._invoke('profile', 'download', '-a', qr)
1 month ago
self.validate_successful(msgs)
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
if new_profile is None:
raise LPAError('no new profile found')
if nickname:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
1 month ago
self.process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
self.validate_profile_exists(iccid)
self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
1 month ago
def process_notifications(self) -> None:
"""
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
"""
self.validate_successful(self._invoke('notification', 'process', '-a', '-r'))
1 month ago
def validate_profile_exists(self, iccid: str) -> None:
if not any(p['iccid'] == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def validate_successful(self, msgs: list[dict]) -> None:
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
1 month ago
def _invoke(self, *cmd: str):
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
try:
out, err = ret.communicate(timeout=self.timeout_sec)
except subprocess.TimeoutExpired as e:
ret.kill()
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
messages = []
for line in out.decode().strip().splitlines():
if line.startswith('{'):
message = json.loads(line)
# lpac response format validations
assert 'type' in message, 'expected type in message'
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
assert 'payload' in message, 'expected payload in message'
assert 'code' in message['payload'], 'expected code in message payload'
if message['payload']['code'] != 0:
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}")
assert 'data' in message['payload'], 'expected data in message payload'
messages.append(message)
if len(messages) == 0:
raise LPAError(f"lpac {cmd} returned no messages")
return messages
if __name__ == "__main__":
import sys
1 month ago
lpa = LPA()
1 month ago
print(lpa.list_profiles())
if len(sys.argv) > 2:
if sys.argv[1] == 'enable':
lpa.enable_profile(sys.argv[2])
elif sys.argv[1] == 'disable':
lpa.disable_profile(sys.argv[2])
elif sys.argv[1] == 'delete':
lpa.delete_profile(sys.argv[2])
elif sys.argv[1] == 'download':
assert len(sys.argv) == 4, 'expected profile nickname'
lpa.download_profile(sys.argv[2], sys.argv[3])
1 month ago
else:
raise Exception(f"invalid command: {sys.argv[1]}")
if "RESTART" in os.environ:
subprocess.check_call("sudo systemctl stop ModemManager", shell=True)
subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True)
subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True)
while not os.path.exists('/dev/ttyUSB2'):
time.sleep(1)