diff --git a/common/retry.py b/common/retry.py index 9bd4ac9522..915871607c 100644 --- a/common/retry.py +++ b/common/retry.py @@ -1,8 +1,6 @@ import time import functools -from openpilot.common.swaglog import cloudlog - def retry(attempts=3, delay=1.0, ignore_failure=False): def decorator(func): @@ -12,10 +10,12 @@ def retry(attempts=3, delay=1.0, ignore_failure=False): try: return func(*args, **kwargs) except Exception: + from openpilot.common.swaglog import cloudlog cloudlog.exception(f"{func.__name__} failed, trying again") time.sleep(delay) if ignore_failure: + from openpilot.common.swaglog import cloudlog cloudlog.error(f"{func.__name__} failed after retry") else: raise Exception(f"{func.__name__} failed after retry") diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 6ed53b759c..61fb84bc3b 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -560,6 +560,40 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def bootstrapSim(acknowledged: bool): + assert acknowledged, 'you must acknowledge the operation to proceed' + HARDWARE.get_sim_lpa().bootstrap() + + +@dispatcher.add_method +def describeSim(): + lpa = HARDWARE.get_sim_lpa() + profiles = [asdict(p) for p in lpa.list_profiles()] + is_bootstrapped = lpa.is_bootstrapped() + return { + "profiles": profiles, + "is_bootstrapped": is_bootstrapped + } + + +@dispatcher.add_method +def downloadSimProfile(lpa_activation_code: str, profile_name: str): + lpa = HARDWARE.get_sim_lpa() + lpa.validate_lpa_activation_code(lpa_activation_code) + lpa.validate_nickname(profile_name) + lpa.download_profile(lpa_activation_code, profile_name) + + +@dispatcher.add_method +def setSimProfile(iccid: str): + lpa = HARDWARE.get_sim_lpa() + lpa.validate_iccid(iccid) + lpa.validate_profile_exists(iccid) + lpa.switch_profile(iccid) + HARDWARE.reboot_modem() + + @dispatcher.add_method def takeSnapshot() -> str | dict[str, str] | None: from openpilot.system.camerad.snapshot import jpeg_write, snapshot diff --git a/system/hardware/base.py b/system/hardware/base.py index ce97bf294d..77b4a44bed 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -1,4 +1,5 @@ import os +import re from abc import abstractmethod, ABC from dataclasses import dataclass, fields @@ -9,15 +10,13 @@ NetworkType = log.DeviceState.NetworkType class LPAError(RuntimeError): pass -class LPAProfileNotFoundError(LPAError): - pass - @dataclass class Profile: iccid: str nickname: str enabled: bool provider: str + is_comma_profile: bool @dataclass class ThermalZone: @@ -69,6 +68,10 @@ class LPABase(ABC): def bootstrap(self) -> None: pass + @abstractmethod + def is_bootstrapped(self) -> bool: + pass + @abstractmethod def list_profiles(self) -> list[Profile]: pass @@ -96,6 +99,20 @@ class LPABase(ABC): def is_comma_profile(self, iccid: str) -> bool: return any(iccid.startswith(prefix) for prefix in ('8985235',)) + def validate_iccid(self, iccid: str) -> None: + # https://en.wikipedia.org/wiki/E.118#ICCID + assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' + + def validate_lpa_activation_code(self, lpa_activation_code: str) -> None: + assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code' + + def validate_nickname(self, nickname: str) -> None: + assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters' + assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces' + + def validate_profile_exists(self, iccid: str) -> None: + assert any(p.iccid == iccid for p in self.list_profiles()), f'profile {iccid} does not exist' + class HardwareBase(ABC): @staticmethod def get_cmdline() -> dict[str, str]: diff --git a/system/hardware/tests/test_lpa_validation.py b/system/hardware/tests/test_lpa_validation.py new file mode 100644 index 0000000000..11fec71f17 --- /dev/null +++ b/system/hardware/tests/test_lpa_validation.py @@ -0,0 +1,74 @@ +import pytest + +from openpilot.system.hardware.base import LPABase, Profile + + +class MockLPA(LPABase): + + def bootstrap(self) -> None: + pass + + def list_profiles(self) -> list[Profile]: + return [] + + def get_active_profile(self) -> Profile | None: + return None + + def delete_profile(self, iccid: str) -> None: + pass + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + pass + + def nickname_profile(self, iccid: str, nickname: str) -> None: + pass + + def switch_profile(self, iccid: str) -> None: + pass + + +class TestLPAValidation: + + def setup_method(self): + self.lpa = MockLPA() + + def test_validate_iccid(self): + self.lpa.validate_iccid('8988303000000614227') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa.validate_iccid('') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa.validate_iccid('1234567890123456789') # Doesn't start with 89 + + def test_validate_lpa_activation_code(self): + self.lpa.validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa.validate_lpa_activation_code('') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa.validate_lpa_activation_code('LPA:1$domain.com') # Missing third part + + def test_validate_nickname(self): + self.lpa.validate_nickname('test_profile') + + with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'): + self.lpa.validate_nickname('') + + with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'): + self.lpa.validate_nickname('test.profile') # Contains invalid character + + def test_validate_profile_exists(self, mocker): + existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')] + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + self.lpa.validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=[]) + with pytest.raises(AssertionError, match='profile 8988303000000614227 does not exist'): + self.lpa.validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + with pytest.raises(AssertionError, match='profile 8988303000000614229 does not exist'): + self.lpa.validate_profile_exists('8988303000000614229') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 391ba45531..2f37156e8d 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -4,7 +4,8 @@ import shutil import subprocess from typing import Literal -from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile +from openpilot.common.retry import retry +from openpilot.system.hardware.base import LPABase, LPAError, Profile class TiciLPA(LPABase): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): @@ -25,23 +26,29 @@ class TiciLPA(LPABase): iccid=p['iccid'], nickname=p['profileNickname'], enabled=p['profileState'] == 'enabled', - provider=p['serviceProviderName'] + provider=p['serviceProviderName'], + is_comma_profile=self.is_comma_profile(p['iccid']) ) for p in msgs[-1]['payload']['data']] def get_active_profile(self) -> Profile | None: return next((p for p in self.list_profiles() if p.enabled), None) def delete_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: raise LPAError('cannot delete active profile, switch to another profile first') self._validate_successful(self._invoke('profile', 'delete', iccid)) self._process_notifications() - def download_profile(self, qr: str, nickname: str | None = None) -> None: + def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None: self._check_bootstrapped() - msgs = self._invoke('profile', 'download', '-a', qr) + self.validate_lpa_activation_code(lpa_activation_code) + if nickname: + self.validate_nickname(nickname) + + msgs = self._invoke('profile', 'download', '-a', lpa_activation_code) self._validate_successful(msgs) new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) if new_profile is None: @@ -51,12 +58,15 @@ class TiciLPA(LPABase): self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) + self.validate_nickname(nickname) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) def switch_profile(self, iccid: str) -> None: self._check_bootstrapped() - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest and latest.iccid == iccid: return @@ -71,7 +81,7 @@ class TiciLPA(LPABase): **note**: this is a **very** destructive operation. you **must** purchase a new comma SIM in order to use comma prime again. """ - if self._is_bootstrapped(): + if self.is_bootstrapped(): return for p in self.list_profiles(): @@ -79,18 +89,22 @@ class TiciLPA(LPABase): self._disable_profile(p.iccid) self.delete_profile(p.iccid) + def is_bootstrapped(self) -> bool: + """ check if any comma provisioned profiles are on the eUICC """ + return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles())) + def _disable_profile(self, iccid: str) -> None: self._validate_successful(self._invoke('profile', 'disable', iccid)) self._process_notifications() def _check_bootstrapped(self) -> None: - assert self._is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation' - - def _is_bootstrapped(self) -> bool: - """ check if any comma provisioned profiles are on the eUICC """ - return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles())) + assert self.is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation' + @retry(attempts=3, delay=1.) def _invoke(self, *cmd: str): + """ + the lpac command sometimes fails if the eUICC is busy, so we retry a few times + """ proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) try: out, err = proc.communicate(timeout=self.timeout_sec) @@ -127,9 +141,6 @@ class TiciLPA(LPABase): """ self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - def _validate_successful(self, msgs: list[dict]) -> None: + assert len(msgs) > 0, 'expected at least one message' assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py index 6fab931cce..c6557a7801 100644 --- a/system/hardware/tici/tests/test_esim.py +++ b/system/hardware/tici/tests/test_esim.py @@ -1,7 +1,6 @@ import pytest from openpilot.system.hardware import HARDWARE, TICI -from openpilot.system.hardware.base import LPAProfileNotFoundError # https://euicc-manual.osmocom.org/docs/rsp/known-test-profile # iccid is always the same for the given activation code @@ -14,7 +13,7 @@ def cleanup(): lpa = HARDWARE.get_sim_lpa() try: lpa.delete_profile(TEST_ICCID) - except LPAProfileNotFoundError: + except AssertionError: pass lpa.process_notifications()