openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

328 lines
11 KiB

#!/usr/bin/env python3
import hashlib
import json
import lzma
import os
import struct
import subprocess
import time
from typing import Dict, Generator, List, Tuple, Union
import requests
import openpilot.system.hardware.tici.casync as casync
SPARSE_CHUNK_FMT = struct.Struct('H2xI4x')
CAIBX_URL = "https://commadist.azureedge.net/agnosupdate/"
class StreamingDecompressor:
def __init__(self, url: str) -> None:
self.buf = b""
self.req = requests.get(url, stream=True, headers={'Accept-Encoding': None}, timeout=60) # type: ignore
self.it = self.req.iter_content(chunk_size=1024 * 1024)
self.decompressor = lzma.LZMADecompressor(format=lzma.FORMAT_AUTO)
self.eof = False
self.sha256 = hashlib.sha256()
def read(self, length: int) -> bytes:
while len(self.buf) < length:
self.req.raise_for_status()
try:
compressed = next(self.it)
except StopIteration:
self.eof = True
break
out = self.decompressor.decompress(compressed)
self.buf += out
result = self.buf[:length]
self.buf = self.buf[length:]
self.sha256.update(result)
return result
def unsparsify(f: StreamingDecompressor) -> Generator[bytes, None, None]:
# https://source.android.com/devices/bootloader/images#sparse-format
magic = struct.unpack("I", f.read(4))[0]
assert(magic == 0xed26ff3a)
# Version
major = struct.unpack("H", f.read(2))[0]
minor = struct.unpack("H", f.read(2))[0]
assert(major == 1 and minor == 0)
f.read(2) # file header size
f.read(2) # chunk header size
block_sz = struct.unpack("I", f.read(4))[0]
f.read(4) # total blocks
num_chunks = struct.unpack("I", f.read(4))[0]
f.read(4) # crc checksum
for _ in range(num_chunks):
chunk_type, out_blocks = SPARSE_CHUNK_FMT.unpack(f.read(12))
if chunk_type == 0xcac1: # Raw
# TODO: yield in smaller chunks. Yielding only block_sz is too slow. Largest observed data chunk is 252 MB.
yield f.read(out_blocks * block_sz)
elif chunk_type == 0xcac2: # Fill
filler = f.read(4) * (block_sz // 4)
for _ in range(out_blocks):
yield filler
elif chunk_type == 0xcac3: # Don't care
yield b""
else:
raise Exception("Unhandled sparse chunk type")
# noop wrapper with same API as unsparsify() for non sparse images
def noop(f: StreamingDecompressor) -> Generator[bytes, None, None]:
while not f.eof:
yield f.read(1024 * 1024)
def get_target_slot_number() -> int:
current_slot = subprocess.check_output(["abctl", "--boot_slot"], encoding='utf-8').strip()
return 1 if current_slot == "_a" else 0
def slot_number_to_suffix(slot_number: int) -> str:
assert slot_number in (0, 1)
return '_a' if slot_number == 0 else '_b'
def get_partition_path(target_slot_number: int, partition: dict) -> str:
path = f"/dev/disk/by-partlabel/{partition['name']}"
if partition.get('has_ab', True):
path += slot_number_to_suffix(target_slot_number)
return path
def get_raw_hash(path: str, partition_size: int) -> str:
raw_hash = hashlib.sha256()
pos, chunk_size = 0, 1024 * 1024
with open(path, 'rb+') as out:
while pos < partition_size:
n = min(chunk_size, partition_size - pos)
raw_hash.update(out.read(n))
pos += n
return raw_hash.hexdigest().lower()
def verify_partition(target_slot_number: int, partition: Dict[str, Union[str, int]], force_full_check: bool = False) -> bool:
full_check = partition['full_check'] or force_full_check
path = get_partition_path(target_slot_number, partition)
if not isinstance(partition['size'], int):
return False
partition_size: int = partition['size']
if not isinstance(partition['hash_raw'], str):
return False
partition_hash: str = partition['hash_raw']
if full_check:
return get_raw_hash(path, partition_size) == partition_hash.lower()
else:
with open(path, 'rb+') as out:
out.seek(partition_size)
return out.read(64) == partition_hash.lower().encode()
def clear_partition_hash(target_slot_number: int, partition: dict) -> None:
path = get_partition_path(target_slot_number, partition)
with open(path, 'wb+') as out:
partition_size = partition['size']
out.seek(partition_size)
out.write(b"\x00" * 64)
os.sync()
def extract_compressed_image(target_slot_number: int, partition: dict, cloudlog):
path = get_partition_path(target_slot_number, partition)
downloader = StreamingDecompressor(partition['url'])
with open(path, 'wb+') as out:
# Flash partition
last_p = 0
raw_hash = hashlib.sha256()
f = unsparsify if partition['sparse'] else noop
for chunk in f(downloader):
raw_hash.update(chunk)
out.write(chunk)
p = int(out.tell() / partition['size'] * 100)
if p != last_p:
last_p = p
print(f"Installing {partition['name']}: {p}", flush=True)
if raw_hash.hexdigest().lower() != partition['hash_raw'].lower():
raise Exception(f"Raw hash mismatch '{raw_hash.hexdigest().lower()}'")
if downloader.sha256.hexdigest().lower() != partition['hash'].lower():
raise Exception("Uncompressed hash mismatch")
if out.tell() != partition['size']:
raise Exception("Uncompressed size mismatch")
os.sync()
def extract_casync_image(target_slot_number: int, partition: dict, cloudlog):
path = get_partition_path(target_slot_number, partition)
seed_path = path[:-1] + ('b' if path[-1] == 'a' else 'a')
target = casync.parse_caibx(partition['casync_caibx'])
sources: List[Tuple[str, casync.ChunkReader, casync.ChunkDict]] = []
# First source is the current partition.
try:
raw_hash = get_raw_hash(seed_path, partition['size'])
caibx_url = f"{CAIBX_URL}{partition['name']}-{raw_hash}.caibx"
try:
cloudlog.info(f"casync fetching {caibx_url}")
sources += [('seed', casync.FileChunkReader(seed_path), casync.build_chunk_dict(casync.parse_caibx(caibx_url)))]
except requests.RequestException:
cloudlog.error(f"casync failed to load {caibx_url}")
except Exception:
cloudlog.exception("casync failed to hash seed partition")
# Second source is the target partition, this allows for resuming
sources += [('target', casync.FileChunkReader(path), casync.build_chunk_dict(target))]
# Finally we add the remote source to download any missing chunks
sources += [('remote', casync.RemoteChunkReader(partition['casync_store']), casync.build_chunk_dict(target))]
last_p = 0
def progress(cur):
nonlocal last_p
p = int(cur / partition['size'] * 100)
if p != last_p:
last_p = p
print(f"Installing {partition['name']}: {p}", flush=True)
stats = casync.extract(target, sources, path, progress)
cloudlog.error(f'casync done {json.dumps(stats)}')
os.sync()
if not verify_partition(target_slot_number, partition, force_full_check=True):
raise Exception(f"Raw hash mismatch '{partition['hash_raw'].lower()}'")
def flash_partition(target_slot_number: int, partition: dict, cloudlog, standalone=False):
cloudlog.info(f"Downloading and writing {partition['name']}")
if verify_partition(target_slot_number, partition):
cloudlog.info(f"Already flashed {partition['name']}")
return
# Clear hash before flashing in case we get interrupted
full_check = partition['full_check']
if not full_check:
clear_partition_hash(target_slot_number, partition)
path = get_partition_path(target_slot_number, partition)
if ('casync_caibx' in partition) and not standalone:
extract_casync_image(target_slot_number, partition, cloudlog)
else:
extract_compressed_image(target_slot_number, partition, cloudlog)
# Write hash after successful flash
if not full_check:
with open(path, 'wb+') as out:
out.seek(partition['size'])
out.write(partition['hash_raw'].lower().encode())
def swap(manifest_path: str, target_slot_number: int, cloudlog) -> None:
update = json.load(open(manifest_path))
for partition in update:
if not partition.get('full_check', False):
clear_partition_hash(target_slot_number, partition)
while True:
out = subprocess.check_output(f"abctl --set_active {target_slot_number}", shell=True, stderr=subprocess.STDOUT, encoding='utf8')
if ("No such file or directory" not in out) and ("lun as boot lun" in out):
cloudlog.info(f"Swap successful {out}")
break
else:
cloudlog.error(f"Swap failed {out}")
def flash_agnos_update(manifest_path: str, target_slot_number: int, cloudlog, standalone=False) -> None:
update = json.load(open(manifest_path))
cloudlog.info(f"Target slot {target_slot_number}")
# set target slot as unbootable
os.system(f"abctl --set_unbootable {target_slot_number}")
for partition in update:
success = False
for retries in range(10):
try:
flash_partition(target_slot_number, partition, cloudlog, standalone)
success = True
break
except requests.exceptions.RequestException:
cloudlog.exception("Failed")
cloudlog.info(f"Failed to download {partition['name']}, retrying ({retries})")
time.sleep(10)
if not success:
cloudlog.info(f"Failed to flash {partition['name']}, aborting")
raise Exception("Maximum retries exceeded")
cloudlog.info(f"AGNOS ready on slot {target_slot_number}")
def verify_agnos_update(manifest_path: str, target_slot_number: int) -> bool:
update = json.load(open(manifest_path))
return all(verify_partition(target_slot_number, partition) for partition in update)
if __name__ == "__main__":
import argparse
import logging
parser = argparse.ArgumentParser(description="Flash and verify AGNOS update",
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument("--verify", action="store_true", help="Verify and perform swap if update ready")
parser.add_argument("--swap", action="store_true", help="Verify and perform swap, downloads if necessary")
parser.add_argument("manifest", help="Manifest json")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO)
target_slot_number = get_target_slot_number()
if args.verify:
if verify_agnos_update(args.manifest, target_slot_number):
swap(args.manifest, target_slot_number, logging)
exit(0)
exit(1)
elif args.swap:
while not verify_agnos_update(args.manifest, target_slot_number):
logging.error("Verification failed. Flashing AGNOS")
flash_agnos_update(args.manifest, target_slot_number, logging, standalone=True)
logging.warning(f"Verification succeeded. Swapping to slot {target_slot_number}")
swap(args.manifest, target_slot_number, logging)
else:
flash_agnos_update(args.manifest, target_slot_number, logging, standalone=True)