diff --git a/Jenkinsfile b/Jenkinsfile index b1a0746ea3..a14bf59299 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -268,6 +268,8 @@ node { step("test pandad spi", "pytest selfdrive/pandad/tests/test_pandad_spi.py"), step("test pandad", "pytest selfdrive/pandad/tests/test_pandad.py", [diffPaths: ["panda", "selfdrive/pandad/"]]), step("test amp", "pytest system/hardware/tici/tests/test_amplifier.py"), + // TODO: enable once new AGNOS is available + // step("test esim", "pytest system/hardware/tici/tests/test_esim.py"), step("test qcomgpsd", "pytest system/qcomgpsd/tests/test_qcomgpsd.py", [diffPaths: ["system/qcomgpsd/"]]), ]) }, diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py index df76c1a5fd..0a07e77e16 100755 --- a/system/hardware/tici/esim.py +++ b/system/hardware/tici/esim.py @@ -1,115 +1,170 @@ #!/usr/bin/env python3 + +import argparse +import json import os -import math -import time -import binascii -import requests -import serial +import shutil import subprocess +from dataclasses import dataclass +from typing import Literal + +@dataclass +class Profile: + iccid: str + nickname: str + enabled: bool + provider: str +class LPAError(RuntimeError): + pass -def post(url, payload): - print() - print("POST to", url) - r = requests.post( - url, - data=payload, - verify=False, - headers={ - "Content-Type": "application/json", - "X-Admin-Protocol": "gsma/rsp/v2.2.0", - "charset": "utf-8", - "User-Agent": "gsma-rsp-lpad", - }, - ) - print("resp", r) - print("resp text", repr(r.text)) - print() - r.raise_for_status() - - ret = f"HTTP/1.1 {r.status_code}" - ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') - return ret.encode() + r.content +class LPAProfileNotFoundError(LPAError): + pass class LPA: - def __init__(self): - self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) - self.dev.reset_input_buffer() - self.dev.reset_output_buffer() - assert "OK" in self.at("AT") - - def at(self, cmd): - print(f"==> {cmd}") - self.dev.write(cmd.encode() + b'\r\n') - - r = b"" - cnt = 0 - while b"OK" not in r and b"ERROR" not in r and cnt < 20: - r += self.dev.read(8192).strip() - cnt += 1 - r = r.decode() - print(f"<== {repr(r)}") - return r - - def download_ota(self, qr): - return self.at(f'AT+QESIM="ota","{qr}"') - - def download(self, qr): - smdp = qr.split('$')[1] - out = self.at(f'AT+QESIM="download","{qr}"') - for _ in range(5): - line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] - - parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] - print(repr(parts)) - trans, ret, url, payloadlen, payload = parts - assert trans == "trans" and ret == "0" - assert len(payload) == int(payloadlen) - - r = post(f"https://{smdp}/{url}", payload) - to_send = binascii.hexlify(r).decode() - - chunk_len = 1400 - for i in range(math.ceil(len(to_send) / chunk_len)): - state = 1 if (i+1)*chunk_len < len(to_send) else 0 - data = to_send[i * chunk_len : (i+1)*chunk_len] - out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') - assert "OK" in out - - if '+QESIM:"download",1' in out: - raise Exception("profile install failed") - elif '+QESIM:"download",0' in out: - print("done, successfully loaded") - break - - def enable(self, iccid): - self.at(f'AT+QESIM="enable","{iccid}"') - - def disable(self, iccid): - self.at(f'AT+QESIM="disable","{iccid}"') - - def delete(self, iccid): - self.at(f'AT+QESIM="delete","{iccid}"') - - def list_profiles(self): - out = self.at('AT+QESIM="list"') - return out.strip().splitlines()[1:] + def __init__(self, interface: Literal['qmi', 'at'] = 'qmi'): + self.env = os.environ.copy() + self.env['LPAC_APDU'] = interface + self.env['QMI_DEVICE'] = '/dev/cdc-wdm0' + self.env['AT_DEVICE'] = '/dev/ttyUSB2' + + self.timeout_sec = 45 + + if shutil.which('lpac') is None: + raise LPAError('lpac not found, must be installed!') + + def list_profiles(self) -> list[Profile]: + msgs = self._invoke('profile', 'list') + self._validate_successful(msgs) + return [Profile( + iccid=p['iccid'], + nickname=p['profileNickname'], + enabled=p['profileState'] == 'enabled', + provider=p['serviceProviderName'] + ) for p in msgs[-1]['payload']['data']] + + def get_active_profile(self) -> Profile | None: + return next((p for p in self.list_profiles() if p.enabled), None) + + def enable_profile(self, iccid: str) -> None: + self._validate_profile_exists(iccid) + latest = self.get_active_profile() + if latest: + if latest.iccid == iccid: + return + self.disable_profile(latest.iccid) + self._validate_successful(self._invoke('profile', 'enable', iccid)) + self.process_notifications() + + def disable_profile(self, iccid: str) -> None: + self._validate_profile_exists(iccid) + latest = self.get_active_profile() + if latest is not None and latest.iccid != iccid: + return + self._validate_successful(self._invoke('profile', 'disable', iccid)) + self.process_notifications() + + def delete_profile(self, iccid: str) -> None: + self._validate_profile_exists(iccid) + latest = self.get_active_profile() + if latest is not None and latest.iccid == iccid: + self.disable_profile(iccid) + self._validate_successful(self._invoke('profile', 'delete', iccid)) + self.process_notifications() + + def download_profile(self, qr: str, nickname: str | None = None) -> None: + msgs = self._invoke('profile', 'download', '-a', qr) + self._validate_successful(msgs) + new_profile = next((m for m in msgs if m['payload']['message'] == 'es8p_meatadata_parse'), None) + if new_profile is None: + raise LPAError('no new profile found') + if nickname: + self.nickname_profile(new_profile['payload']['data']['iccid'], nickname) + self.process_notifications() + + def nickname_profile(self, iccid: str, nickname: str) -> None: + self._validate_profile_exists(iccid) + self._validate_successful(self._invoke('profile', 'nickname', iccid, nickname)) + + def process_notifications(self) -> None: + """ + Process notifications stored on the eUICC, typically to activate/deactivate the profile with the carrier. + """ + self._validate_successful(self._invoke('notification', 'process', '-a', '-r')) + + def _invoke(self, *cmd: str): + proc = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env) + try: + out, err = proc.communicate(timeout=self.timeout_sec) + except subprocess.TimeoutExpired as e: + proc.kill() + raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e + + messages = [] + for line in out.decode().strip().splitlines(): + if line.startswith('{'): + message = json.loads(line) + + # lpac response format validations + assert 'type' in message, 'expected type in message' + assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type' + assert 'payload' in message, 'expected payload in message' + assert 'code' in message['payload'], 'expected code in message payload' + assert 'data' in message['payload'], 'expected data in message payload' + + msg_ret_code = message['payload']['code'] + if msg_ret_code != 0: + raise LPAError(f"lpac {' '.join(cmd)} failed with code {msg_ret_code}: <{message['payload']['message']}> {message['payload']['data']}") + + messages.append(message) + + if len(messages) == 0: + raise LPAError(f"lpac {cmd} returned no messages") + + return messages + + def _validate_profile_exists(self, iccid: str) -> None: + if not any(p.iccid == iccid for p in self.list_profiles()): + raise LPAProfileNotFoundError(f'profile {iccid} does not exist') + + def _validate_successful(self, msgs: list[dict]) -> None: + assert msgs[-1]['payload']['message'] == 'success', 'expected success notification' if __name__ == "__main__": - import sys - - if "RESTART" in os.environ: - subprocess.check_call("sudo systemctl stop ModemManager", shell=True) - subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) - subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) - while not os.path.exists('/dev/ttyUSB2'): - time.sleep(1) - time.sleep(3) - - lpa = LPA() - print(lpa.list_profiles()) - if len(sys.argv) > 1: - lpa.download(sys.argv[1]) - print(lpa.list_profiles()) + parser = argparse.ArgumentParser(prog='esim.py', description='manage eSIM profiles on your comma device', epilog='comma.ai') + parser.add_argument('--backend', choices=['qmi', 'at'], default='qmi', help='use the specified backend, defaults to qmi') + parser.add_argument('--enable', metavar='iccid', help='enable profile; will disable current profile') + parser.add_argument('--disable', metavar='iccid', help='disable profile') + parser.add_argument('--delete', metavar='iccid', help='delete profile (warning: this cannot be undone)') + parser.add_argument('--download', nargs=2, metavar=('qr', 'name'), help='download a profile using QR code (format: LPA:1$rsp.truphone.com$QRF-SPEEDTEST)') + parser.add_argument('--nickname', nargs=2, metavar=('iccid', 'name'), help='update the nickname for a profile') + args = parser.parse_args() + + lpa = LPA(interface=args.backend) + if args.enable: + lpa.enable_profile(args.enable) + print('enabled profile, please restart device to apply changes') + elif args.disable: + lpa.disable_profile(args.disable) + print('disabled profile, please restart device to apply changes') + elif args.delete: + confirm = input('are you sure you want to delete this profile? (y/N) ') + if confirm == 'y': + lpa.delete_profile(args.delete) + print('deleted profile, please restart device to apply changes') + else: + print('cancelled') + exit(0) + elif args.download: + lpa.download_profile(args.download[0], args.download[1]) + elif args.nickname: + lpa.nickname_profile(args.nickname[0], args.nickname[1]) + else: + parser.print_help() + + profiles = lpa.list_profiles() + print(f'\n{len(profiles)} profile{"s" if len(profiles) > 1 else ""}:') + for p in profiles: + print(f'- {p.iccid} (nickname: {p.nickname or ""}) (provider: {p.provider}) - {"enabled" if p.enabled else "disabled"}') diff --git a/system/hardware/tici/tests/test_esim.py b/system/hardware/tici/tests/test_esim.py new file mode 100644 index 0000000000..d36bdaa27b --- /dev/null +++ b/system/hardware/tici/tests/test_esim.py @@ -0,0 +1,51 @@ +import pytest + +from openpilot.system.hardware import TICI +from openpilot.system.hardware.tici.esim import LPA, LPAProfileNotFoundError + +# https://euicc-manual.osmocom.org/docs/rsp/known-test-profile +# iccid is always the same for the given activation code +TEST_ACTIVATION_CODE = 'LPA:1$rsp.truphone.com$QRF-BETTERROAMING-PMRDGIR2EARDEIT5' +TEST_ICCID = '8944476500001944011' + +TEST_NICKNAME = 'test_profile' + +def cleanup(): + lpa = LPA() + try: + lpa.delete_profile(TEST_ICCID) + except LPAProfileNotFoundError: + pass + lpa.process_notifications() + +class TestEsim: + + @classmethod + def setup_class(cls): + if not TICI: + pytest.skip() + cleanup() + + @classmethod + def teardown_class(cls): + cleanup() + + def test_provision_enable_disable(self): + lpa = LPA() + current_active = lpa.get_active_profile() + + lpa.download_profile(TEST_ACTIVATION_CODE, TEST_NICKNAME) + assert any(p.iccid == TEST_ICCID and p.nickname == TEST_NICKNAME for p in lpa.list_profiles()) + + lpa.enable_profile(TEST_ICCID) + new_active = lpa.get_active_profile() + assert new_active is not None + assert new_active.iccid == TEST_ICCID + assert new_active.nickname == TEST_NICKNAME + + lpa.disable_profile(TEST_ICCID) + new_active = lpa.get_active_profile() + assert new_active is None + + if current_active: + lpa.enable_profile(current_active.iccid) diff --git a/tools/op.sh b/tools/op.sh index a7ed964812..7c17c949e4 100755 --- a/tools/op.sh +++ b/tools/op.sh @@ -293,6 +293,11 @@ function op_check() { unset VERBOSE } +function op_esim() { + op_before_cmd + op_run_command system/hardware/tici/esim.py "$@" +} + function op_build() { CDIR=$(pwd) op_before_cmd @@ -392,6 +397,7 @@ function op_default() { echo -e "${BOLD}${UNDERLINE}Commands [System]:${NC}" echo -e " ${BOLD}auth${NC} Authenticate yourself for API use" echo -e " ${BOLD}check${NC} Check the development environment (git, os, python) to start using openpilot" + echo -e " ${BOLD}esim${NC} Manage eSIM profiles on your comma device" echo -e " ${BOLD}venv${NC} Activate the python virtual environment" echo -e " ${BOLD}setup${NC} Install openpilot dependencies" echo -e " ${BOLD}build${NC} Run the openpilot build system in the current working directory" @@ -448,6 +454,7 @@ function _op() { auth ) shift 1; op_auth "$@" ;; venv ) shift 1; op_venv "$@" ;; check ) shift 1; op_check "$@" ;; + esim ) shift 1; op_esim "$@" ;; setup ) shift 1; op_setup "$@" ;; build ) shift 1; op_build "$@" ;; juggle ) shift 1; op_juggle "$@" ;;