From c9feff0f4dcb0549adec926a252aa5e7b26fcb71 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 1 Sep 2025 10:41:42 -0700 Subject: [PATCH 01/18] feat: add LPA validations --- system/hardware/base.py | 19 +++++ system/hardware/tests/test_lpa_validation.py | 80 ++++++++++++++++++++ system/hardware/tici/esim.py | 21 ++--- 3 files changed, 110 insertions(+), 10 deletions(-) create mode 100644 system/hardware/tests/test_lpa_validation.py diff --git a/system/hardware/base.py b/system/hardware/base.py index ce97bf294d..2e831c4c82 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -1,4 +1,5 @@ import os +import re from abc import abstractmethod, ABC from dataclasses import dataclass, fields @@ -96,6 +97,24 @@ class LPABase(ABC): def is_comma_profile(self, iccid: str) -> bool: return any(iccid.startswith(prefix) for prefix in ('8985235',)) + def _validate_iccid(self, iccid: str) -> None: + assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' + + def _validate_lpa_activation_code(self, lpa_activation_code: str) -> None: + assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code' + + def _validate_nickname(self, nickname: str) -> None: + assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters' + assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces' + + def _validate_profile_exists(self, iccid: str) -> None: + if not any(p.iccid == iccid for p in self.list_profiles()): + raise LPAProfileNotFoundError(f'profile {iccid} does not exist') + + def _validate_successful(self, msgs: list[dict]) -> None: + assert len(msgs) > 0, 'expected at least one message' + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + class HardwareBase(ABC): @staticmethod def get_cmdline() -> dict[str, str]: diff --git a/system/hardware/tests/test_lpa_validation.py b/system/hardware/tests/test_lpa_validation.py new file mode 100644 index 0000000000..5060b358b1 --- /dev/null +++ b/system/hardware/tests/test_lpa_validation.py @@ -0,0 +1,80 @@ +import pytest + +from openpilot.system.hardware.base import LPABase, LPAProfileNotFoundError, Profile + + +class TestLPABase(LPABase): + + def list_profiles(self) -> list[Profile]: + return [] + + def get_active_profile(self) -> Profile | None: + return None + + def delete_profile(self, iccid: str) -> None: + pass + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + pass + + def nickname_profile(self, iccid: str, nickname: str) -> None: + pass + + def switch_profile(self, iccid: str) -> None: + pass + + +class TestLPAValidation: + + def setup_method(self): + self.lpa = TestLPABase() + + def test_validate_iccid(self): + self.lpa._validate_iccid('8988303000000614227') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa._validate_iccid('') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa._validate_iccid('1234567890123456789') # Doesn't start with 89 + + def test_validate_lpa_activation_code(self): + self.lpa._validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa._validate_lpa_activation_code('') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa._validate_lpa_activation_code('LPA:1$domain.com') # Missing third part + + def test_validate_nickname(self): + self.lpa._validate_nickname('test_profile') + + with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'): + self.lpa._validate_nickname('') + + with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'): + self.lpa._validate_nickname('test.profile') # Contains invalid character + + def test_validate_successful(self): + self.lpa._validate_successful([{'payload': {'message': 'success'}}]) + + with pytest.raises(AssertionError, match='expected at least one message'): + self.lpa._validate_successful([]) + + with pytest.raises(AssertionError, match='expected success notification'): + self.lpa._validate_successful([{'payload': {'message': 'error'}}]) + + def test_validate_profile_exists(self, mocker): + existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')] + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + self.lpa._validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=[]) + with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614227 does not exist'): + self.lpa._validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614229 does not exist'): + self.lpa._validate_profile_exists('8988303000000614229') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 391ba45531..14dea4865d 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -4,7 +4,7 @@ import shutil import subprocess from typing import Literal -from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile +from openpilot.system.hardware.base import LPABase, LPAError, Profile class TiciLPA(LPABase): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): @@ -32,6 +32,7 @@ class TiciLPA(LPABase): return next((p for p in self.list_profiles() if p.enabled), None) def delete_profile(self, iccid: str) -> None: + self._validate_iccid(iccid) self._validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: @@ -39,9 +40,13 @@ class TiciLPA(LPABase): self._validate_successful(self._invoke('profile', 'delete', iccid)) self._process_notifications() - def download_profile(self, qr: str, nickname: str | None = None) -> None: + def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None: self._check_bootstrapped() - msgs = self._invoke('profile', 'download', '-a', qr) + self._validate_lpa_activation_code(lpa_activation_code) + if nickname: + self._validate_nickname(nickname) + + msgs = self._invoke('profile', 'download', '-a', lpa_activation_code) self._validate_successful(msgs) new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) if new_profile is None: @@ -51,11 +56,14 @@ class TiciLPA(LPABase): self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: + self._validate_iccid(iccid) self._validate_profile_exists(iccid) + self._validate_nickname(nickname) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) def switch_profile(self, iccid: str) -> None: self._check_bootstrapped() + self._validate_iccid(iccid) self._validate_profile_exists(iccid) latest = self.get_active_profile() if latest and latest.iccid == iccid: @@ -126,10 +134,3 @@ class TiciLPA(LPABase): Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. """ self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) - - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - - def _validate_successful(self, msgs: list[dict]) -> None: - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' From 3004059e7a610606781e9777bf8227bbda9304dd Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 1 Sep 2025 10:46:51 -0700 Subject: [PATCH 02/18] wiki --- system/hardware/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/system/hardware/base.py b/system/hardware/base.py index 2e831c4c82..00f4a2bff7 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -98,6 +98,7 @@ class LPABase(ABC): return any(iccid.startswith(prefix) for prefix in ('8985235',)) def _validate_iccid(self, iccid: str) -> None: + # https://en.wikipedia.org/wiki/E.118#ICCID assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' def _validate_lpa_activation_code(self, lpa_activation_code: str) -> None: From 07455c77d8cab0d8b903a382b718a122599d2df2 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 18:45:51 -0700 Subject: [PATCH 03/18] cleanup --- system/hardware/base.py | 18 +++----- system/hardware/tests/test_lpa_validation.py | 46 +++++++++----------- system/hardware/tici/esim.py | 4 ++ system/hardware/tici/tests/test_esim.py | 3 +- 4 files changed, 30 insertions(+), 41 deletions(-) diff --git a/system/hardware/base.py b/system/hardware/base.py index 00f4a2bff7..269c4022a9 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -10,9 +10,6 @@ NetworkType = log.DeviceState.NetworkType class LPAError(RuntimeError): pass -class LPAProfileNotFoundError(LPAError): - pass - @dataclass class Profile: iccid: str @@ -97,24 +94,19 @@ class LPABase(ABC): def is_comma_profile(self, iccid: str) -> bool: return any(iccid.startswith(prefix) for prefix in ('8985235',)) - def _validate_iccid(self, iccid: str) -> None: + def validate_iccid(self, iccid: str) -> None: # https://en.wikipedia.org/wiki/E.118#ICCID assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' - def _validate_lpa_activation_code(self, lpa_activation_code: str) -> None: + def validate_lpa_activation_code(self, lpa_activation_code: str) -> None: assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code' - def _validate_nickname(self, nickname: str) -> None: + def validate_nickname(self, nickname: str) -> None: assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters' assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces' - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - - def _validate_successful(self, msgs: list[dict]) -> None: - assert len(msgs) > 0, 'expected at least one message' - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + def validate_profile_exists(self, iccid: str) -> None: + assert any(p.iccid == iccid for p in self.list_profiles()), f'profile {iccid} does not exist' class HardwareBase(ABC): @staticmethod diff --git a/system/hardware/tests/test_lpa_validation.py b/system/hardware/tests/test_lpa_validation.py index 5060b358b1..11fec71f17 100644 --- a/system/hardware/tests/test_lpa_validation.py +++ b/system/hardware/tests/test_lpa_validation.py @@ -1,9 +1,12 @@ import pytest -from openpilot.system.hardware.base import LPABase, LPAProfileNotFoundError, Profile +from openpilot.system.hardware.base import LPABase, Profile -class TestLPABase(LPABase): +class MockLPA(LPABase): + + def bootstrap(self) -> None: + pass def list_profiles(self) -> list[Profile]: return [] @@ -27,54 +30,45 @@ class TestLPABase(LPABase): class TestLPAValidation: def setup_method(self): - self.lpa = TestLPABase() + self.lpa = MockLPA() def test_validate_iccid(self): - self.lpa._validate_iccid('8988303000000614227') + self.lpa.validate_iccid('8988303000000614227') with pytest.raises(AssertionError, match='invalid ICCID format'): - self.lpa._validate_iccid('') + self.lpa.validate_iccid('') with pytest.raises(AssertionError, match='invalid ICCID format'): - self.lpa._validate_iccid('1234567890123456789') # Doesn't start with 89 + self.lpa.validate_iccid('1234567890123456789') # Doesn't start with 89 def test_validate_lpa_activation_code(self): - self.lpa._validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') + self.lpa.validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') with pytest.raises(AssertionError, match='invalid LPA activation code format'): - self.lpa._validate_lpa_activation_code('') + self.lpa.validate_lpa_activation_code('') with pytest.raises(AssertionError, match='invalid LPA activation code format'): - self.lpa._validate_lpa_activation_code('LPA:1$domain.com') # Missing third part + self.lpa.validate_lpa_activation_code('LPA:1$domain.com') # Missing third part def test_validate_nickname(self): - self.lpa._validate_nickname('test_profile') + self.lpa.validate_nickname('test_profile') with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'): - self.lpa._validate_nickname('') + self.lpa.validate_nickname('') with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'): - self.lpa._validate_nickname('test.profile') # Contains invalid character - - def test_validate_successful(self): - self.lpa._validate_successful([{'payload': {'message': 'success'}}]) - - with pytest.raises(AssertionError, match='expected at least one message'): - self.lpa._validate_successful([]) - - with pytest.raises(AssertionError, match='expected success notification'): - self.lpa._validate_successful([{'payload': {'message': 'error'}}]) + self.lpa.validate_nickname('test.profile') # Contains invalid character def test_validate_profile_exists(self, mocker): existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')] mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) - self.lpa._validate_profile_exists('8988303000000614227') + self.lpa.validate_profile_exists('8988303000000614227') mocker.patch.object(self.lpa, 'list_profiles', return_value=[]) - with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614227 does not exist'): - self.lpa._validate_profile_exists('8988303000000614227') + with pytest.raises(AssertionError, match='profile 8988303000000614227 does not exist'): + self.lpa.validate_profile_exists('8988303000000614227') mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) - with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614229 does not exist'): - self.lpa._validate_profile_exists('8988303000000614229') + with pytest.raises(AssertionError, match='profile 8988303000000614229 does not exist'): + self.lpa.validate_profile_exists('8988303000000614229') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 14dea4865d..ad46138d4a 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -134,3 +134,7 @@ class TiciLPA(LPABase): Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. """ self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) + + def _validate_successful(self, msgs: list[dict]) -> None: + assert len(msgs) > 0, 'expected at least one message' + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py index 6fab931cce..c6557a7801 100644 --- a/system/hardware/tici/tests/test_esim.py +++ b/system/hardware/tici/tests/test_esim.py @@ -1,7 +1,6 @@ import pytest from openpilot.system.hardware import HARDWARE, TICI -from openpilot.system.hardware.base import LPAProfileNotFoundError # https://euicc-manual.osmocom.org/docs/rsp/known-test-profile # iccid is always the same for the given activation code @@ -14,7 +13,7 @@ def cleanup(): lpa = HARDWARE.get_sim_lpa() try: lpa.delete_profile(TEST_ICCID) - except LPAProfileNotFoundError: + except AssertionError: pass lpa.process_notifications() From d425c69c9c3bc334ff59153a19c4d34a98ccbb28 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 18:49:27 -0700 Subject: [PATCH 04/18] _ --- system/hardware/tici/esim.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index ad46138d4a..bb1325a136 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -32,8 +32,8 @@ class TiciLPA(LPABase): return next((p for p in self.list_profiles() if p.enabled), None) def delete_profile(self, iccid: str) -> None: - self._validate_iccid(iccid) - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: raise LPAError('cannot delete active profile, switch to another profile first') @@ -44,7 +44,7 @@ class TiciLPA(LPABase): self._check_bootstrapped() self._validate_lpa_activation_code(lpa_activation_code) if nickname: - self._validate_nickname(nickname) + self.validate_nickname(nickname) msgs = self._invoke('profile', 'download', '-a', lpa_activation_code) self._validate_successful(msgs) @@ -56,15 +56,15 @@ class TiciLPA(LPABase): self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: - self._validate_iccid(iccid) - self._validate_profile_exists(iccid) - self._validate_nickname(nickname) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) + self.validate_nickname(nickname) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) def switch_profile(self, iccid: str) -> None: self._check_bootstrapped() - self._validate_iccid(iccid) - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest and latest.iccid == iccid: return From c59d26b8b4c28a46c5aed3376c88e047b7c39f52 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 18:50:13 -0700 Subject: [PATCH 05/18] moar --- system/hardware/tici/esim.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index bb1325a136..4203a35456 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -42,7 +42,7 @@ class TiciLPA(LPABase): def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None: self._check_bootstrapped() - self._validate_lpa_activation_code(lpa_activation_code) + self.validate_lpa_activation_code(lpa_activation_code) if nickname: self.validate_nickname(nickname) From 34bbc76b1c8ff91ff5de544263335dd2cf468398 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Mon, 1 Sep 2025 10:42:51 -0700 Subject: [PATCH 06/18] add esim methods to athena --- system/athena/athenad.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6ed53b759c..43710905a6 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -560,6 +560,22 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def getEsimProfiles(): + return [asdict(p) for p in HARDWARE.get_sim_lpa().list_profiles()] + + +@dispatcher.add_method +def downloadEsimProfile(lpa_activation_code: str, profile_name: str): + HARDWARE.get_sim_lpa().download_profile(lpa_activation_code, profile_name) + + +@dispatcher.add_method +def setEsimProfile(iccid: str): + HARDWARE.get_sim_lpa().switch_profile(iccid) + HARDWARE.reboot_modem() + + @dispatcher.add_method def takeSnapshot() -> str | dict[str, str] | None: from openpilot.system.camerad.snapshot import jpeg_write, snapshot From ab25580a978be3ee814ef84cfe8a45c7218452f7 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:02:49 -0700 Subject: [PATCH 07/18] validations --- system/athena/athenad.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 43710905a6..94ca0be618 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -567,12 +567,18 @@ def getEsimProfiles(): @dispatcher.add_method def downloadEsimProfile(lpa_activation_code: str, profile_name: str): - HARDWARE.get_sim_lpa().download_profile(lpa_activation_code, profile_name) + lpa = HARDWARE.get_sim_lpa() + lpa.validate_lpa_activation_code(lpa_activation_code) + lpa.validate_nickname(profile_name) + lpa.download_profile(lpa_activation_code, profile_name) @dispatcher.add_method def setEsimProfile(iccid: str): - HARDWARE.get_sim_lpa().switch_profile(iccid) + lpa = HARDWARE.get_sim_lpa() + lpa.validate_iccid(iccid) + lpa.validate_profile_exists(iccid) + lpa.switch_profile(iccid) HARDWARE.reboot_modem() From 3d39ef05e816b7d68654112dc444b97b90feebd2 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:39:27 -0700 Subject: [PATCH 08/18] also expose is_bootstrapped --- system/athena/athenad.py | 10 ++++++++-- system/hardware/base.py | 3 +++ system/hardware/tici/esim.py | 10 +++++----- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 94ca0be618..426f1ebad3 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -561,8 +561,14 @@ def getNetworks(): @dispatcher.add_method -def getEsimProfiles(): - return [asdict(p) for p in HARDWARE.get_sim_lpa().list_profiles()] +def getEsim(): + lpa = HARDWARE.get_sim_lpa() + profiles = [asdict(p) for p in lpa.list_profiles()] + is_bootstrapped = lpa.is_bootstrapped() + return { + "profiles": profiles, + "is_bootstrapped": is_bootstrapped + } @dispatcher.add_method diff --git a/system/hardware/base.py b/system/hardware/base.py index 269c4022a9..4910d1cfb3 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -65,6 +65,9 @@ class ThermalConfig: class LPABase(ABC): @abstractmethod def bootstrap(self) -> None: + + @abstractmethod + def is_bootstrapped(self) -> bool: pass @abstractmethod diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 4203a35456..ebbf4ae459 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -87,16 +87,16 @@ class TiciLPA(LPABase): self._disable_profile(p.iccid) self.delete_profile(p.iccid) + def is_bootstrapped(self) -> bool: + """ check if any comma provisioned profiles are on the eUICC """ + return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles())) + def _disable_profile(self, iccid: str) -> None: self._validate_successful(self._invoke('profile', 'disable', iccid)) self._process_notifications() def _check_bootstrapped(self) -> None: - assert self._is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation' - - def _is_bootstrapped(self) -> bool: - """ check if any comma provisioned profiles are on the eUICC """ - return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles())) + assert self.is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation' def _invoke(self, *cmd: str): proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) From c28249da7feff0092ce3bf4593617c7a5cb1f2ac Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:40:55 -0700 Subject: [PATCH 09/18] bootstrap to API --- system/athena/athenad.py | 5 +++++ system/hardware/base.py | 1 + 2 files changed, 6 insertions(+) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 426f1ebad3..e26b1f3cec 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -571,6 +571,11 @@ def getEsim(): } +@dispatcher.add_method +def bootstrapEsim(): + HARDWARE.get_sim_lpa().bootstrap() + + @dispatcher.add_method def downloadEsimProfile(lpa_activation_code: str, profile_name: str): lpa = HARDWARE.get_sim_lpa() diff --git a/system/hardware/base.py b/system/hardware/base.py index 4910d1cfb3..0cc90de6a3 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -65,6 +65,7 @@ class ThermalConfig: class LPABase(ABC): @abstractmethod def bootstrap(self) -> None: + pass @abstractmethod def is_bootstrapped(self) -> bool: From 24457d4ded9e491109bef2f44f6263e58b86a669 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:42:23 -0700 Subject: [PATCH 10/18] its just a sim --- system/athena/athenad.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index e26b1f3cec..6a5d3d5294 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -561,7 +561,7 @@ def getNetworks(): @dispatcher.add_method -def getEsim(): +def describeSim(): lpa = HARDWARE.get_sim_lpa() profiles = [asdict(p) for p in lpa.list_profiles()] is_bootstrapped = lpa.is_bootstrapped() @@ -577,7 +577,7 @@ def bootstrapEsim(): @dispatcher.add_method -def downloadEsimProfile(lpa_activation_code: str, profile_name: str): +def downloadSimProfile(lpa_activation_code: str, profile_name: str): lpa = HARDWARE.get_sim_lpa() lpa.validate_lpa_activation_code(lpa_activation_code) lpa.validate_nickname(profile_name) @@ -585,7 +585,7 @@ def downloadEsimProfile(lpa_activation_code: str, profile_name: str): @dispatcher.add_method -def setEsimProfile(iccid: str): +def setSimProfile(iccid: str): lpa = HARDWARE.get_sim_lpa() lpa.validate_iccid(iccid) lpa.validate_profile_exists(iccid) From 9fe5d589f530d997e032203699b0aa9e95a7b132 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:47:34 -0700 Subject: [PATCH 11/18] rename bootstrapSim, force acknowledgement in request --- system/athena/athenad.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6a5d3d5294..430bfafe5f 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -572,7 +572,8 @@ def describeSim(): @dispatcher.add_method -def bootstrapEsim(): +def bootstrapSim(acknowledged: bool): + assert acknowledged, 'you must acknowledge the risks of this operation' HARDWARE.get_sim_lpa().bootstrap() From 2ba75f2ed915d8e2cca686d7be1f8ec6713a7b14 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 19:49:38 -0700 Subject: [PATCH 12/18] warn! --- system/athena/athenad.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 430bfafe5f..f90cb26658 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -573,7 +573,7 @@ def describeSim(): @dispatcher.add_method def bootstrapSim(acknowledged: bool): - assert acknowledged, 'you must acknowledge the risks of this operation' + assert acknowledged, 'you must acknowledge the operation to proceed' HARDWARE.get_sim_lpa().bootstrap() From ca7037188e775a93d1cc4581a0f133f464815311 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 20:54:05 -0700 Subject: [PATCH 13/18] is comma profile --- system/hardware/base.py | 1 + system/hardware/tici/esim.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/system/hardware/base.py b/system/hardware/base.py index 0cc90de6a3..77b4a44bed 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -16,6 +16,7 @@ class Profile: nickname: str enabled: bool provider: str + is_comma_profile: bool @dataclass class ThermalZone: diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index ebbf4ae459..b70f615eba 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -25,7 +25,8 @@ class TiciLPA(LPABase): iccid=p['iccid'], nickname=p['profileNickname'], enabled=p['profileState'] == 'enabled', - provider=p['serviceProviderName'] + provider=p['serviceProviderName'], + is_comma_profile=self.is_comma_profile(p['iccid']) ) for p in msgs[-1]['payload']['data']] def get_active_profile(self) -> Profile | None: From 8ced742a6fad1c541b578f63b6de41634e7f554b Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 21:28:08 -0700 Subject: [PATCH 14/18] move --- system/athena/athenad.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index f90cb26658..61fb84bc3b 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -560,6 +560,12 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def bootstrapSim(acknowledged: bool): + assert acknowledged, 'you must acknowledge the operation to proceed' + HARDWARE.get_sim_lpa().bootstrap() + + @dispatcher.add_method def describeSim(): lpa = HARDWARE.get_sim_lpa() @@ -571,12 +577,6 @@ def describeSim(): } -@dispatcher.add_method -def bootstrapSim(acknowledged: bool): - assert acknowledged, 'you must acknowledge the operation to proceed' - HARDWARE.get_sim_lpa().bootstrap() - - @dispatcher.add_method def downloadSimProfile(lpa_activation_code: str, profile_name: str): lpa = HARDWARE.get_sim_lpa() From 64790333f5d307a0b6c0fd3623efb8bb79c31e41 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 21:31:03 -0700 Subject: [PATCH 15/18] bug! --- system/hardware/tici/esim.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index b70f615eba..3518e67e11 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -80,7 +80,7 @@ class TiciLPA(LPABase): **note**: this is a **very** destructive operation. you **must** purchase a new comma SIM in order to use comma prime again. """ - if self._is_bootstrapped(): + if self.is_bootstrapped(): return for p in self.list_profiles(): From cfb7d679254205da183a5f5007df322bb147e2ca Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 21:43:57 -0700 Subject: [PATCH 16/18] retry lpa invocation --- system/hardware/tici/esim.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 3518e67e11..2f37156e8d 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -4,6 +4,7 @@ import shutil import subprocess from typing import Literal +from openpilot.common.retry import retry from openpilot.system.hardware.base import LPABase, LPAError, Profile class TiciLPA(LPABase): @@ -99,7 +100,11 @@ class TiciLPA(LPABase): def _check_bootstrapped(self) -> None: assert self.is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation' + @retry(attempts=3, delay=1.) def _invoke(self, *cmd: str): + """ + the lpac command sometimes fails if the eUICC is busy, so we retry a few times + """ proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) try: out, err = proc.communicate(timeout=self.timeout_sec) From 359db20ea405fec720608f4137a2b4b91bf1a2d9 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 21:52:02 -0700 Subject: [PATCH 17/18] lets figure out something better --- common/retry.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/common/retry.py b/common/retry.py index 9bd4ac9522..994780bfd2 100644 --- a/common/retry.py +++ b/common/retry.py @@ -1,21 +1,22 @@ import time import functools -from openpilot.common.swaglog import cloudlog - def retry(attempts=3, delay=1.0, ignore_failure=False): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): + for _ in range(attempts): try: return func(*args, **kwargs) except Exception: + from openpilot.common.swaglog import cloudlog cloudlog.exception(f"{func.__name__} failed, trying again") time.sleep(delay) if ignore_failure: + from openpilot.common.swaglog import cloudlog cloudlog.error(f"{func.__name__} failed after retry") else: raise Exception(f"{func.__name__} failed after retry") From ed5edee848d856e883075aacbfafd95cdf79764e Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Thu, 4 Sep 2025 21:57:41 -0700 Subject: [PATCH 18/18] no --- common/retry.py | 1 - 1 file changed, 1 deletion(-) diff --git a/common/retry.py b/common/retry.py index 994780bfd2..915871607c 100644 --- a/common/retry.py +++ b/common/retry.py @@ -6,7 +6,6 @@ def retry(attempts=3, delay=1.0, ignore_failure=False): def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): - for _ in range(attempts): try: return func(*args, **kwargs)