From 46ab0fb23e55fe5c4f323d6892e15bcf0c3a29b0 Mon Sep 17 00:00:00 2001 From: Trey Moen Date: Tue, 20 May 2025 16:07:28 -0700 Subject: [PATCH] initial HITL test for eSIM provisioning --- Jenkinsfile | 1 + system/hardware/tici/esim.py | 117 ++++++++++++------------ system/hardware/tici/tests/test_esim.py | 71 ++++++++++++++ 3 files changed, 133 insertions(+), 56 deletions(-) create mode 100644 system/hardware/tici/tests/test_esim.py diff --git a/Jenkinsfile b/Jenkinsfile index b1a0746ea3..4af657ad73 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -268,6 +268,7 @@ node { step("test pandad spi", "pytest selfdrive/pandad/tests/test_pandad_spi.py"), step("test pandad", "pytest selfdrive/pandad/tests/test_pandad.py", [diffPaths: ["panda", "selfdrive/pandad/"]]), step("test amp", "pytest system/hardware/tici/tests/test_amplifier.py"), + step("test esim", "pytest system/hardware/tici/tests/test_esim.py"), step("test qcomgpsd", "pytest system/qcomgpsd/tests/test_qcomgpsd.py", [diffPaths: ["system/qcomgpsd/"]]), ]) }, diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index 876a1d3354..b7cb96d1dc 100755 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -7,19 +7,12 @@ import binascii import requests import serial import subprocess -from dataclasses import dataclass -@dataclass -class Profile: - iccid: str - isdp_aid: str - nickname: str - enabled: bool - provider: str - +class LPAError(RuntimeError): + pass -class LPAError(Exception): +class LPAProfileNotFoundError(LPAError): pass @@ -35,42 +28,47 @@ class LPA2: """ List all profiles on the eUICC. """ - raw = self._invoke('profile', 'list')[-1] # only one message + msgs = self._invoke('profile', 'list') + self.validate_successful(msgs) profiles = [] - for profile in raw['payload']['data']: - profiles.append(Profile( - iccid=profile['iccid'], - isdp_aid=profile['isdpAid'], - nickname=profile['profileNickname'], - enabled=profile['profileState'] == 'enabled', - provider=profile['serviceProviderName'], - )) + for profile in msgs[-1]['payload']['data']: + profiles.append({ + 'iccid': profile['iccid'], + 'isdp_aid': profile['isdpAid'], + 'nickname': profile['profileNickname'], + 'enabled': profile['profileState'] == 'enabled', + 'provider': profile['serviceProviderName'], + }) return profiles - def validate_successful(self, msgs: list[dict]) -> None: - """ - Validate that the last message is a success notification. - """ - assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' - - def profile_exists(self, iccid: str) -> bool: - """ - Check if a profile exists on the eUICC. - """ - return any(p.iccid == iccid for p in self.list_profiles()) - - def get_active_profile(self): + def get_active_profile(self) -> dict | None: """ Get the active profile on the eUICC. """ profiles = self.list_profiles() for profile in profiles: - if profile.enabled: + if profile['enabled']: return profile return None + def list_notifications(self) -> list[dict]: + """ + List notifications from the LPA. + """ + msgs = self._invoke('notification', 'list') + self.validate_successful(msgs) + notifications = [] + for notification in msgs[-1]['payload']['data']: + notifications.append({ + "sequence_number": notification['seqNumber'], + "profile_management_operation": notification['profileManagementOperation'], + "notification_address": notification['notificationAddress'], + "iccid": notification['iccid'], + }) + return notifications + def process_notifications(self) -> None: """ Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. @@ -79,18 +77,16 @@ class LPA2: def enable_profile(self, iccid: str) -> None: """ - Enable the profile on the eUICC. + Enable the profile on the eUICC. Disables active profile if necessary. """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest is None: raise LPAError('no profile enabled') - if latest.iccid == iccid: + if latest['iccid'] == iccid: raise LPAError(f'profile {iccid} is already enabled') else: - self.disable_profile(latest.iccid) + self.disable_profile(latest['iccid']) self.validate_successful(self._invoke('profile', 'enable', iccid)) self.process_notifications() @@ -99,13 +95,11 @@ class LPA2: """ Disable the profile on the eUICC. """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - + self.validate_profile_exists(iccid) latest = self.get_active_profile() if latest is None: raise LPAError('no profile enabled') - if latest.iccid != iccid: + if latest['iccid'] != iccid: raise LPAError(f'profile {iccid} is not enabled') self.validate_successful(self._invoke('profile', 'disable', iccid)) @@ -115,22 +109,13 @@ class LPA2: """ Delete the profile on the eUICC. """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - + self.validate_profile_exists(iccid) + latest = self.get_active_profile() + if latest is not None and latest['iccid'] == iccid: + self.disable_profile(iccid) self.validate_successful(self._invoke('profile', 'delete', iccid)) self.process_notifications() - def nickname_profile(self, iccid: str, nickname: str) -> None: - """ - Set the nickname of the profile on the eUICC. - """ - if not self.profile_exists(iccid): - raise LPAError(f'profile {iccid} does not exist') - - self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) - self.process_notifications() - def download_profile(self, qr: str, nickname: str | None = None) -> None: """ Download the profile from the eUICC. @@ -144,6 +129,26 @@ class LPA2: self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) self.process_notifications() + def nickname_profile(self, iccid: str, nickname: str) -> None: + """ + Set the nickname of the profile on the eUICC. + """ + self.validate_profile_exists(iccid) + self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) + + def validate_profile_exists(self, iccid: str) -> dict: + """ + Validate that the profile exists on the eUICC. + """ + if not any(p['iccid'] == iccid for p in self.list_profiles()): + raise LPAProfileNotFoundError(f'profile {iccid} does not exist') + + def validate_successful(self, msgs: list[dict]) -> None: + """ + Validate that the last message is a success notification. + """ + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' + def _invoke(self, *cmd: str): print(f"invoking lpac {' '.join(list(cmd))}") ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py new file mode 100644 index 0000000000..7ae05217a9 --- /dev/null +++ b/system/hardware/tici/tests/test_esim.py @@ -0,0 +1,71 @@ +import pytest +import time + +from openpilot.system.hardware import TICI +from openpilot.system.hardware.tici.esim import LPA2, LPAProfileNotFoundError + + +class TestEsim: + """ + https://euicc-manual.osmocom.org/docs/rsp/known-test-profile + """ + TEST_ACTIVATION_CODE = 'LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5' + TEST_ICCID = '8944476500001944011' + TEST_NICKNAME = 'test_profile' + + @classmethod + def setup_class(cls): + if not TICI: + pytest.skip() + + def ensure_profile_deleted(self): + lpa = LPA2() + try: + lpa.delete_profile(self.TEST_ICCID) + except LPAProfileNotFoundError: + pass + assert not self._profile_exists(self.TEST_ICCID, self.TEST_NICKNAME) + + def setup_method(self): + self.ensure_profile_deleted() + + # clear out any pending notifications + lpa = LPA2() + lpa.process_notifications() + assert len(lpa.list_notifications()) == 0 + + def teardown_method(self): + self.ensure_profile_deleted() + + def test_list_profiles(self): + lpa = LPA2() + profiles = lpa.list_profiles() + assert profiles is not None + + def test_download_profile(self): + lpa = LPA2() + try: + lpa.delete_profile(self.TEST_ICCID) + except Exception as e: + print(e) + lpa.download_profile(self.TEST_ACTIVATION_CODE, self.TEST_NICKNAME) + assert self._profile_exists(self.TEST_ICCID, self.TEST_NICKNAME) + + self.enable_profile(lpa) + self.disable_profile(lpa) + + def enable_profile(self, lpa: LPA2): + lpa.enable_profile(self.TEST_ICCID) + current = lpa.get_active_profile() + assert current is not None + assert current['iccid'] == self.TEST_ICCID + + def disable_profile(self, lpa: LPA2): + lpa.disable_profile(self.TEST_ICCID) + current = lpa.get_active_profile() + assert current is None + + def _profile_exists(self, iccid: str, nickname: str) -> bool: + lpa = LPA2() + profiles = lpa.list_profiles() + return any(p['iccid'] == iccid and p['nickname'] == nickname for p in profiles)