initial HITL test for eSIM provisioning

Trey Moen 1 week ago
parent dfb4e206aa
commit 46ab0fb23e
  1. 1
      Jenkinsfile
  2. 117
      system/hardware/tici/esim.py
  3. 71
      system/hardware/tici/tests/test_esim.py

1
Jenkinsfile vendored

@ -268,6 +268,7 @@ node {
step("test pandad spi", "pytest selfdrive/pandad/tests/test_pandad_spi.py"), step("test pandad spi", "pytest selfdrive/pandad/tests/test_pandad_spi.py"),
step("test pandad", "pytest selfdrive/pandad/tests/test_pandad.py", [diffPaths: ["panda", "selfdrive/pandad/"]]), step("test pandad", "pytest selfdrive/pandad/tests/test_pandad.py", [diffPaths: ["panda", "selfdrive/pandad/"]]),
step("test amp", "pytest system/hardware/tici/tests/test_amplifier.py"), step("test amp", "pytest system/hardware/tici/tests/test_amplifier.py"),
step("test esim", "pytest system/hardware/tici/tests/test_esim.py"),
step("test qcomgpsd", "pytest system/qcomgpsd/tests/test_qcomgpsd.py", [diffPaths: ["system/qcomgpsd/"]]), step("test qcomgpsd", "pytest system/qcomgpsd/tests/test_qcomgpsd.py", [diffPaths: ["system/qcomgpsd/"]]),
]) ])
}, },

@ -7,19 +7,12 @@ import binascii
import requests import requests
import serial import serial
import subprocess import subprocess
from dataclasses import dataclass
@dataclass class LPAError(RuntimeError):
class Profile: pass
iccid: str
isdp_aid: str
nickname: str
enabled: bool
provider: str
class LPAError(Exception): class LPAProfileNotFoundError(LPAError):
pass pass
@ -35,42 +28,47 @@ class LPA2:
""" """
List all profiles on the eUICC. List all profiles on the eUICC.
""" """
raw = self._invoke('profile', 'list')[-1] # only one message msgs = self._invoke('profile', 'list')
self.validate_successful(msgs)
profiles = [] profiles = []
for profile in raw['payload']['data']: for profile in msgs[-1]['payload']['data']:
profiles.append(Profile( profiles.append({
iccid=profile['iccid'], 'iccid': profile['iccid'],
isdp_aid=profile['isdpAid'], 'isdp_aid': profile['isdpAid'],
nickname=profile['profileNickname'], 'nickname': profile['profileNickname'],
enabled=profile['profileState'] == 'enabled', 'enabled': profile['profileState'] == 'enabled',
provider=profile['serviceProviderName'], 'provider': profile['serviceProviderName'],
)) })
return profiles return profiles
def validate_successful(self, msgs: list[dict]) -> None: def get_active_profile(self) -> dict | None:
"""
Validate that the last message is a success notification.
"""
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
def profile_exists(self, iccid: str) -> bool:
"""
Check if a profile exists on the eUICC.
"""
return any(p.iccid == iccid for p in self.list_profiles())
def get_active_profile(self):
""" """
Get the active profile on the eUICC. Get the active profile on the eUICC.
""" """
profiles = self.list_profiles() profiles = self.list_profiles()
for profile in profiles: for profile in profiles:
if profile.enabled: if profile['enabled']:
return profile return profile
return None return None
def list_notifications(self) -> list[dict]:
"""
List notifications from the LPA.
"""
msgs = self._invoke('notification', 'list')
self.validate_successful(msgs)
notifications = []
for notification in msgs[-1]['payload']['data']:
notifications.append({
"sequence_number": notification['seqNumber'],
"profile_management_operation": notification['profileManagementOperation'],
"notification_address": notification['notificationAddress'],
"iccid": notification['iccid'],
})
return notifications
def process_notifications(self) -> None: def process_notifications(self) -> None:
""" """
Process notifications from the LPA, typically to activate/deactivate the profile with the carrier. Process notifications from the LPA, typically to activate/deactivate the profile with the carrier.
@ -79,18 +77,16 @@ class LPA2:
def enable_profile(self, iccid: str) -> None: def enable_profile(self, iccid: str) -> None:
""" """
Enable the profile on the eUICC. Enable the profile on the eUICC. Disables active profile if necessary.
""" """
if not self.profile_exists(iccid): self.validate_profile_exists(iccid)
raise LPAError(f'profile {iccid} does not exist')
latest = self.get_active_profile() latest = self.get_active_profile()
if latest is None: if latest is None:
raise LPAError('no profile enabled') raise LPAError('no profile enabled')
if latest.iccid == iccid: if latest['iccid'] == iccid:
raise LPAError(f'profile {iccid} is already enabled') raise LPAError(f'profile {iccid} is already enabled')
else: else:
self.disable_profile(latest.iccid) self.disable_profile(latest['iccid'])
self.validate_successful(self._invoke('profile', 'enable', iccid)) self.validate_successful(self._invoke('profile', 'enable', iccid))
self.process_notifications() self.process_notifications()
@ -99,13 +95,11 @@ class LPA2:
""" """
Disable the profile on the eUICC. Disable the profile on the eUICC.
""" """
if not self.profile_exists(iccid): self.validate_profile_exists(iccid)
raise LPAError(f'profile {iccid} does not exist')
latest = self.get_active_profile() latest = self.get_active_profile()
if latest is None: if latest is None:
raise LPAError('no profile enabled') raise LPAError('no profile enabled')
if latest.iccid != iccid: if latest['iccid'] != iccid:
raise LPAError(f'profile {iccid} is not enabled') raise LPAError(f'profile {iccid} is not enabled')
self.validate_successful(self._invoke('profile', 'disable', iccid)) self.validate_successful(self._invoke('profile', 'disable', iccid))
@ -115,22 +109,13 @@ class LPA2:
""" """
Delete the profile on the eUICC. Delete the profile on the eUICC.
""" """
if not self.profile_exists(iccid): self.validate_profile_exists(iccid)
raise LPAError(f'profile {iccid} does not exist') latest = self.get_active_profile()
if latest is not None and latest['iccid'] == iccid:
self.disable_profile(iccid)
self.validate_successful(self._invoke('profile', 'delete', iccid)) self.validate_successful(self._invoke('profile', 'delete', iccid))
self.process_notifications() self.process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
"""
Set the nickname of the profile on the eUICC.
"""
if not self.profile_exists(iccid):
raise LPAError(f'profile {iccid} does not exist')
self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
self.process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None: def download_profile(self, qr: str, nickname: str | None = None) -> None:
""" """
Download the profile from the eUICC. Download the profile from the eUICC.
@ -144,6 +129,26 @@ class LPA2:
self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) self.nickname_profile(new_profile['payload']['data']['iccid'], nickname)
self.process_notifications() self.process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
"""
Set the nickname of the profile on the eUICC.
"""
self.validate_profile_exists(iccid)
self.validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
def validate_profile_exists(self, iccid: str) -> dict:
"""
Validate that the profile exists on the eUICC.
"""
if not any(p['iccid'] == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def validate_successful(self, msgs: list[dict]) -> None:
"""
Validate that the last message is a success notification.
"""
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
def _invoke(self, *cmd: str): def _invoke(self, *cmd: str):
print(f"invoking lpac {' '.join(list(cmd))}") print(f"invoking lpac {' '.join(list(cmd))}")
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)

