Trey Moen 2 days ago committed by GitHub
commit 72d6cecbbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      common/retry.py
  2. 34
      system/athena/athenad.py
  3. 23
      system/hardware/base.py
  4. 74
      system/hardware/tests/test_lpa_validation.py
  5. 45
      system/hardware/tici/esim.py
  6. 3
      system/hardware/tici/tests/test_esim.py

@ -1,8 +1,6 @@
import time
import functools
from openpilot.common.swaglog import cloudlog
def retry(attempts=3, delay=1.0, ignore_failure=False):
def decorator(func):
@ -12,10 +10,12 @@ def retry(attempts=3, delay=1.0, ignore_failure=False):
try:
return func(*args, **kwargs)
except Exception:
from openpilot.common.swaglog import cloudlog
cloudlog.exception(f"{func.__name__} failed, trying again")
time.sleep(delay)
if ignore_failure:
from openpilot.common.swaglog import cloudlog
cloudlog.error(f"{func.__name__} failed after retry")
else:
raise Exception(f"{func.__name__} failed after retry")

@ -560,6 +560,40 @@ def getNetworks():
return HARDWARE.get_networks()
@dispatcher.add_method
def bootstrapSim(acknowledged: bool):
assert acknowledged, 'you must acknowledge the operation to proceed'
HARDWARE.get_sim_lpa().bootstrap()
@dispatcher.add_method
def describeSim():
lpa = HARDWARE.get_sim_lpa()
profiles = [asdict(p) for p in lpa.list_profiles()]
is_bootstrapped = lpa.is_bootstrapped()
return {
"profiles": profiles,
"is_bootstrapped": is_bootstrapped
}
@dispatcher.add_method
def downloadSimProfile(lpa_activation_code: str, profile_name: str):
lpa = HARDWARE.get_sim_lpa()
lpa.validate_lpa_activation_code(lpa_activation_code)
lpa.validate_nickname(profile_name)
lpa.download_profile(lpa_activation_code, profile_name)
@dispatcher.add_method
def setSimProfile(iccid: str):
lpa = HARDWARE.get_sim_lpa()
lpa.validate_iccid(iccid)
lpa.validate_profile_exists(iccid)
lpa.switch_profile(iccid)
HARDWARE.reboot_modem()
@dispatcher.add_method
def takeSnapshot() -> str | dict[str, str] | None:
from openpilot.system.camerad.snapshot import jpeg_write, snapshot

@ -1,4 +1,5 @@
import os
import re
from abc import abstractmethod, ABC
from dataclasses import dataclass, fields
@ -9,15 +10,13 @@ NetworkType = log.DeviceState.NetworkType
class LPAError(RuntimeError):
pass
class LPAProfileNotFoundError(LPAError):
pass
@dataclass
class Profile:
iccid: str
nickname: str
enabled: bool
provider: str
is_comma_profile: bool
@dataclass
class ThermalZone:
@ -69,6 +68,10 @@ class LPABase(ABC):
def bootstrap(self) -> None:
pass
@abstractmethod
def is_bootstrapped(self) -> bool:
pass
@abstractmethod
def list_profiles(self) -> list[Profile]:
pass
@ -96,6 +99,20 @@ class LPABase(ABC):
def is_comma_profile(self, iccid: str) -> bool:
return any(iccid.startswith(prefix) for prefix in ('8985235',))
def validate_iccid(self, iccid: str) -> None:
# https://en.wikipedia.org/wiki/E.118#ICCID
assert re.match(r'^89\d{17,18}$', iccid), 'invalid ICCID format. expected format: 8988303000000614227'
def validate_lpa_activation_code(self, lpa_activation_code: str) -> None:
assert re.match(r'^LPA:1\$.+\$.+$', lpa_activation_code), 'invalid LPA activation code format. expected format: LPA:1$domain$code'
def validate_nickname(self, nickname: str) -> None:
assert len(nickname) >= 1 and len(nickname) <= 16, 'nickname must be between 1 and 16 characters'
assert re.match(r'^[a-zA-Z0-9-_ ]+$', nickname), 'nickname must contain only alphanumeric characters, hyphens, underscores, and spaces'
def validate_profile_exists(self, iccid: str) -> None:
assert any(p.iccid == iccid for p in self.list_profiles()), f'profile {iccid} does not exist'
class HardwareBase(ABC):
@staticmethod
def get_cmdline() -> dict[str, str]:

@ -0,0 +1,74 @@
import pytest
from openpilot.system.hardware.base import LPABase, Profile
class MockLPA(LPABase):
def bootstrap(self) -> None:
pass
def list_profiles(self) -> list[Profile]:
return []
def get_active_profile(self) -> Profile | None:
return None
def delete_profile(self, iccid: str) -> None:
pass
def download_profile(self, qr: str, nickname: str | None = None) -> None:
pass
def nickname_profile(self, iccid: str, nickname: str) -> None:
pass
def switch_profile(self, iccid: str) -> None:
pass
class TestLPAValidation:
def setup_method(self):
self.lpa = MockLPA()
def test_validate_iccid(self):
self.lpa.validate_iccid('8988303000000614227')
with pytest.raises(AssertionError, match='invalid ICCID format'):
self.lpa.validate_iccid('')
with pytest.raises(AssertionError, match='invalid ICCID format'):
self.lpa.validate_iccid('1234567890123456789') # Doesn't start with 89
def test_validate_lpa_activation_code(self):
self.lpa.validate_lpa_activation_code('LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5')
with pytest.raises(AssertionError, match='invalid LPA activation code format'):
self.lpa.validate_lpa_activation_code('')
with pytest.raises(AssertionError, match='invalid LPA activation code format'):
self.lpa.validate_lpa_activation_code('LPA:1$domain.com') # Missing third part
def test_validate_nickname(self):
self.lpa.validate_nickname('test_profile')
with pytest.raises(AssertionError, match='nickname must be between 1 and 16 characters'):
self.lpa.validate_nickname('')
with pytest.raises(AssertionError, match='nickname must contain only alphanumeric characters'):
self.lpa.validate_nickname('test.profile') # Contains invalid character
def test_validate_profile_exists(self, mocker):
existing_profiles = [Profile(iccid='8988303000000614227', nickname='test1', enabled=True, provider='Test Provider')]
mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles)
self.lpa.validate_profile_exists('8988303000000614227')
mocker.patch.object(self.lpa, 'list_profiles', return_value=[])
with pytest.raises(AssertionError, match='profile 8988303000000614227 does not exist'):
self.lpa.validate_profile_exists('8988303000000614227')
mocker.patch.object(self.lpa, 'list_profiles', return_value=existing_profiles)
with pytest.raises(AssertionError, match='profile 8988303000000614229 does not exist'):
self.lpa.validate_profile_exists('8988303000000614229')

