fix agnos updater bug + type hints (#21566)

* add type hints

* fix bug

* more hints
old-commit-hash: 21c7981919
commatwo_master
Adeeb Shihadeh 4 years ago committed by GitHub
parent 1eba7e05d6
commit 55d3871a5e
  1. 23
      selfdrive/hardware/tici/agnos.py

@ -6,7 +6,7 @@ import requests
import struct
import subprocess
import os
from typing import Generator
from typing import Generator, Optional
from common.spinner import Spinner
@ -76,17 +76,17 @@ def unsparsify(f: StreamingDecompressor) -> Generator[bytes, None, None]:
raise Exception("Unhandled sparse chunk type")
def get_target_slot_number():
def get_target_slot_number() -> int:
current_slot = subprocess.check_output(["abctl", "--boot_slot"], encoding='utf-8').strip()
return 1 if current_slot == "_a" else 0
def slot_number_to_suffix(slot_number):
def slot_number_to_suffix(slot_number: int) -> str:
assert slot_number in (0, 1)
return '_a' if slot_number == 0 else '_b'
def get_partition_path(target_slot_number, partition):
def get_partition_path(target_slot_number: int, partition: dict) -> str:
path = f"/dev/disk/by-partlabel/{partition['name']}"
if partition.get('has_ab', True):
@ -95,7 +95,7 @@ def get_partition_path(target_slot_number, partition):
return path
def verify_partition(target_slot_number, partition):
def verify_partition(target_slot_number: int, partition: dict) -> bool:
full_check = partition['full_check']
path = get_partition_path(target_slot_number, partition)
partition_size = partition['size']
@ -117,7 +117,7 @@ def verify_partition(target_slot_number, partition):
return out.read(64) == partition['hash_raw'].lower().encode()
def clear_partition_hash(target_slot_number, partition):
def clear_partition_hash(target_slot_number: int, partition: dict) -> None:
path = get_partition_path(target_slot_number, partition)
with open(path, 'wb+') as out:
partition_size = partition['size']
@ -127,7 +127,7 @@ def clear_partition_hash(target_slot_number, partition):
os.sync()
def flash_partition(target_slot_number, partition, cloudlog, spinner=None):
def flash_partition(target_slot_number: int, partition: dict, cloudlog, spinner: Optional[Spinner] = None):
cloudlog.info(f"Downloading and writing {partition['name']}")
if verify_partition(target_slot_number, partition):
@ -176,7 +176,7 @@ def flash_partition(target_slot_number, partition, cloudlog, spinner=None):
out.write(partition['hash_raw'].lower().encode())
def swap(manifest_path, target_slot_number):
def swap(manifest_path: str, target_slot_number: int) -> None:
update = json.load(open(manifest_path))
for partition in update:
if not partition.get('full_check', False):
@ -185,7 +185,7 @@ def swap(manifest_path, target_slot_number):
subprocess.check_output(f"abctl --set_active {target_slot_number}", shell=True)
def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None):
def flash_agnos_update(manifest_path: str, target_slot_number: int, cloudlog, spinner: Optional[Spinner] = None) -> None:
update = json.load(open(manifest_path))
cloudlog.info(f"Target slot {target_slot_number}")
@ -204,7 +204,8 @@ def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None
except requests.exceptions.RequestException:
cloudlog.exception("Failed")
spinner.update("Waiting for internet...")
if spinner is not None:
spinner.update("Waiting for internet...")
cloudlog.info(f"Failed to download {partition['name']}, retrying ({retries})")
time.sleep(10)
@ -215,7 +216,7 @@ def flash_agnos_update(manifest_path, target_slot_number, cloudlog, spinner=None
cloudlog.info(f"AGNOS ready on slot {target_slot_number}")
def verify_agnos_update(manifest_path, target_slot_number):
def verify_agnos_update(manifest_path: str, target_slot_number: int) -> bool:
update = json.load(open(manifest_path))
return all(verify_partition(target_slot_number, partition) for partition in update)

Loading…
Cancel
Save