@ -0,0 +1,71 @@
import pytest
import time
from openpilot.system.hardware import TICI
from openpilot.system.hardware.tici.esim import LPA2, LPAProfileNotFoundError
class TestEsim:
"""
https://euicc-manual.osmocom.org/docs/rsp/known-test-profile
"""
TEST_ACTIVATION_CODE = 'LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5'
TEST_ICCID = '8944476500001944011'
TEST_NICKNAME = 'test_profile'
@classmethod
def setup_class(cls):
if not TICI:
pytest.skip()
def ensure_profile_deleted(self):
lpa = LPA2()
try:
lpa.delete_profile(self.TEST_ICCID)
except LPAProfileNotFoundError:
pass
assert not self._profile_exists(self.TEST_ICCID, self.TEST_NICKNAME)
def setup_method(self):
self.ensure_profile_deleted()
# clear out any pending notifications
lpa = LPA2()
lpa.process_notifications()
assert len(lpa.list_notifications()) == 0
def teardown_method(self):
self.ensure_profile_deleted()
def test_list_profiles(self):
lpa = LPA2()
profiles = lpa.list_profiles()
assert profiles is not None
def test_download_profile(self):
lpa = LPA2()
try:
lpa.delete_profile(self.TEST_ICCID)
except Exception as e:
print(e)
lpa.download_profile(self.TEST_ACTIVATION_CODE, self.TEST_NICKNAME)
assert self._profile_exists(self.TEST_ICCID, self.TEST_NICKNAME)
self.enable_profile(lpa)
self.disable_profile(lpa)
def enable_profile(self, lpa: LPA2):
lpa.enable_profile(self.TEST_ICCID)
current = lpa.get_active_profile()
assert current is not None
assert current['iccid'] == self.TEST_ICCID
def disable_profile(self, lpa: LPA2):
lpa.disable_profile(self.TEST_ICCID)
current = lpa.get_active_profile()
assert current is None
def _profile_exists(self, iccid: str, nickname: str) -> bool:
lpa = LPA2()
profiles = lpa.list_profiles()
return any(p['iccid'] == iccid and p['nickname'] == nickname for p in profiles)
Loading…
Cancel
Save