delete old LPA, add enable/disable/validation

pull/35314/head
Trey Moen 1 week ago
parent 7c4d557658
commit 9871a02c1b
  1. 226
      system/hardware/tici/esim.py

@ -1,12 +1,17 @@
#!/usr/bin/env python3
import json
import os
import math
import time
import binascii
import requests
import serial
import subprocess
import time
from dataclasses import dataclass
@dataclass
class Profile:
iccid: str
isdp_aid: str
nickname: str
enabled: bool
provider: str
class LPAError(Exception):
pass
@ -20,157 +25,122 @@ class LPA:
self.timeout_sec = 30
def _invoke(self, cmd):
ret = subprocess.Popen(['sudo', 'lpac', *cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
def list_profiles(self):
"""
List all profiles on the eUICC.
"""
raw = self._invoke('profile', 'list')[-1] # only one message
profiles = []
for profile in raw['payload']['data']:
profiles.append(Profile(
iccid=profile['iccid'],
isdp_aid=profile['isdpAid'],
nickname=profile['profileNickname'],
enabled=profile['profileState'] == 'enabled',
provider=profile['serviceProviderName'],
))
return profiles
def get_active_profile(self):
"""
Get the active profile on the eUICC.
"""
profiles = self.list_profiles()
for profile in profiles:
if profile.enabled:
return profile
return None
def process_notifications(self) -> None:
"""
Process notifications from the LPA, typically to activate/deactivate the profile with the carrier.
"""
msgs = self._invoke('notification', 'process', '-a', '-r')
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
def enable_profile(self, iccid: str) -> None:
"""
Enable the profile on the eUICC.
"""
latest = self.get_active_profile()
if latest is None:
raise LPAError('no profile enabled')
if latest.iccid == iccid:
raise LPAError(f'profile {iccid} is already enabled')
else:
self.disable_profile(latest.iccid)
msgs = self._invoke('profile', 'enable', iccid)
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
self.process_notifications()
def disable_profile(self, iccid: str) -> None:
"""
Disable the profile on the eUICC.
"""
latest = self.get_active_profile()
if latest is None:
raise LPAError('no profile enabled')
if latest.iccid != iccid:
raise LPAError(f'profile {iccid} is not enabled')
msgs = self._invoke('profile', 'disable', iccid)
assert msgs[-1]['payload']['message'] == 'success', 'expected success notification'
self.process_notifications()
def _invoke(self, *cmd: str):
print(f"invoking lpac {' '.join(list(cmd))}")
ret = subprocess.Popen(['sudo', '-E', 'lpac'] + list(cmd), shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.env)
try:
out, err = ret.communicate(timeout=self.timeout_sec)
except subprocess.TimeoutExpired as e:
ret.kill()
raise LPAError(f"lpac {cmd} timed out after {self.timeout_sec} seconds") from e
if ret.returncode != 0:
raise Exception(f"lpac {cmd} failed: {err}")
messages = []
for line in out.decode().strip().splitlines():
if line.startswith('{'):
message = json.loads(line)
# lpac response format validations
assert 'type' in message
assert message['type'] == 'lpa'
assert 'payload' in message
assert 'code' in message['payload']
assert 'type' in message, 'expected type in message'
assert message['type'] == 'lpa' or message['type'] == 'progress', 'expected lpa or progress message type'
assert 'payload' in message, 'expected payload in message'
assert 'code' in message['payload'], 'expected code in message payload'
if message['payload']['code'] != 0:
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: {message['payload']['message']}")
raise LPAError(f"lpac {' '.join(cmd)} failed with code {message['payload']['code']}: <{message['payload']['message']}> {message['payload']['data']}")
messages.append(message['payload']['data'])
assert 'data' in message['payload'], 'expected data in message payload'
messages.append(message)
# TODO: confirm we should always get a message
if len(messages) == 0:
raise LPAError(f"lpac {cmd} returned no messages")
return messages
def list_profiles(self):
raw = self._invoke(['profile', 'list'])[-1] # only one message
profiles = []
for profile in raw:
profiles.append({
'iccid': profile['iccid'],
'isdp_aid': profile['isdpAid'],
'nickname': profile['profileNickname'],
'enabled': profile['profileState'] == 'enabled',
'provider': profile['serviceProviderName'],
})
return profiles
def post(url, payload):
print()
print("POST to", url)
r = requests.post(
url,
data=payload,
verify=False,
headers={
"Content-Type": "application/json",
"X-Admin-Protocol": "gsma/rsp/v2.2.0",
"charset": "utf-8",
"User-Agent": "gsma-rsp-lpad",
},
)
print("resp", r)
print("resp text", repr(r.text))
print()
r.raise_for_status()
ret = f"HTTP/1.1 {r.status_code}"
ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection')
return ret.encode() + r.content
class LPA:
def __init__(self):
self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8)
self.dev.reset_input_buffer()
self.dev.reset_output_buffer()
assert "OK" in self.at("AT")
def at(self, cmd):
print(f"==> {cmd}")
self.dev.write(cmd.encode() + b'\r\n')
r = b""
cnt = 0
while b"OK" not in r and b"ERROR" not in r and cnt < 20:
r += self.dev.read(8192).strip()
cnt += 1
r = r.decode()
print(f"<== {repr(r)}")
return r
def download_ota(self, qr):
return self.at(f'AT+QESIM="ota","{qr}"')
def download(self, qr):
smdp = qr.split('$')[1]
out = self.at(f'AT+QESIM="download","{qr}"')
for _ in range(5):
line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0]
parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)]
print(repr(parts))
trans, ret, url, payloadlen, payload = parts
assert trans == "trans" and ret == "0"
assert len(payload) == int(payloadlen)
r = post(f"https://{smdp}/{url}", payload)
to_send = binascii.hexlify(r).decode()
chunk_len = 1400
for i in range(math.ceil(len(to_send) / chunk_len)):
state = 1 if (i+1)*chunk_len < len(to_send) else 0
data = to_send[i * chunk_len : (i+1)*chunk_len]
out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"')
assert "OK" in out
if '+QESIM:"download",1' in out:
raise Exception("profile install failed")
elif '+QESIM:"download",0' in out:
print("done, successfully loaded")
break
def enable(self, iccid):
self.at(f'AT+QESIM="enable","{iccid}"')
def disable(self, iccid):
self.at(f'AT+QESIM="disable","{iccid}"')
def delete(self, iccid):
self.at(f'AT+QESIM="delete","{iccid}"')
def list_profiles(self):
out = self.at('AT+QESIM="list"')
return out.strip().splitlines()[1:]
if __name__ == "__main__":
import sys
lpa = LPA()
print(lpa.list_profiles())
if len(sys.argv) > 2:
if sys.argv[1] == 'enable':
lpa.enable_profile(sys.argv[2])
elif sys.argv[1] == 'disable':
lpa.disable_profile(sys.argv[2])
else:
raise Exception(f"invalid command: {sys.argv[1]}")
if "RESTART" in os.environ:
subprocess.check_call("sudo systemctl stop ModemManager", shell=True)
subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True)
subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True)
while not os.path.exists('/dev/ttyUSB2'):
time.sleep(1)
time.sleep(3)
lpa = LPA()
print(lpa.list_profiles())
if len(sys.argv) > 1:
lpa.download(sys.argv[1])
print(lpa.list_profiles())

Loading…
Cancel
Save