eSIM profile management (#30262)
	
		
	
				
					
				
			* eSIM profile management * start download * loaded a profile * more stuff * fixups * fix linter --------- Co-authored-by: Comma Device <device@comma.ai>pull/214/head
							parent
							
								
									cad17b1255
								
							
						
					
					
						commit
						2338f76fdd
					
				
				 1 changed files with 115 additions and 0 deletions
			
			
		@ -0,0 +1,115 @@ | 
				
			|||||||
 | 
					#!/usr/bin/env python3 | 
				
			||||||
 | 
					import os | 
				
			||||||
 | 
					import math | 
				
			||||||
 | 
					import time | 
				
			||||||
 | 
					import binascii | 
				
			||||||
 | 
					import requests | 
				
			||||||
 | 
					import serial | 
				
			||||||
 | 
					import subprocess | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					def post(url, payload): | 
				
			||||||
 | 
					  print() | 
				
			||||||
 | 
					  print("POST to", url) | 
				
			||||||
 | 
					  r = requests.post( | 
				
			||||||
 | 
					    url, | 
				
			||||||
 | 
					    data=payload, | 
				
			||||||
 | 
					    verify=False, | 
				
			||||||
 | 
					    headers={ | 
				
			||||||
 | 
					      "Content-Type": "application/json", | 
				
			||||||
 | 
					      "X-Admin-Protocol": "gsma/rsp/v2.2.0", | 
				
			||||||
 | 
					      "charset": "utf-8", | 
				
			||||||
 | 
					      "User-Agent": "gsma-rsp-lpad", | 
				
			||||||
 | 
					    }, | 
				
			||||||
 | 
					  ) | 
				
			||||||
 | 
					  print("resp", r) | 
				
			||||||
 | 
					  print("resp text", repr(r.text)) | 
				
			||||||
 | 
					  print() | 
				
			||||||
 | 
					  r.raise_for_status() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  ret = f"HTTP/1.1 {r.status_code}" | 
				
			||||||
 | 
					  ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') | 
				
			||||||
 | 
					  return ret.encode() + r.content | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class LPA: | 
				
			||||||
 | 
					  def __init__(self): | 
				
			||||||
 | 
					    self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) | 
				
			||||||
 | 
					    self.dev.reset_input_buffer() | 
				
			||||||
 | 
					    self.dev.reset_output_buffer() | 
				
			||||||
 | 
					    assert "OK" in self.at("AT") | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def at(self, cmd): | 
				
			||||||
 | 
					    print(f"==> {cmd}") | 
				
			||||||
 | 
					    self.dev.write(cmd.encode() + b'\r\n') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    r = b"" | 
				
			||||||
 | 
					    cnt = 0 | 
				
			||||||
 | 
					    while b"OK" not in r and b"ERROR" not in r and cnt < 20: | 
				
			||||||
 | 
					      r += self.dev.read(8192).strip() | 
				
			||||||
 | 
					      cnt += 1 | 
				
			||||||
 | 
					    r = r.decode() | 
				
			||||||
 | 
					    print(f"<== {repr(r)}") | 
				
			||||||
 | 
					    return r | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def download_ota(self, qr): | 
				
			||||||
 | 
					    return self.at(f'AT+QESIM="ota","{qr}"') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def download(self, qr): | 
				
			||||||
 | 
					    smdp = qr.split('$')[1] | 
				
			||||||
 | 
					    out = self.at(f'AT+QESIM="download","{qr}"') | 
				
			||||||
 | 
					    for _ in range(5): | 
				
			||||||
 | 
					      line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] | 
				
			||||||
 | 
					      print(repr(parts)) | 
				
			||||||
 | 
					      trans, ret, url, payloadlen, payload = parts | 
				
			||||||
 | 
					      assert trans == "trans" and ret == "0" | 
				
			||||||
 | 
					      assert len(payload) == int(payloadlen) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      r = post(f"https://{smdp}/{url}", payload) | 
				
			||||||
 | 
					      to_send = binascii.hexlify(r).decode() | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      chunk_len = 1400 | 
				
			||||||
 | 
					      for i in range(math.ceil(len(to_send) / chunk_len)): | 
				
			||||||
 | 
					        state = 1 if (i+1)*chunk_len < len(to_send) else 0 | 
				
			||||||
 | 
					        data = to_send[i * chunk_len : (i+1)*chunk_len] | 
				
			||||||
 | 
					        out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') | 
				
			||||||
 | 
					        assert "OK" in out | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      if '+QESIM:"download",1' in out: | 
				
			||||||
 | 
					        raise Exception("profile install failed") | 
				
			||||||
 | 
					      elif '+QESIM:"download",0' in out: | 
				
			||||||
 | 
					        print("done, successfully loaded") | 
				
			||||||
 | 
					        break | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def enable(self, iccid): | 
				
			||||||
 | 
					    self.at(f'AT+QESIM="enable","{iccid}"') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def disable(self, iccid): | 
				
			||||||
 | 
					    self.at(f'AT+QESIM="disable","{iccid}"') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def delete(self, iccid): | 
				
			||||||
 | 
					    self.at(f'AT+QESIM="delete","{iccid}"') | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  def list_profiles(self): | 
				
			||||||
 | 
					    out = self.at('AT+QESIM="list"') | 
				
			||||||
 | 
					    return out.strip().splitlines()[1:] | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					if __name__ == "__main__": | 
				
			||||||
 | 
					  import sys | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  if "RESTART" in os.environ: | 
				
			||||||
 | 
					    subprocess.check_call("sudo systemctl stop ModemManager", shell=True) | 
				
			||||||
 | 
					    subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) | 
				
			||||||
 | 
					    subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) | 
				
			||||||
 | 
					    while not os.path.exists('/dev/ttyUSB2'): | 
				
			||||||
 | 
					      time.sleep(1) | 
				
			||||||
 | 
					    time.sleep(3) | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  lpa = LPA() | 
				
			||||||
 | 
					  print(lpa.list_profiles()) | 
				
			||||||
 | 
					  if len(sys.argv) > 1: | 
				
			||||||
 | 
					    lpa.download(sys.argv[1]) | 
				
			||||||
 | 
					    print(lpa.list_profiles()) | 
				
			||||||
					Loading…
					
					
				
		Reference in new issue