diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index d4ed6dca28..3604e1481d 100644 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -9,6 +9,7 @@ import threading import time import traceback from pathlib import Path +from typing import BinaryIO, Iterator, List, Optional, Tuple, Union from cereal import log import cereal.messaging as messaging @@ -31,10 +32,17 @@ force_wifi = os.getenv("FORCEWIFI") is not None fake_upload = os.getenv("FAKEUPLOAD") is not None -def get_directory_sort(d): +class FakeResponse: + def __init__(self): + self.status_code = 200 + + +UploadResponse = Union[requests.Response, FakeResponse] + +def get_directory_sort(d: str) -> List[str]: return list(map(lambda s: s.rjust(10, '0'), d.rsplit('--', 1))) -def listdir_by_creation(d): +def listdir_by_creation(d: str) -> List[str]: try: paths = os.listdir(d) paths = sorted(paths, key=get_directory_sort) @@ -43,7 +51,7 @@ def listdir_by_creation(d): cloudlog.exception("listdir_by_creation failed") return list() -def clear_locks(root): +def clear_locks(root: str) -> None: for logname in os.listdir(root): path = os.path.join(root, logname) try: @@ -54,16 +62,14 @@ def clear_locks(root): cloudlog.exception("clear_locks failed") -class Uploader(): - def __init__(self, dongle_id, root): +class Uploader: + def __init__(self, dongle_id: str, root: str): self.dongle_id = dongle_id self.api = Api(dongle_id) self.root = root - self.upload_thread = None - - self.last_resp = None - self.last_exc = None + self.last_resp: Optional[UploadResponse] = None + self.last_exc: Optional[Tuple[Exception, str]] = None self.immediate_size = 0 self.immediate_count = 0 @@ -76,12 +82,12 @@ class Uploader(): self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} - def get_upload_sort(self, name): + def get_upload_sort(self, name: str) -> int: if name in self.immediate_priority: return self.immediate_priority[name] return 1000 - def list_upload_files(self): + def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: if not os.path.isdir(self.root): return @@ -103,7 +109,7 @@ class Uploader(): fn = os.path.join(path, name) # skip files already uploaded try: - is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) + is_uploaded = bool(getxattr(fn, UPLOAD_ATTR_NAME)) except OSError: cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn) is_uploaded = True # deleter could have deleted @@ -117,22 +123,22 @@ class Uploader(): except OSError: pass - yield (name, key, fn) + yield name, key, fn - def next_file_to_upload(self): + def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: upload_files = list(self.list_upload_files()) for name, key, fn in upload_files: if any(f in fn for f in self.immediate_folders): - return (name, key, fn) + return name, key, fn for name, key, fn in upload_files: if name in self.immediate_priority: - return (name, key, fn) + return name, key, fn return None - def do_upload(self, key, fn): + def do_upload(self, key: str, fn: str) -> None: try: url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) if url_resp.status_code == 412: @@ -146,17 +152,13 @@ class Uploader(): if fake_upload: cloudlog.debug(f"*** WARNING, THIS IS A FAKE UPLOAD TO {url} ***") - - class FakeResponse(): - def __init__(self): - self.status_code = 200 - self.last_resp = FakeResponse() else: with open(fn, "rb") as f: + data: BinaryIO if key.endswith('.bz2') and not fn.endswith('.bz2'): - data = bz2.compress(f.read()) - data = io.BytesIO(data) + compressed = bz2.compress(f.read()) + data = io.BytesIO(compressed) else: data = f @@ -165,7 +167,7 @@ class Uploader(): self.last_exc = (e, traceback.format_exc()) raise - def normal_upload(self, key, fn): + def normal_upload(self, key: str, fn: str) -> Optional[UploadResponse]: self.last_resp = None self.last_exc = None @@ -224,7 +226,7 @@ class Uploader(): return msg -def uploader_fn(exit_event): +def uploader_fn(exit_event: threading.Event) -> None: try: set_core_affinity([0, 1, 2, 3]) except Exception: @@ -279,7 +281,7 @@ def uploader_fn(exit_event): pm.send("uploaderState", uploader.get_msg()) -def main(): +def main() -> None: uploader_fn(threading.Event())