uploader: add typing (#28001)

* uploader: add typing

* fix types
old-commit-hash: a27ce83b28
beeps
Cameron Clough 2 years ago committed by GitHub
parent 650ed84920
commit 6626f3036f
  1. 56
      system/loggerd/uploader.py

@ -9,6 +9,7 @@ import threading
import time
import traceback
from pathlib import Path
from typing import BinaryIO, Iterator, List, Optional, Tuple, Union
from cereal import log
import cereal.messaging as messaging
@ -31,10 +32,17 @@ force_wifi = os.getenv("FORCEWIFI") is not None
fake_upload = os.getenv("FAKEUPLOAD") is not None
def get_directory_sort(d):
class FakeResponse:
def __init__(self):
self.status_code = 200
UploadResponse = Union[requests.Response, FakeResponse]
def get_directory_sort(d: str) -> List[str]:
return list(map(lambda s: s.rjust(10, '0'), d.rsplit('--', 1)))
def listdir_by_creation(d):
def listdir_by_creation(d: str) -> List[str]:
try:
paths = os.listdir(d)
paths = sorted(paths, key=get_directory_sort)
@ -43,7 +51,7 @@ def listdir_by_creation(d):
cloudlog.exception("listdir_by_creation failed")
return list()
def clear_locks(root):
def clear_locks(root: str) -> None:
for logname in os.listdir(root):
path = os.path.join(root, logname)
try:
@ -54,16 +62,14 @@ def clear_locks(root):
cloudlog.exception("clear_locks failed")
class Uploader():
def __init__(self, dongle_id, root):
class Uploader:
def __init__(self, dongle_id: str, root: str):
self.dongle_id = dongle_id
self.api = Api(dongle_id)
self.root = root
self.upload_thread = None
self.last_resp = None
self.last_exc = None
self.last_resp: Optional[UploadResponse] = None
self.last_exc: Optional[Tuple[Exception, str]] = None
self.immediate_size = 0
self.immediate_count = 0
@ -76,12 +82,12 @@ class Uploader():
self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
def get_upload_sort(self, name):
def get_upload_sort(self, name: str) -> int:
if name in self.immediate_priority:
return self.immediate_priority[name]
return 1000
def list_upload_files(self):
def list_upload_files(self) -> Iterator[Tuple[str, str, str]]:
if not os.path.isdir(self.root):
return
@ -103,7 +109,7 @@ class Uploader():
fn = os.path.join(path, name)
# skip files already uploaded
try:
is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME)
is_uploaded = bool(getxattr(fn, UPLOAD_ATTR_NAME))
except OSError:
cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn)
is_uploaded = True # deleter could have deleted
@ -117,22 +123,22 @@ class Uploader():
except OSError:
pass
yield (name, key, fn)
yield name, key, fn
def next_file_to_upload(self):
def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]:
upload_files = list(self.list_upload_files())
for name, key, fn in upload_files:
if any(f in fn for f in self.immediate_folders):
return (name, key, fn)
return name, key, fn
for name, key, fn in upload_files:
if name in self.immediate_priority:
return (name, key, fn)
return name, key, fn
return None
def do_upload(self, key, fn):
def do_upload(self, key: str, fn: str) -> None:
try:
url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token())
if url_resp.status_code == 412:
@ -146,17 +152,13 @@ class Uploader():
if fake_upload:
cloudlog.debug(f"*** WARNING, THIS IS A FAKE UPLOAD TO {url} ***")
class FakeResponse():
def __init__(self):
self.status_code = 200
self.last_resp = FakeResponse()
else:
with open(fn, "rb") as f:
data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'):
data = bz2.compress(f.read())
data = io.BytesIO(data)
compressed = bz2.compress(f.read())
data = io.BytesIO(compressed)
else:
data = f
@ -165,7 +167,7 @@ class Uploader():
self.last_exc = (e, traceback.format_exc())
raise
def normal_upload(self, key, fn):
def normal_upload(self, key: str, fn: str) -> Optional[UploadResponse]:
self.last_resp = None
self.last_exc = None
@ -224,7 +226,7 @@ class Uploader():
return msg
def uploader_fn(exit_event):
def uploader_fn(exit_event: threading.Event) -> None:
try:
set_core_affinity([0, 1, 2, 3])
except Exception:
@ -279,7 +281,7 @@ def uploader_fn(exit_event):
pm.send("uploaderState", uploader.get_msg())
def main():
def main() -> None:
uploader_fn(threading.Event())

Loading…
Cancel
Save