From 993b1b4d88bb55d4ab87ddd2ffc51e8cfc866522 Mon Sep 17 00:00:00 2001 From: Trey Moen <50057480+greatgitsby@users.noreply.github.com> Date: Sun, 25 May 2025 08:54:07 -0700 Subject: [PATCH] refactor: LPA interface, CLI location (#35328) * refactor: LPABase, simpler switch() interface * leave this since LPABase hides * hw-agnostic esim.py * newline * use latest --- system/hardware/base.py | 42 ++++++++++ system/hardware/esim.py | 38 +++++++++ system/hardware/pc/hardware.py | 6 +- system/hardware/tici/esim.py | 105 ++++++------------------ system/hardware/tici/hardware.py | 6 +- system/hardware/tici/tests/test_esim.py | 8 +- tools/op.sh | 2 +- 7 files changed, 117 insertions(+), 90 deletions(-) create mode 100755 system/hardware/esim.py mode change 100755 => 100644 system/hardware/tici/esim.py diff --git a/system/hardware/base.py b/system/hardware/base.py index 90b42b2f1f..e429f0e9f2 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -6,6 +6,19 @@ from cereal import log NetworkType = log.DeviceState.NetworkType +class LPAError(RuntimeError): + pass + +class LPAProfileNotFoundError(LPAError): + pass + +@dataclass +class Profile: + iccid: str + nickname: str + enabled: bool + provider: str + @dataclass class ThermalZone: # a zone from /sys/class/thermal/thermal_zone* @@ -51,6 +64,31 @@ class ThermalConfig: ret[f.name + "TempC"] = v.read() return ret +class LPABase(ABC): + @abstractmethod + def list_profiles(self) -> list[Profile]: + pass + + @abstractmethod + def get_active_profile(self) -> Profile | None: + pass + + @abstractmethod + def delete_profile(self, iccid: str) -> None: + pass + + @abstractmethod + def download_profile(self, qr: str, nickname: str | None = None) -> None: + pass + + @abstractmethod + def nickname_profile(self, iccid: str, nickname: str) -> None: + pass + + @abstractmethod + def switch_profile(self, iccid: str) -> None: + pass + class HardwareBase(ABC): @staticmethod def get_cmdline() -> dict[str, str]: @@ -105,6 +143,10 @@ class HardwareBase(ABC): def get_sim_info(self): pass + @abstractmethod + def get_sim_lpa(self) -> LPABase: + pass + @abstractmethod def get_network_strength(self, network_type): pass diff --git a/system/hardware/esim.py b/system/hardware/esim.py new file mode 100755 index 0000000000..6668a1cdd3 --- /dev/null +++ b/system/hardware/esim.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import argparse + +from openpilot.system.hardware import HARDWARE + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai') + parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi') + parser.add_argument('--switch', metavar='iccid', help='switch to profile') + parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)') + parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)') + parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile') + args = parser.parse_args() + + lpa = HARDWARE.get_sim_lpa() + if args.switch: + lpa.switch_profile(args.switch) + elif args.delete: + confirm = input('are you sure you want to delete this profile? (y/N) ') + if confirm == 'y': + lpa.delete_profile(args.delete) + print('deleted profile, please restart device to apply changes') + else: + print('cancelled') + exit(0) + elif args.download: + lpa.download_profile(args.download[0], args.download[1]) + elif args.nickname: + lpa.nickname_profile(args.nickname[0], args.nickname[1]) + else: + parser.print_help() + + profiles = lpa.list_profiles() + print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') + for p in profiles: + print(f'- {p.iccid} (nickname: {p.nickname or ""}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}') diff --git a/system/hardware/pc/hardware.py b/system/hardware/pc/hardware.py index 017a449c90..9a80f10bed 100644 --- a/system/hardware/pc/hardware.py +++ b/system/hardware/pc/hardware.py @@ -1,12 +1,11 @@ import random from cereal import log -from openpilot.system.hardware.base import HardwareBase +from openpilot.system.hardware.base import HardwareBase, LPABase NetworkType = log.DeviceState.NetworkType NetworkStrength = log.DeviceState.NetworkStrength - class Pc(HardwareBase): def get_os_version(self): return None @@ -41,6 +40,9 @@ class Pc(HardwareBase): 'data_connected': False } + def get_sim_lpa(self) -> LPABase: + raise NotImplementedError("SIM LPA not implemented for PC") + def get_network_strength(self, network_type): return NetworkStrength.unknown diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py old mode 100755 new mode 100644 index 0a07e77e16..f657966ddf --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -1,28 +1,12 @@ -#!/usr/bin/env python3 - -import argparse import json import os import shutil import subprocess -from dataclasses import dataclass from typing import Literal -@dataclass -class Profile: - iccid: str - nickname: str - enabled: bool - provider: str - -class LPAError(RuntimeError): - pass - -class LPAProfileNotFoundError(LPAError): - pass - +from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile -class LPA: +class TiciLPA(LPABase): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): self.env = os.environ.copy() self.env['LPAC_APDU'] = interface @@ -47,31 +31,13 @@ class LPA: def get_active_profile(self) -> Profile | None: return next((p for p in self.list_profiles() if p.enabled), None) - def enable_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) - latest = self.get_active_profile() - if latest: - if latest.iccid == iccid: - return - self.disable_profile(latest.iccid) - self._validate_successful(self._invoke('profile', 'enable', iccid)) - self.process_notifications() - - def disable_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) - latest = self.get_active_profile() - if latest is not None and latest.iccid != iccid: - return - self._validate_successful(self._invoke('profile', 'disable', iccid)) - self.process_notifications() - def delete_profile(self, iccid: str) -> None: self._validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: - self.disable_profile(iccid) + raise LPAError('cannot delete active profile, switch to another profile first') self._validate_successful(self._invoke('profile', 'delete', iccid)) - self.process_notifications() + self._process_notifications() def download_profile(self, qr: str, nickname: str | None = None) -> None: msgs = self._invoke('profile', 'download', '-a', qr) @@ -81,17 +47,24 @@ class LPA: raise LPAError('no new profile found') if nickname: self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) - self.process_notifications() + self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: self._validate_profile_exists(iccid) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) - def process_notifications(self) -> None: - """ - Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. - """ - self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) + def switch_profile(self, iccid: str) -> None: + self._enable_profile(iccid) + + def _enable_profile(self, iccid: str) -> None: + self._validate_profile_exists(iccid) + latest = self.get_active_profile() + if latest: + if latest.iccid == iccid: + return + self._validate_successful(self._invoke('profile', 'disable', latest.iccid)) + self._validate_successful(self._invoke('profile', 'enable', iccid)) + self._process_notifications() def _invoke(self, *cmd: str): proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) @@ -124,47 +97,15 @@ class LPA: return messages + def _process_notifications(self) -> None: + """ + Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. + """ + self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) + def _validate_profile_exists(self, iccid: str) -> None: if not any(p.iccid == iccid for p in self.list_profiles()): raise LPAProfileNotFoundError(f'profile {iccid} does not exist') def _validate_successful(self, msgs: list[dict]) -> None: assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - - -if __name__ == "__main__": - parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai') - parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi') - parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile') - parser.add_argument('--disable', metavar='iccid', help='disable profile') - parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)') - parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)') - parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile') - args = parser.parse_args() - - lpa = LPA(interface=args.backend) - if args.enable: - lpa.enable_profile(args.enable) - print('enabled profile, please restart device to apply changes') - elif args.disable: - lpa.disable_profile(args.disable) - print('disabled profile, please restart device to apply changes') - elif args.delete: - confirm = input('are you sure you want to delete this profile? (y/N) ') - if confirm == 'y': - lpa.delete_profile(args.delete) - print('deleted profile, please restart device to apply changes') - else: - print('cancelled') - exit(0) - elif args.download: - lpa.download_profile(args.download[0], args.download[1]) - elif args.nickname: - lpa.nickname_profile(args.nickname[0], args.nickname[1]) - else: - parser.print_help() - - profiles = lpa.list_profiles() - print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') - for p in profiles: - print(f'- {p.iccid} (nickname: {p.nickname or ""}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}') diff --git a/system/hardware/tici/hardware.py b/system/hardware/tici/hardware.py index 5a8e41b51f..3add52d21c 100644 --- a/system/hardware/tici/hardware.py +++ b/system/hardware/tici/hardware.py @@ -10,8 +10,9 @@ from pathlib import Path from cereal import log from openpilot.common.gpio import gpio_set, gpio_init, get_irqs_for_action -from openpilot.system.hardware.base import HardwareBase, ThermalConfig, ThermalZone +from openpilot.system.hardware.base import HardwareBase, LPABase, ThermalConfig, ThermalZone from openpilot.system.hardware.tici import iwlist +from openpilot.system.hardware.tici.esim import TiciLPA from openpilot.system.hardware.tici.pins import GPIO from openpilot.system.hardware.tici.amplifier import Amplifier @@ -198,6 +199,9 @@ class Tici(HardwareBase): 'data_connected': modem.Get(MM_MODEM, 'State', dbus_interface=DBUS_PROPS, timeout=TIMEOUT) == MM_MODEM_STATE.CONNECTED, } + def get_sim_lpa(self) -> LPABase: + return TiciLPA() + def get_imei(self, slot): if slot != 0: return "" diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py index d36bdaa27b..6fab931cce 100644 --- a/system/hardware/tici/tests/test_esim.py +++ b/system/hardware/tici/tests/test_esim.py @@ -1,7 +1,7 @@ import pytest -from openpilot.system.hardware import TICI -from openpilot.system.hardware.tici.esim import LPA, LPAProfileNotFoundError +from openpilot.system.hardware import HARDWARE, TICI +from openpilot.system.hardware.base import LPAProfileNotFoundError # https://euicc-manual.osmocom.org/docs/rsp/known-test-profile # iccid is always the same for the given activation code @@ -11,7 +11,7 @@ TEST_ICCID = '8944476500001944011' TEST_NICKNAME = 'test_profile' def cleanup(): - lpa = LPA() + lpa = HARDWARE.get_sim_lpa() try: lpa.delete_profile(TEST_ICCID) except LPAProfileNotFoundError: @@ -31,7 +31,7 @@ class TestEsim: cleanup() def test_provision_enable_disable(self): - lpa = LPA() + lpa = HARDWARE.get_sim_lpa() current_active = lpa.get_active_profile() lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME) diff --git a/tools/op.sh b/tools/op.sh index 7c17c949e4..9142d50a18 100755 --- a/tools/op.sh +++ b/tools/op.sh @@ -295,7 +295,7 @@ function op_check() { function op_esim() { op_before_cmd - op_run_command system/hardware/tici/esim.py "$@" + op_run_command system/hardware/esim.py "$@" } function op_build() {