refactor: LPA interface, CLI location (#35328)

* refactor: LPABase, simpler switch() interface

* leave this since LPABase hides

* hw-agnostic esim.py

* newline

* use latest
pull/35345/head
Trey Moen 3 months ago committed by GitHub
parent 840ced5005
commit 993b1b4d88
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 42
      system/hardware/base.py
  2. 38
      system/hardware/esim.py
  3. 6
      system/hardware/pc/hardware.py
  4. 105
      system/hardware/tici/esim.py
  5. 6
      system/hardware/tici/hardware.py
  6. 8
      system/hardware/tici/tests/test_esim.py
  7. 2
      tools/op.sh

@ -6,6 +6,19 @@ from cereal import log
NetworkType = log.DeviceState.NetworkType NetworkType = log.DeviceState.NetworkType
class LPAError(RuntimeError):
pass
class LPAProfileNotFoundError(LPAError):
pass
@dataclass
class Profile:
iccid: str
nickname: str
enabled: bool
provider: str
@dataclass @dataclass
class ThermalZone: class ThermalZone:
# a zone from /sys/class/thermal/thermal_zone* # a zone from /sys/class/thermal/thermal_zone*
@ -51,6 +64,31 @@ class ThermalConfig:
ret[f.name + "TempC"] = v.read() ret[f.name + "TempC"] = v.read()
return ret return ret
class LPABase(ABC):
@abstractmethod
def list_profiles(self) -> list[Profile]:
pass
@abstractmethod
def get_active_profile(self) -> Profile | None:
pass
@abstractmethod
def delete_profile(self, iccid: str) -> None:
pass
@abstractmethod
def download_profile(self, qr: str, nickname: str | None = None) -> None:
pass
@abstractmethod
def nickname_profile(self, iccid: str, nickname: str) -> None:
pass
@abstractmethod
def switch_profile(self, iccid: str) -> None:
pass
class HardwareBase(ABC): class HardwareBase(ABC):
@staticmethod @staticmethod
def get_cmdline() -> dict[str, str]: def get_cmdline() -> dict[str, str]:
@ -105,6 +143,10 @@ class HardwareBase(ABC):
def get_sim_info(self): def get_sim_info(self):
pass pass
@abstractmethod
def get_sim_lpa(self) -> LPABase:
pass
@abstractmethod @abstractmethod
def get_network_strength(self, network_type): def get_network_strength(self, network_type):
pass pass

@ -0,0 +1,38 @@
#!/usr/bin/env python3
import argparse
from openpilot.system.hardware import HARDWARE
if __name__ == '__main__':
parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi')
parser.add_argument('--switch', metavar='iccid', help='switch to profile')
parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)')
parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile')
args = parser.parse_args()
lpa = HARDWARE.get_sim_lpa()
if args.switch:
lpa.switch_profile(args.switch)
elif args.delete:
confirm = input('are you sure you want to delete this profile? (y/N) ')
if confirm == 'y':
lpa.delete_profile(args.delete)
print('deleted profile, please restart device to apply changes')
else:
print('cancelled')
exit(0)
elif args.download:
lpa.download_profile(args.download[0], args.download[1])
elif args.nickname:
lpa.nickname_profile(args.nickname[0], args.nickname[1])
else:
parser.print_help()
profiles = lpa.list_profiles()
print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:')
for p in profiles:
print(f'- {p.iccid} (nickname: {p.nickname or "<none provided>"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')

@ -1,12 +1,11 @@
import random import random
from cereal import log from cereal import log
from openpilot.system.hardware.base import HardwareBase from openpilot.system.hardware.base import HardwareBase, LPABase
NetworkType = log.DeviceState.NetworkType NetworkType = log.DeviceState.NetworkType
NetworkStrength = log.DeviceState.NetworkStrength NetworkStrength = log.DeviceState.NetworkStrength
class Pc(HardwareBase): class Pc(HardwareBase):
def get_os_version(self): def get_os_version(self):
return None return None
@ -41,6 +40,9 @@ class Pc(HardwareBase):
'data_connected': False 'data_connected': False
} }
def get_sim_lpa(self) -> LPABase:
raise NotImplementedError("SIM LPA not implemented for PC")
def get_network_strength(self, network_type): def get_network_strength(self, network_type):
return NetworkStrength.unknown return NetworkStrength.unknown

