diff --git a/selfdrive/hardware/tici/agnos.py b/selfdrive/hardware/tici/agnos.py index 3eeb03e094..b14e6007ec 100755 --- a/selfdrive/hardware/tici/agnos.py +++ b/selfdrive/hardware/tici/agnos.py @@ -6,7 +6,7 @@ import requests import struct import subprocess import os -from typing import Generator +from typing import Generator, Optional from common.spinner import Spinner @@ -76,17 +76,17 @@ def unsparsify(f: StreamingDecompressor) -> Generator[bytes, None, None]: raise Exception("Unhandled sparse chunk type") -def get_target_slot_number(): +def get_target_slot_number() -> int: current_slot = subprocess.check_output(["abctl", "--boot_slot"], encoding='utf-8').strip() return 1 if current_slot == "_a" else 0 -def slot_number_to_suffix(slot_number): +def slot_number_to_suffix(slot_number: int) -> str: assert slot_number in (0, 1) return '_a' if slot_number == 0 else '_b' -def get_partition_path(target_slot_number, partition): +def get_partition_path(target_slot_number: int, partition: dict) -> str: path = f"/dev/disk/by-partlabel/{partition['name']}" if partition.get('has_ab', True): @@ -95,7 +95,7 @@ def get_partition_path(target_slot_number, partition): return path -def verify_partition(target_slot_number, partition): +def verify_partition(target_slot_number: int, partition: dict) -> bool: full_check = partition['full_check'] path = get_partition_path(target_slot_number, partition) partition_size = partition['size'] @@ -117,7 +117,7 @@ def verify_partition(target_slot_number, partition): return out.read(64) == partition['hash_raw'].lower().encode() -def clear_partition_hash(target_slot_number, partition): +def clear_partition_hash(target_slot_number: int, partition: dict) -> None: path = get_partition_path(target_slot_number, partition) with open(path, 'wb+') as out: partition_size = partition['size'] @@ -127,7 +127,7 @@ def clear_partition_hash(target_slot_number, partition): os.sync() -def flash_partition(target_slot_number, partition, cloudlog, spinner=None): +def flash_partition(target_slot_number: int, partition: dict, cloudlog, spinner: Optional[Spinner] = None): cloudlog.info(f"Downloading and writing {partition['name']}") if verify_partition(target_slot_number, partition): @@ -176,7 +176,7 @@ def flash_partition(target_slot_number, partition, cloudlog, spinner=None): out.write(partition['hash_raw'].lower().encode()) -def swap(manifest_path, target_slot_number): +def swap(manifest_path: str, target_slot_number: int) -> None: update = json.load(open(manifest_path)) for partition in update: if not partition.get('full_check', False): @@ -185,7 +185,7 @@ def swap(manifest_path, target_slot_number): subprocess.check_output(f"abctl --set_active {target_slot_number}", shell=True) -def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None): +def flash_agnos_update(manifest_path: str, target_slot_number: int, cloudlog, spinner: Optional[Spinner] = None) -> None: update = json.load(open(manifest_path)) cloudlog.info(f"Target slot {target_slot_number}") @@ -204,7 +204,8 @@ def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None except requests.exceptions.RequestException: cloudlog.exception("Failed") - spinner.update("Waiting for internet...") + if spinner is not None: + spinner.update("Waiting for internet...") cloudlog.info(f"Failed to download {partition['name']}, retrying ({retries})") time.sleep(10) @@ -215,7 +216,7 @@ def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None cloudlog.info(f"AGNOS ready on slot {target_slot_number}") -def verify_agnos_update(manifest_path, target_slot_number): +def verify_agnos_update(manifest_path: str, target_slot_number: int) -> bool: update = json.load(open(manifest_path)) return all(verify_partition(target_slot_number, partition) for partition in update)