You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					116 lines
				
				3.0 KiB
			
		
		
			
		
	
	
					116 lines
				
				3.0 KiB
			| 
											2 years ago
										 | #!/usr/bin/env python3
 | ||
|  | import os
 | ||
|  | import math
 | ||
|  | import time
 | ||
|  | import binascii
 | ||
|  | import requests
 | ||
|  | import serial
 | ||
|  | import subprocess
 | ||
|  | 
 | ||
|  | 
 | ||
def post(url, payload):
  """POST `payload` to `url` and return the reply flattened into bytes.

  The request carries the fixed GSMA RSP headers listed below and is sent
  with TLS certificate verification switched off (`verify=False`). The
  response object and its text are printed for debugging, and
  `raise_for_status()` turns an HTTP error status into an exception.

  Returns:
    The encoded text "HTTP/1.1 <status>" immediately followed by every
    response header except `Connection` rendered as "<name>: <value>",
    immediately followed by the raw response body. No separator is placed
    between any of these pieces.
    NOTE(review): the lack of line breaks looks unusual for an HTTP message;
    presumably this is the shape the consumer expects -- confirm before
    changing it.
  """
  request_headers = {
    "Content-Type": "application/json",
    "X-Admin-Protocol": "gsma/rsp/v2.2.0",
    "charset": "utf-8",
    "User-Agent": "gsma-rsp-lpad",
  }

  print()
  print("POST to", url)
  resp = requests.post(url, data=payload, verify=False, headers=request_headers)
  print("resp", resp)
  print("resp text", repr(resp.text))
  print()
  resp.raise_for_status()

  # Status line first, then the headers we forward, all glued together.
  pieces = [f"HTTP/1.1 {resp.status_code}"]
  for name, value in resp.headers.items():
    if name != 'Connection':
      pieces.append(f"{name}: {value}")
  return ''.join(pieces).encode() + resp.content
 | ||
|  | 
 | ||
|  | 
 | ||
class LPA:
  """Drives a modem's `AT+QESIM` eSIM profile commands over a serial port.

  Every operation is an AT command written to the serial device opened in
  `__init__`; each command and its reply are echoed to stdout.
  """

  def __init__(self):
    """Open /dev/ttyUSB2, clear both buffers and check the modem answers `AT`.

    Raises:
      AssertionError: if the reply to `AT` does not contain "OK".
    """
    self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8)
    self.dev.reset_input_buffer()
    self.dev.reset_output_buffer()
    # The probe is sent outside of an `assert` statement so that it is still
    # issued (and still checked) when Python runs with -O.
    reply = self.at("AT")
    if "OK" not in reply:
      raise AssertionError(f"modem did not acknowledge AT: {reply!r}")

  def at(self, cmd):
    """Send one AT command and return the modem's reply as text.

    `cmd` is written followed by CRLF. The port is then read (up to 8192
    bytes per read) until the accumulated reply contains "OK" or "ERROR",
    or until 20 reads have been made.

    Returns:
      The accumulated reply, decoded, with leading and trailing whitespace
      removed. Whitespace inside the reply is preserved even when it falls
      on the boundary between two reads.
    """
    print(f"==> {cmd}")
    self.dev.write(cmd.encode() + b'\r\n')

    raw = b""
    for _ in range(20):
      if b"OK" in raw or b"ERROR" in raw:
        break
      # Do not strip individual chunks: a read may end in the middle of the
      # reply, and dropping the CR/LF there would glue two lines together.
      raw += self.dev.read(8192)
    reply = raw.strip().decode()
    print(f"<== {repr(reply)}")
    return reply

  def download_ota(self, qr):
    """Send the `"ota"` form of AT+QESIM for `qr` and return the reply text."""
    return self.at(f'AT+QESIM="ota","{qr}"')

  def download(self, qr):
    """Download a profile by relaying requests between the modem and a server.

    `qr` must contain at least one '$'; the text between the first and second
    '$' is used as the server host.
    (Presumably `qr` is an activation code such as "LPA:1$<host>$<id>" --
    confirm against the caller.)

    For up to 5 rounds: parse the modem's `+QESIM: "trans",...` reply into a
    URL path and payload, POST the payload to https://<host>/<path>, hex-encode
    the flattened response and hand it back to the modem in chunks of at most
    1400 hex characters. The method stops when the modem reports
    `+QESIM:"download",0`. If no result is reported within 5 rounds the method
    returns without raising.

    Raises:
      AssertionError: if a reply is not a successful "trans" request, if the
        announced payload length does not match, or if a chunk is not
        acknowledged with "OK".
      Exception: if the modem reports `+QESIM:"download",1`.
    """
    smdp = qr.split('$')[1]
    out = self.at(f'AT+QESIM="download","{qr}"')
    for _ in range(5):
      line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0]

      # Only the first four commas separate fields; the payload is kept whole.
      parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)]
      print(repr(parts))
      trans, ret, url, payloadlen, payload = parts
      if trans != "trans" or ret != "0":
        raise AssertionError(f"unexpected modem reply: {parts!r}")
      if len(payload) != int(payloadlen):
        raise AssertionError(
          f"payload length mismatch: announced {payloadlen}, got {len(payload)}")

      r = post(f"https://{smdp}/{url}", payload)
      to_send = binascii.hexlify(r).decode()

      chunk_len = 1400
      for i in range(math.ceil(len(to_send) / chunk_len)):
        # state is 1 while more chunks follow and 0 on the final chunk.
        state = 1 if (i+1)*chunk_len < len(to_send) else 0
        data = to_send[i * chunk_len : (i+1)*chunk_len]
        out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"')
        if "OK" not in out:
          raise AssertionError(f"chunk {i} was not acknowledged: {out!r}")

      # `out` now holds the reply to the last chunk, which either reports the
      # final result or carries the next "trans" request for the next round.
      if '+QESIM:"download",1' in out:
        raise Exception("profile install failed")
      elif '+QESIM:"download",0' in out:
        print("done, successfully loaded")
        break

  def enable(self, iccid):
    """Send the `"enable"` command for `iccid`; the reply is not returned."""
    self.at(f'AT+QESIM="enable","{iccid}"')

  def disable(self, iccid):
    """Send the `"disable"` command for `iccid`; the reply is not returned."""
    self.at(f'AT+QESIM="disable","{iccid}"')

  def delete(self, iccid):
    """Send the `"delete"` command for `iccid`; the reply is not returned."""
    self.at(f'AT+QESIM="delete","{iccid}"')

  def list_profiles(self):
    """Return the lines of the `"list"` reply, excluding its first line."""
    out = self.at('AT+QESIM="list"')
    return out.strip().splitlines()[1:]
 | ||
|  | 
 | ||
|  | 
 | ||
if __name__ == "__main__":
  import sys

  # Setting RESTART in the environment (any value) stops ModemManager and
  # bounces the LTE service before the modem is used.
  if "RESTART" in os.environ:
    restart_commands = (
      "sudo systemctl stop ModemManager",
      "/usr/comma/lte/lte.sh stop_blocking",
      "/usr/comma/lte/lte.sh start",
    )
    for command in restart_commands:
      subprocess.check_call(command, shell=True)
    # Poll once a second until the serial device node exists again, then
    # wait a further three seconds before opening it.
    while not os.path.exists('/dev/ttyUSB2'):
      time.sleep(1)
    time.sleep(3)

  lpa = LPA()
  print(lpa.list_profiles())
  # An optional first command-line argument is passed to download(); the
  # profile list is printed again afterwards.
  if len(sys.argv) > 1:
    lpa.download(sys.argv[1])
    print(lpa.list_profiles())
 |