@ -1,28 +1,12 @@
#!/usr/bin/env python3
import argparse
import json import json
import os import os
import shutil import shutil
import subprocess import subprocess
from dataclasses import dataclass
from typing import Literal from typing import Literal
@dataclass from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile
class Profile:
iccid: str
nickname: str
enabled: bool
provider: str
class LPAError(RuntimeError):
pass
class LPAProfileNotFoundError(LPAError):
pass
class LPA: class TiciLPA(LPABase):
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
self.env = os.environ.copy() self.env = os.environ.copy()
self.env['LPAC_APDU'] = interface self.env['LPAC_APDU'] = interface
@ -47,31 +31,13 @@ class LPA:
def get_active_profile(self) -> Profile | None: def get_active_profile(self) -> Profile | None:
return next((p for p in self.list_profiles() if p.enabled), None) return next((p for p in self.list_profiles() if p.enabled), None)
def enable_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest:
if latest.iccid == iccid:
return
self.disable_profile(latest.iccid)
self._validate_successful(self._invoke('profile', 'enable', iccid))
self.process_notifications()
def disable_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest is not None and latest.iccid != iccid:
return
self._validate_successful(self._invoke('profile', 'disable', iccid))
self.process_notifications()
def delete_profile(self, iccid: str) -> None: def delete_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid) self._validate_profile_exists(iccid)
latest = self.get_active_profile() latest = self.get_active_profile()
if latest is not None and latest.iccid == iccid: if latest is not None and latest.iccid == iccid:
self.disable_profile(iccid) raise LPAError('cannot delete active profile, switch to another profile first')
self._validate_successful(self._invoke('profile', 'delete', iccid)) self._validate_successful(self._invoke('profile', 'delete', iccid))
self.process_notifications() self._process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None: def download_profile(self, qr: str, nickname: str | None = None) -> None:
msgs = self._invoke('profile', 'download', '-a', qr) msgs = self._invoke('profile', 'download', '-a', qr)
@ -81,17 +47,24 @@ class LPA:
raise LPAError('no new profile found') raise LPAError('no new profile found')
if nickname: if nickname:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
self.process_notifications() self._process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None: def nickname_profile(self, iccid: str, nickname: str) -> None:
self._validate_profile_exists(iccid) self._validate_profile_exists(iccid)
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
def process_notifications(self) -> None: def switch_profile(self, iccid: str) -> None:
""" self._enable_profile(iccid)
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
""" def _enable_profile(self, iccid: str) -> None:
self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) self._validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest:
if latest.iccid == iccid:
return
self._validate_successful(self._invoke('profile', 'disable', latest.iccid))
self._validate_successful(self._invoke('profile', 'enable', iccid))
self._process_notifications()
def _invoke(self, *cmd: str): def _invoke(self, *cmd: str):
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
@ -124,47 +97,15 @@ class LPA:
return messages return messages
def _process_notifications(self) -> None:
"""
Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier.
"""
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
def _validate_profile_exists(self, iccid: str) -> None: def _validate_profile_exists(self, iccid: str) -> None:
if not any(p.iccid == iccid for p in self.list_profiles()): if not any(p.iccid == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist') raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def _validate_successful(self, msgs: list[dict]) -> None: def _validate_successful(self, msgs: list[dict]) -> None:
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai')
parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi')
parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile')
parser.add_argument('--disable', metavar='iccid', help='disable profile')
parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)')
parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)')
parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile')
args = parser.parse_args()
lpa = LPA(interface=args.backend)
if args.enable:
lpa.enable_profile(args.enable)
print('enabled profile, please restart device to apply changes')
elif args.disable:
lpa.disable_profile(args.disable)
print('disabled profile, please restart device to apply changes')
elif args.delete:
confirm = input('are you sure you want to delete this profile? (y/N) ')
if confirm == 'y':
lpa.delete_profile(args.delete)
print('deleted profile, please restart device to apply changes')
else:
print('cancelled')
exit(0)
elif args.download:
lpa.download_profile(args.download[0], args.download[1])
elif args.nickname:
lpa.nickname_profile(args.nickname[0], args.nickname[1])
else:
parser.print_help()
profiles = lpa.list_profiles()
print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:')
for p in profiles:
print(f'- {p.iccid} (nickname: {p.nickname or "<none provided>"}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}')

