You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					131 lines
				
				4.0 KiB
			
		
		
			
		
	
	
					131 lines
				
				4.0 KiB
			| 
								 
											4 years ago
										 
									 | 
							
								#!/usr/bin/env python3
							 | 
						||
| 
								 | 
							
								import argparse
							 | 
						||
| 
								 | 
							
								import hashlib
							 | 
						||
| 
								 | 
							
								import json
							 | 
						||
| 
								 | 
							
								import logging
							 | 
						||
| 
								 | 
							
								import os
							 | 
						||
| 
								 | 
							
								import requests
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Directory that holds downloaded update files; created on demand by download_neos_update.
								NEOSUPDATE_DIR = "/data/neoupdate"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Path of the recovery partition: hashed against the manifest and overwritten by flash_update.
								RECOVERY_DEV = "/dev/block/bootdevice/by-name/recovery"
							 | 
						||
| 
								 | 
							
								# File that perform_ota_update fills with the --update_package line before rebooting into recovery.
								RECOVERY_COMMAND = "/cache/recovery/command"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def get_fn(url: str):
								  """Return the path inside NEOSUPDATE_DIR where the file behind `url` is cached.

								  Only the final path component of `url` is used as the file name.
								  """
								  filename = os.path.basename(url)
								  return os.path.join(NEOSUPDATE_DIR, filename)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def download_file(url: str, fn: str, sha256: str, display_name: str, cloudlog=logging) -> None:
								  """Download `url` to `fn`, resuming a partial file, and verify its SHA-256.

								  Args:
								    url: location fetched with an HTTP GET.
								    fn: local path to write; bytes already present are kept and the
								      download resumes after them via a Range request.
								    sha256: expected hex digest of the complete file.
								    display_name: label used in log and progress messages.
								    cloudlog: logger-like object providing info() and error().

								  Raises:
								    Exception: any error hit while downloading is logged and re-raised, and
								      a finished file that fails the hash check raises as well. In both
								      cases `fn` is deleted first.
								  """
								  # check if already downloaded
								  if check_hash(fn, sha256):
								    cloudlog.info(f"{display_name} already cached")
								    return

								  try:
								    with open(fn, "ab+") as f:
								      offset = f.tell()
								      headers = {"Range": f"bytes={offset}-"}
								      # the context manager releases the streamed connection on every exit path
								      with requests.get(url, stream=True, allow_redirects=True, headers=headers) as r:
								        r.raise_for_status()

								        if offset > 0 and r.status_code != 206:
								          # the server ignored the Range header and is sending the whole file:
								          # drop the partial data so it does not end up in front of the full copy
								          f.seek(0)
								          f.truncate()

								        # Content-Range carries the full size of a resumed download,
								        # Content-Length only the size of this response
								        size = r.headers.get('Content-Length', '')
								        if 'Content-Range' in r.headers:
								          size = r.headers['Content-Range'].split('/')[-1]
								        # the size may be absent (chunked transfer) or "*" (unknown total)
								        total = int(size) if size.strip().isdigit() else 0

								        for chunk in r.iter_content(chunk_size=1024 * 1024):
								          f.write(chunk)
								          # progress can only be reported when the total size is known
								          if total > 0:
								            print(f"Downloading {display_name}: {f.tell() / total * 100}")
								  except Exception:
								    cloudlog.error("download error")
								    if os.path.isfile(fn):
								      os.unlink(fn)
								    raise

								  if not check_hash(fn, sha256):
								    if os.path.isfile(fn):
								      os.unlink(fn)
								    raise Exception("downloaded update failed hash check")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def check_hash(fn: str, sha256: str, length: int = -1) -> bool:
								  """Tell whether the SHA-256 of `fn` equals `sha256` (case-insensitive).

								  Args:
								    fn: path of the file to hash; a missing path yields False.
								    sha256: expected digest as a hex string.
								    length: when positive, hash at most this many leading bytes;
								      when 0, hash nothing; when negative, hash up to end of file.

								  Returns:
								    True when the computed digest matches, False otherwise.
								  """
								  if not os.path.exists(fn):
								    return False

								  block_size = 1024 * 1024
								  digest = hashlib.sha256()
								  consumed = 0
								  with open(fn, "rb") as src:
								    # stop once exactly `length` bytes were hashed, or at end of file
								    while consumed != length:
								      if length > 0:
								        want = min(max(0, length - consumed), block_size)
								      else:
								        want = block_size
								      buf = src.read(want)
								      if not buf:
								        break
								      digest.update(buf)
								      consumed += len(buf)
								  return sha256.lower() == digest.hexdigest().lower()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def flash_update(update_fn: str, out_path: str) -> None:
								  """Copy the image at `update_fn` over `out_path` and force it to storage.

								  Args:
								    update_fn: path of the image file to read.
								    out_path: destination path, opened in "w+b" mode and written from
								      offset 0.

								  Raises:
								    OSError: if either file cannot be opened, read, written or synced.
								  """
								  with open(update_fn, "rb") as update, open(out_path, "w+b") as out:
								    while True:
								      dat = update.read(8192)
								      if len(dat) == 0:
								        break
								      out.write(dat)
								    # push Python's buffer and the OS cache down to the device so the
								    # image is durable before callers re-read it or reboot
								    out.flush()
								    os.fsync(out.fileno())
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def download_neos_update(manifest_path: str, cloudlog=logging) -> None:
								  """Flash the recovery image if it is out of date, then download the OTA.

								  Args:
								    manifest_path: path of a JSON manifest providing recovery_url,
								      recovery_hash, recovery_len, ota_url and ota_hash.
								    cloudlog: logger-like object providing info() and error().

								  Raises:
								    AssertionError: if RECOVERY_DEV does not match the manifest hash after
								      it was flashed.
								    Exception: propagated from download_file on a download or hash failure.
								  """
								  with open(manifest_path) as f:
								    m = json.load(f)

								  os.makedirs(NEOSUPDATE_DIR, exist_ok=True)

								  # handle recovery updates
								  if not check_hash(RECOVERY_DEV, m['recovery_hash'], m['recovery_len']):
								    cloudlog.info("recovery needs update")
								    recovery_fn = get_fn(m['recovery_url'])
								    download_file(m['recovery_url'], recovery_fn, m['recovery_hash'], "recovery", cloudlog)

								    flash_update(recovery_fn, RECOVERY_DEV)
								    # explicit raise rather than `assert`, so the verification is not
								    # stripped when Python runs with -O
								    if not check_hash(RECOVERY_DEV, m['recovery_hash'], m['recovery_len']):
								      raise AssertionError("recovery flash corrupted")
								    cloudlog.info("recovery successfully flashed")

								  # download OTA update
								  download_file(m['ota_url'], get_fn(m['ota_url']), m['ota_hash'], "system", cloudlog)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def verify_update_ready(manifest_path: str) -> bool:
								  """Report whether the update described by the manifest can be installed.

								  Returns True only when the cached OTA file matches ota_hash and the
								  first recovery_len bytes of RECOVERY_DEV match recovery_hash.
								  """
								  with open(manifest_path) as manifest_file:
								    manifest = json.load(manifest_file)

								  ota_path = get_fn(manifest['ota_url'])
								  # both hashes are always computed, OTA first and recovery second
								  checks = (
								    check_hash(ota_path, manifest['ota_hash']),
								    check_hash(RECOVERY_DEV, manifest['recovery_hash'], manifest['recovery_len']),
								  )
								  return all(checks)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def perform_ota_update(manifest_path: str) -> None:
								  """Write the recovery command for the cached OTA package and reboot.

								  The manifest's ota_url decides which cached file is named in
								  RECOVERY_COMMAND; the exit status of the reboot command is not checked.
								  """
								  with open(manifest_path) as manifest_file:
								    manifest = json.load(manifest_file)

								  package_path = get_fn(manifest['ota_url'])
								  command_line = f"--update_package={package_path}\n"

								  # reboot into recovery
								  with open(RECOVERY_COMMAND, "w") as command_file:
								    command_file.write(command_line)
								  os.system("service call power 16 i32 0 s16 recovery i32 1")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								if __name__ == "__main__":
								  # Command-line entry point. Without flags the update named in the manifest
								  # is downloaded; --swap additionally installs it, while --swap-if-ready
								  # installs only when everything is already in place and downloads nothing.
								  parser = argparse.ArgumentParser(description="NEOS update utility",
								                                   formatter_class=argparse.ArgumentDefaultsHelpFormatter)
								  parser.add_argument("--swap", action="store_true", help="Perform update after downloading")
								  parser.add_argument("--swap-if-ready", action="store_true", help="Perform update if already downloaded")
								  parser.add_argument("manifest", help="Manifest json")
								  args = parser.parse_args()

								  logging.basicConfig(level=logging.INFO)

								  if args.swap_if_ready:
								    # nothing happens when the update is not ready
								    if verify_update_ready(args.manifest):
								      perform_ota_update(args.manifest)
								  else:
								    download_neos_update(args.manifest, logging)
								    if args.swap:
								      perform_ota_update(args.manifest)
							 |