diff --git a/system/hardware/base.py b/system/hardware/base.py index ce97bf294d..2e831c4c82 100644 --- a/system/hardware/base.py +++ b/system/hardware/base.py @@ -1,4 +1,5 @@ import os +import re from abc import abstractmethod, ABC from dataclasses import dataclass, fields @@ -96,6 +97,24 @@ class LPABase(ABC): def is_comma_profile(self, iccid: str) -> bool: return any(iccid.startswith(prefix) for prefix in ('8985235',)) + def _validate_iccid(self, iccid: str) -> None: + assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227' + + def _validate_lpa_activation_code(self, lpa_activation_code: str) -> None: + assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code' + + def _validate_nickname(self, nickname: str) -> None: + assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters' + assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces' + + def _validate_profile_exists(self, iccid: str) -> None: + if not any(p.iccid == iccid for p in self.list_profiles()): + raise LPAProfileNotFoundError(f'profile {iccid} does not exist') + + def _validate_successful(self, msgs: list[dict]) -> None: + assert len(msgs) > 0, 'expected at least one message' + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + class HardwareBase(ABC): @staticmethod def get_cmdline() -> dict[str, str]: diff --git a/system/hardware/tests/test_lpa_validation.py b/system/hardware/tests/test_lpa_validation.py new file mode 100644 index 0000000000..5060b358b1 --- /dev/null +++ b/system/hardware/tests/test_lpa_validation.py @@ -0,0 +1,80 @@ +import pytest + +from openpilot.system.hardware.base import LPABase, LPAProfileNotFoundError, Profile + + +class TestLPABase(LPABase): + + def list_profiles(self) -> list[Profile]: + return [] + + def get_active_profile(self) -> Profile | None: + return None + + def delete_profile(self, iccid: str) -> None: + pass + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + pass + + def nickname_profile(self, iccid: str, nickname: str) -> None: + pass + + def switch_profile(self, iccid: str) -> None: + pass + + +class TestLPAValidation: + + def setup_method(self): + self.lpa = TestLPABase() + + def test_validate_iccid(self): + self.lpa._validate_iccid('8988303000000614227') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa._validate_iccid('') + + with pytest.raises(AssertionError, match='invalid ICCID format'): + self.lpa._validate_iccid('1234567890123456789') # Doesn't start with 89 + + def test_validate_lpa_activation_code(self): + self.lpa._validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa._validate_lpa_activation_code('') + + with pytest.raises(AssertionError, match='invalid LPA activation code format'): + self.lpa._validate_lpa_activation_code('LPA:1$domain.com') # Missing third part + + def test_validate_nickname(self): + self.lpa._validate_nickname('test_profile') + + with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'): + self.lpa._validate_nickname('') + + with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'): + self.lpa._validate_nickname('test.profile') # Contains invalid character + + def test_validate_successful(self): + self.lpa._validate_successful([{'payload': {'message': 'success'}}]) + + with pytest.raises(AssertionError, match='expected at least one message'): + self.lpa._validate_successful([]) + + with pytest.raises(AssertionError, match='expected success notification'): + self.lpa._validate_successful([{'payload': {'message': 'error'}}]) + + def test_validate_profile_exists(self, mocker): + existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')] + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + self.lpa._validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=[]) + with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614227 does not exist'): + self.lpa._validate_profile_exists('8988303000000614227') + + mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles) + with pytest.raises(LPAProfileNotFoundError, match='profile 8988303000000614229 does not exist'): + self.lpa._validate_profile_exists('8988303000000614229') diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 391ba45531..14dea4865d 100644 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -4,7 +4,7 @@ import shutil import subprocess from typing import Literal -from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile +from openpilot.system.hardware.base import LPABase, LPAError, Profile class TiciLPA(LPABase): def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): @@ -32,6 +32,7 @@ class TiciLPA(LPABase): return next((p for p in self.list_profiles() if p.enabled), None) def delete_profile(self, iccid: str) -> None: + self._validate_iccid(iccid) self._validate_profile_exists(iccid) latest = self.get_active_profile() if latest is not None and latest.iccid == iccid: @@ -39,9 +40,13 @@ class TiciLPA(LPABase): self._validate_successful(self._invoke('profile', 'delete', iccid)) self._process_notifications() - def download_profile(self, qr: str, nickname: str | None = None) -> None: + def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None: self._check_bootstrapped() - msgs = self._invoke('profile', 'download', '-a', qr) + self._validate_lpa_activation_code(lpa_activation_code) + if nickname: + self._validate_nickname(nickname) + + msgs = self._invoke('profile', 'download', '-a', lpa_activation_code) self._validate_successful(msgs) new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) if new_profile is None: @@ -51,11 +56,14 @@ class TiciLPA(LPABase): self._process_notifications() def nickname_profile(self, iccid: str, nickname: str) -> None: + self._validate_iccid(iccid) self._validate_profile_exists(iccid) + self._validate_nickname(nickname) self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) def switch_profile(self, iccid: str) -> None: self._check_bootstrapped() + self._validate_iccid(iccid) self._validate_profile_exists(iccid) latest = self.get_active_profile() if latest and latest.iccid == iccid: @@ -126,10 +134,3 @@ class TiciLPA(LPABase): Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. """ self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) - - def _validate_profile_exists(self, iccid: str) -> None: - if not any(p.iccid == iccid for p in self.list_profiles()): - raise LPAProfileNotFoundError(f'profile {iccid} does not exist') - - def _validate_successful(self, msgs: list[dict]) -> None: - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'