@ -10,8 +10,9 @@ from pathlib import Path
from cereal import log from cereal import log
from openpilot.common.gpio import gpio_set, gpio_init, get_irqs_for_action from openpilot.common.gpio import gpio_set, gpio_init, get_irqs_for_action
from openpilot.system.hardware.base import HardwareBase, ThermalConfig, ThermalZone from openpilot.system.hardware.base import HardwareBase, LPABase, ThermalConfig, ThermalZone
from openpilot.system.hardware.tici import iwlist from openpilot.system.hardware.tici import iwlist
from openpilot.system.hardware.tici.esim import TiciLPA
from openpilot.system.hardware.tici.pins import GPIO from openpilot.system.hardware.tici.pins import GPIO
from openpilot.system.hardware.tici.amplifier import Amplifier from openpilot.system.hardware.tici.amplifier import Amplifier
@ -198,6 +199,9 @@ class Tici(HardwareBase):
'data_connected': modem.Get(MM_MODEM, 'State', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) == MM_MODEM_STATE.CONNECTED, 'data_connected': modem.Get(MM_MODEM, 'State', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) == MM_MODEM_STATE.CONNECTED,
} }
def get_sim_lpa(self) -> LPABase:
return TiciLPA()
def get_imei(self, slot): def get_imei(self, slot):
if slot != 0: if slot != 0:
return "" return ""

@ -1,7 +1,7 @@
import pytest import pytest
from openpilot.system.hardware import TICI from openpilot.system.hardware import HARDWARE, TICI
from openpilot.system.hardware.tici.esim import LPA, LPAProfileNotFoundError from openpilot.system.hardware.base import LPAProfileNotFoundError
# https://euicc-manual.osmocom.org/docs/rsp/known-test-profile # https://euicc-manual.osmocom.org/docs/rsp/known-test-profile
# iccid is always the same for the given activation code # iccid is always the same for the given activation code
@ -11,7 +11,7 @@ TEST_ICCID = '8944476500001944011'
TEST_NICKNAME = 'test_profile' TEST_NICKNAME = 'test_profile'
def cleanup(): def cleanup():
lpa = LPA() lpa = HARDWARE.get_sim_lpa()
try: try:
lpa.delete_profile(TEST_ICCID) lpa.delete_profile(TEST_ICCID)
except LPAProfileNotFoundError: except LPAProfileNotFoundError:
@ -31,7 +31,7 @@ class TestEsim:
cleanup() cleanup()
def test_provision_enable_disable(self): def test_provision_enable_disable(self):
lpa = LPA() lpa = HARDWARE.get_sim_lpa()
current_active = lpa.get_active_profile() current_active = lpa.get_active_profile()
lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME) lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME)

@ -295,7 +295,7 @@ function op_check() {
function op_esim() { function op_esim() {
op_before_cmd op_before_cmd
op_run_command system/hardware/tici/esim.py "$@" op_run_command system/hardware/esim.py "$@"
} }
function op_build() { function op_build() {

Loading…
Cancel
Save