diff --git a/system/hardware/base.py b/system/hardware/base.py index ce97bf294d..269c4022a9 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -1,4 +1,5 @@ import os +import re from abc import abstractmethod, ABC from dataclasses import dataclass, fields @@ -9,9 +10,6 @@ NetworkType = log.DeviceState.NetworkType class LPAError(RuntimeError): pass -class LPAProfileNotFoundError(LPAError): - pass - @dataclass class Profile: iccid: str @@ -96,6 +94,20 @@ class LPABase(ABC): def is_comma_profile(self, iccid: str) -> bool: return any(iccid.startswith(prefix) for prefix in ('8985235',)) + def validate_iccid(self, iccid: str) -> None: + # https://en.wikipedia.org/wiki/E.118#ICCID + assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' + + def validate_lpa_activation_code(self, lpa_activation_code: str) -> None: + assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code' + + def validate_nickname(self, nickname: str) -> None: + assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters' + assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces' + + def validate_profile_exists(self, iccid: str) -> None: + assert any(p.iccid == iccid for p in self.list_profiles()), f'profile {iccid} does not exist' + class HardwareBase(ABC): @staticmethod def get_cmdline() -> dict[str, str]: diff --git a/system/hardware/tests/test_lpa_validation.py b/system/hardware/tests/test_lpa_validation.py new file mode 100644 index 0000000000..11fec71f17 --- /dev/null +++ b/system/hardware/tests/test_lpa_validation.py @@ -0,0 +1,74 @@ +import pytest + +from openpilot.system.hardware.base import LPABase, Profile + + +class MockLPA(LPABase): + + def bootstrap(self) -> None: + pass + + def list_profiles(self) -> list[Profile]: + return [] + + def get_active_profile(self) -> Profile | None: + return None + + def delete_profile(self, iccid: str) -> None: + pass + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + pass + + def nickname_profile(self, iccid: str, nickname: str) -> None: + pass + + def switch_profile(self, iccid: str) -> None: + pass + + +class TestLPAValidation: + + def setup_method(self): + self.lpa = MockLPA() + + def test_validate_iccid(self): + self.lpa.validate_iccid('8988303000000614227') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa.validate_iccid('') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa.validate_iccid('1234567890123456789') # Doesn't start with 89 + + def test_validate_lpa_activation_code(self): + self.lpa.validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa.validate_lpa_activation_code('') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa.validate_lpa_activation_code('LPA:1$domain.com') # Missing third part + + def test_validate_nickname(self): + self.lpa.validate_nickname('test_profile') + + with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'): + self.lpa.validate_nickname('') + + with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'): + self.lpa.validate_nickname('test.profile') # Contains invalid character + + def test_validate_profile_exists(self, mocker): + existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')] + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + self.lpa.validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=[]) + with pytest.raises(AssertionError, match='profile 8988303000000614227 does not exist'): + self.lpa.validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + with pytest.raises(AssertionError, match='profile 8988303000000614229 does not exist'): + self.lpa.validate_profile_exists('8988303000000614229') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 391ba45531..4203a35456 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -4,7 +4,7 @@ import shutil import subprocess from typing import Literal -from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile +from openpilot.system.hardware.base import LPABase, LPAError, Profile class TiciLPA(LPABase): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): @@ -32,16 +32,21 @@ class TiciLPA(LPABase): return next((p for p in self.list_profiles() if p.enabled), None) def delete_profile(self, iccid: str) -> None: - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: raise LPAError('cannot delete active profile, switch to another profile first') self._validate_successful(self._invoke('profile', 'delete', iccid)) self._process_notifications() - def download_profile(self, qr: str, nickname: str | None = None) -> None: + def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None: self._check_bootstrapped() - msgs = self._invoke('profile', 'download', '-a', qr) + self.validate_lpa_activation_code(lpa_activation_code) + if nickname: + self.validate_nickname(nickname) + + msgs = self._invoke('profile', 'download', '-a', lpa_activation_code) self._validate_successful(msgs) new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) if new_profile is None: @@ -51,12 +56,15 @@ class TiciLPA(LPABase): self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) + self.validate_nickname(nickname) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) def switch_profile(self, iccid: str) -> None: self._check_bootstrapped() - self._validate_profile_exists(iccid) + self.validate_iccid(iccid) + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest and latest.iccid == iccid: return @@ -127,9 +135,6 @@ class TiciLPA(LPABase): """ self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - def _validate_successful(self, msgs: list[dict]) -> None: + assert len(msgs) > 0, 'expected at least one message' assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py index 6fab931cce..c6557a7801 100644 --- a/system/hardware/tici/tests/test_esim.py +++ b/system/hardware/tici/tests/test_esim.py @@ -1,7 +1,6 @@ import pytest from openpilot.system.hardware import HARDWARE, TICI -from openpilot.system.hardware.base import LPAProfileNotFoundError # https://euicc-manual.osmocom.org/docs/rsp/known-test-profile # iccid is always the same for the given activation code @@ -14,7 +13,7 @@ def cleanup(): lpa = HARDWARE.get_sim_lpa() try: lpa.delete_profile(TEST_ICCID) - except LPAProfileNotFoundError: + except AssertionError: pass lpa.process_notifications()