@ -4,7 +4,8 @@ import shutil
import subprocess
from typing import Literal
from openpilot.system.hardware.base import LPABase, LPAError, LPAProfileNotFoundError, Profile
from openpilot.common.retry import retry
from openpilot.system.hardware.base import LPABase, LPAError, Profile
class TiciLPA(LPABase):
def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'):
@ -25,23 +26,29 @@ class TiciLPA(LPABase):
iccid=p['iccid'],
nickname=p['profileNickname'],
enabled=p['profileState'] == 'enabled',
provider=p['serviceProviderName']
provider=p['serviceProviderName'],
is_comma_profile=self.is_comma_profile(p['iccid'])
) for p in msgs[-1]['payload']['data']]
def get_active_profile(self) -> Profile | None:
return next((p for p in self.list_profiles() if p.enabled), None)
def delete_profile(self, iccid: str) -> None:
self._validate_profile_exists(iccid)
self.validate_iccid(iccid)
self.validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest is not None and latest.iccid == iccid:
raise LPAError('cannot delete active profile, switch to another profile first')
self._validate_successful(self._invoke('profile', 'delete', iccid))
self._process_notifications()
def download_profile(self, qr: str, nickname: str | None = None) -> None:
def download_profile(self, lpa_activation_code: str, nickname: str | None = None) -> None:
self._check_bootstrapped()
msgs = self._invoke('profile', 'download', '-a', qr)
self.validate_lpa_activation_code(lpa_activation_code)
if nickname:
self.validate_nickname(nickname)
msgs = self._invoke('profile', 'download', '-a', lpa_activation_code)
self._validate_successful(msgs)
new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None)
if new_profile is None:
@ -51,12 +58,15 @@ class TiciLPA(LPABase):
self._process_notifications()
def nickname_profile(self, iccid: str, nickname: str) -> None:
self._validate_profile_exists(iccid)
self.validate_iccid(iccid)
self.validate_profile_exists(iccid)
self.validate_nickname(nickname)
self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname))
def switch_profile(self, iccid: str) -> None:
self._check_bootstrapped()
self._validate_profile_exists(iccid)
self.validate_iccid(iccid)
self.validate_profile_exists(iccid)
latest = self.get_active_profile()
if latest and latest.iccid == iccid:
return
@ -71,7 +81,7 @@ class TiciLPA(LPABase):
**note**: this is a **very** destructive operation. you **must** purchase a new comma SIM in order
to use comma prime again.
"""
if self._is_bootstrapped():
if self.is_bootstrapped():
return
for p in self.list_profiles():
@ -79,18 +89,22 @@ class TiciLPA(LPABase):
self._disable_profile(p.iccid)
self.delete_profile(p.iccid)
def is_bootstrapped(self) -> bool:
""" check if any comma provisioned profiles are on the eUICC """
return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles()))
def _disable_profile(self, iccid: str) -> None:
self._validate_successful(self._invoke('profile', 'disable', iccid))
self._process_notifications()
def _check_bootstrapped(self) -> None:
assert self._is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation'
def _is_bootstrapped(self) -> bool:
""" check if any comma provisioned profiles are on the eUICC """
return not any(self.is_comma_profile(iccid) for iccid in (p.iccid for p in self.list_profiles()))
assert self.is_bootstrapped(), 'eUICC is not bootstrapped, please bootstrap before performing this operation'
@retry(attempts=3, delay=1.)
def _invoke(self, *cmd: str):
"""
the lpac command sometimes fails if the eUICC is busy, so we retry a few times
"""
proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
try:
out, err = proc.communicate(timeout=self.timeout_sec)
@ -127,9 +141,6 @@ class TiciLPA(LPABase):
"""
self._validate_successful(self._invoke('notification', 'process', '-a', '-r'))
def _validate_profile_exists(self, iccid: str) -> None:
if not any(p.iccid == iccid for p in self.list_profiles()):
raise LPAProfileNotFoundError(f'profile {iccid} does not exist')
def _validate_successful(self, msgs: list[dict]) -> None:
assert len(msgs) > 0, 'expected at least one message'
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'

@ -1,7 +1,6 @@
import pytest
from openpilot.system.hardware import HARDWARE, TICI
from openpilot.system.hardware.base import LPAProfileNotFoundError
# https://euicc-manual.osmocom.org/docs/rsp/known-test-profile
# iccid is always the same for the given activation code
@ -14,7 +13,7 @@ def cleanup():
lpa = HARDWARE.get_sim_lpa()
try:
lpa.delete_profile(TEST_ICCID)
except LPAProfileNotFoundError:
except AssertionError:
pass
lpa.process_notifications()

Loading…
Cancel
Save