#!/usr/bin/env python3 import hashlib import json import lzma import os import struct import subprocess import time from typing import Dict, Generator, List, Tuple, Union import requests import system.hardware.tici.casync as casync SPARSE_CHUNK_FMT = struct.Struct('H2xI4x') CAIBX_URL = "https://commadist.azureedge.net/agnosupdate/" class StreamingDecompressor: def __init__(self, url: str) -> None: self.buf = b"" self.req = requests.get(url, stream=True, headers={'Accept-Encoding': None}) # type: ignore # pylint: disable=missing-timeout self.it = self.req.iter_content(chunk_size=1024 * 1024) self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO) self.eof = False self.sha256 = hashlib.sha256() def read(self, length: int) -> bytes: while len(self.buf) < length: self.req.raise_for_status() try: compressed = next(self.it) except StopIteration: self.eof = True break out = self.decompressor.decompress(compressed) self.buf += out result = self.buf[:length] self.buf = self.buf[length:] self.sha256.update(result) return result def unsparsify(f: StreamingDecompressor) -> Generator[bytes, None, None]: # https://source.android.com/devices/bootloader/images#sparse-format magic = struct.unpack("I", f.read(4))[0] assert(magic == 0xed26ff3a) # Version major = struct.unpack("H", f.read(2))[0] minor = struct.unpack("H", f.read(2))[0] assert(major == 1 and minor == 0) f.read(2) # file header size f.read(2) # chunk header size block_sz = struct.unpack("I", f.read(4))[0] f.read(4) # total blocks num_chunks = struct.unpack("I", f.read(4))[0] f.read(4) # crc checksum for _ in range(num_chunks): chunk_type, out_blocks = SPARSE_CHUNK_FMT.unpack(f.read(12)) if chunk_type == 0xcac1: # Raw # TODO: yield in smaller chunks. Yielding only block_sz is too slow. Largest observed data chunk is 252 MB. yield f.read(out_blocks * block_sz) elif chunk_type == 0xcac2: # Fill filler = f.read(4) * (block_sz // 4) for _ in range(out_blocks): yield filler elif chunk_type == 0xcac3: # Don't care yield b"" else: raise Exception("Unhandled sparse chunk type") # noop wrapper with same API as unsparsify() for non sparse images def noop(f: StreamingDecompressor) -> Generator[bytes, None, None]: while not f.eof: yield f.read(1024 * 1024) def get_target_slot_number() -> int: current_slot = subprocess.check_output(["abctl", "--boot_slot"], encoding='utf-8').strip() return 1 if current_slot == "_a" else 0 def slot_number_to_suffix(slot_number: int) -> str: assert slot_number in (0, 1) return '_a' if slot_number == 0 else '_b' def get_partition_path(target_slot_number: int, partition: dict) -> str: path = f"/dev/disk/by-partlabel/{partition['name']}" if partition.get('has_ab', True): path += slot_number_to_suffix(target_slot_number) return path def get_raw_hash(path: str, partition_size: int) -> str: raw_hash = hashlib.sha256() pos, chunk_size = 0, 1024 * 1024 with open(path, 'rb+') as out: while pos < partition_size: n = min(chunk_size, partition_size - pos) raw_hash.update(out.read(n)) pos += n return raw_hash.hexdigest().lower() def verify_partition(target_slot_number: int, partition: Dict[str, Union[str, int]], force_full_check: bool = False) -> bool: full_check = partition['full_check'] or force_full_check path = get_partition_path(target_slot_number, partition) if not isinstance(partition['size'], int): return False partition_size: int = partition['size'] if not isinstance(partition['hash_raw'], str): return False partition_hash: str = partition['hash_raw'] if full_check: return get_raw_hash(path, partition_size) == partition_hash.lower() else: with open(path, 'rb+') as out: out.seek(partition_size) return out.read(64) == partition_hash.lower().encode() def clear_partition_hash(target_slot_number: int, partition: dict) -> None: path = get_partition_path(target_slot_number, partition) with open(path, 'wb+') as out: partition_size = partition['size'] out.seek(partition_size) out.write(b"\x00" * 64) os.sync() def extract_compressed_image(target_slot_number: int, partition: dict, cloudlog): path = get_partition_path(target_slot_number, partition) downloader = StreamingDecompressor(partition['url']) with open(path, 'wb+') as out: # Flash partition last_p = 0 raw_hash = hashlib.sha256() f = unsparsify if partition['sparse'] else noop for chunk in f(downloader): raw_hash.update(chunk) out.write(chunk) p = int(out.tell() / partition['size'] * 100) if p != last_p: last_p = p print(f"Installing {partition['name']}: {p}", flush=True) if raw_hash.hexdigest().lower() != partition['hash_raw'].lower(): raise Exception(f"Raw hash mismatch '{raw_hash.hexdigest().lower()}'") if downloader.sha256.hexdigest().lower() != partition['hash'].lower(): raise Exception("Uncompressed hash mismatch") if out.tell() != partition['size']: raise Exception("Uncompressed size mismatch") os.sync() def extract_casync_image(target_slot_number: int, partition: dict, cloudlog): path = get_partition_path(target_slot_number, partition) seed_path = path[:-1] + ('b' if path[-1] == 'a' else 'a') target = casync.parse_caibx(partition['casync_caibx']) sources: List[Tuple[str, casync.ChunkReader, casync.ChunkDict]] = [] # First source is the current partition. try: raw_hash = get_raw_hash(seed_path, partition['size']) caibx_url = f"{CAIBX_URL}{partition['name']}-{raw_hash}.caibx" try: cloudlog.info(f"casync fetching {caibx_url}") sources += [('seed', casync.FileChunkReader(seed_path), casync.build_chunk_dict(casync.parse_caibx(caibx_url)))] except requests.RequestException: cloudlog.error(f"casync failed to load {caibx_url}") except Exception: cloudlog.exception("casync failed to hash seed partition") # Second source is the target partition, this allows for resuming sources += [('target', casync.FileChunkReader(path), casync.build_chunk_dict(target))] # Finally we add the remote source to download any missing chunks sources += [('remote', casync.RemoteChunkReader(partition['casync_store']), casync.build_chunk_dict(target))] last_p = 0 def progress(cur): nonlocal last_p p = int(cur / partition['size'] * 100) if p != last_p: last_p = p print(f"Installing {partition['name']}: {p}", flush=True) stats = casync.extract(target, sources, path, progress) cloudlog.error(f'casync done {json.dumps(stats)}') os.sync() if not verify_partition(target_slot_number, partition, force_full_check=True): raise Exception(f"Raw hash mismatch '{partition['hash_raw'].lower()}'") def flash_partition(target_slot_number: int, partition: dict, cloudlog, standalone=False): cloudlog.info(f"Downloading and writing {partition['name']}") if verify_partition(target_slot_number, partition): cloudlog.info(f"Already flashed {partition['name']}") return # Clear hash before flashing in case we get interrupted full_check = partition['full_check'] if not full_check: clear_partition_hash(target_slot_number, partition) path = get_partition_path(target_slot_number, partition) if ('casync_caibx' in partition) and not standalone: extract_casync_image(target_slot_number, partition, cloudlog) else: extract_compressed_image(target_slot_number, partition, cloudlog) # Write hash after successful flash if not full_check: with open(path, 'wb+') as out: out.seek(partition['size']) out.write(partition['hash_raw'].lower().encode()) def swap(manifest_path: str, target_slot_number: int, cloudlog) -> None: update = json.load(open(manifest_path)) for partition in update: if not partition.get('full_check', False): clear_partition_hash(target_slot_number, partition) while True: out = subprocess.check_output(f"abctl --set_active {target_slot_number}", shell=True, stderr=subprocess.STDOUT, encoding='utf8') if ("No such file or directory" not in out) and ("lun as boot lun" in out): cloudlog.info(f"Swap successful {out}") break else: cloudlog.error(f"Swap failed {out}") def flash_agnos_update(manifest_path: str, target_slot_number: int, cloudlog, standalone=False) -> None: update = json.load(open(manifest_path)) cloudlog.info(f"Target slot {target_slot_number}") # set target slot as unbootable os.system(f"abctl --set_unbootable {target_slot_number}") for partition in update: success = False for retries in range(10): try: flash_partition(target_slot_number, partition, cloudlog, standalone) success = True break except requests.exceptions.RequestException: cloudlog.exception("Failed") cloudlog.info(f"Failed to download {partition['name']}, retrying ({retries})") time.sleep(10) if not success: cloudlog.info(f"Failed to flash {partition['name']}, aborting") raise Exception("Maximum retries exceeded") cloudlog.info(f"AGNOS ready on slot {target_slot_number}") def verify_agnos_update(manifest_path: str, target_slot_number: int) -> bool: update = json.load(open(manifest_path)) return all(verify_partition(target_slot_number, partition) for partition in update) if __name__ == "__main__": import argparse import logging parser = argparse.ArgumentParser(description="Flash and verify AGNOS update", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("--verify", action="store_true", help="Verify and perform swap if update ready") parser.add_argument("--swap", action="store_true", help="Verify and perform swap, downloads if necessary") parser.add_argument("manifest", help="Manifest json") args = parser.parse_args() logging.basicConfig(level=logging.INFO) target_slot_number = get_target_slot_number() if args.verify: if verify_agnos_update(args.manifest, target_slot_number): swap(args.manifest, target_slot_number, logging) exit(0) exit(1) elif args.swap: while not verify_agnos_update(args.manifest, target_slot_number): logging.error("Verification failed. Flashing AGNOS") flash_agnos_update(args.manifest, target_slot_number, logging, standalone=True) logging.warning(f"Verification succeeded. Swapping to slot {target_slot_number}") swap(args.manifest, target_slot_number, logging) else: flash_agnos_update(args.manifest, target_slot_number, logging, standalone=True)