From d6a8213c804119c00ba32b7c57053dae8db0c02e Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Wed, 17 Jan 2024 14:24:09 -0800 Subject: [PATCH] uploader cleanup (#31035) * Reapply "uploader cleanup" (#31033) This reverts commit a2723989bced503a4506684a4a6fdc6c8101f7b2. * always sleep * more cleanup * little more * fix linter * little more old-commit-hash: 0b5fd7287e715707e32b7e6978e296918dd34234 --- system/loggerd/tests/loggerd_tests_common.py | 28 +--- system/loggerd/tests/test_uploader.py | 4 +- system/loggerd/uploader.py | 163 +++++++------------ 3 files changed, 64 insertions(+), 131 deletions(-) diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py index 8bfb571861..3aa9e40531 100644 --- a/system/loggerd/tests/loggerd_tests_common.py +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -3,10 +3,12 @@ import random import unittest from pathlib import Path from typing import Optional -from openpilot.system.hardware.hw import Paths + import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.uploader as uploader +from openpilot.common.params import Params +from openpilot.system.hardware.hw import Paths from openpilot.system.loggerd.xattr_cache import setxattr @@ -53,25 +55,6 @@ class MockApiIgnore(): def get_token(self): return "fake-token" -class MockParams(): - def __init__(self): - self.params = { - "DongleId": b"0000000000000000", - "IsOffroad": b"1", - } - - def get(self, k, block=False, encoding=None): - val = self.params[k] - - if encoding is not None: - return val.decode(encoding) - else: - return val - - def get_bool(self, k): - val = self.params[k] - return (val == b'1') - class UploaderTestCase(unittest.TestCase): f_type = "UNKNOWN" @@ -86,7 +69,6 @@ class UploaderTestCase(unittest.TestCase): def setUp(self): uploader.Api = MockApi - uploader.Params = MockParams uploader.fake_upload = True uploader.force_wifi = True uploader.allow_sleep = False @@ -95,6 +77,10 @@ class UploaderTestCase(unittest.TestCase): self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_dir = self.seg_format.format(self.seg_num) + self.params = Params() + self.params.put("IsOffroad", "1") + self.params.put("DongleId", "0000000000000000") + def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: file_path = Path(Paths.log_root()) / f_dir / fn diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py index 538d99f66f..b674de5438 100755 --- a/system/loggerd/tests/test_uploader.py +++ b/system/loggerd/tests/test_uploader.py @@ -10,7 +10,7 @@ from typing import List, Optional from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog -from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE +from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase @@ -45,7 +45,7 @@ class TestUploader(UploaderTestCase): def start_thread(self): self.end_event = threading.Event() - self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event]) + self.up_thread = threading.Thread(target=main, args=[self.end_event]) self.up_thread.daemon = True self.up_thread.start() diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 8f27d4763d..65259d1e45 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -8,15 +8,13 @@ import requests import threading import time import traceback -from pathlib import Path -from typing import BinaryIO, Iterator, List, Optional, Tuple, Union +from typing import BinaryIO, Iterator, List, Optional, Tuple from cereal import log import cereal.messaging as messaging from openpilot.common.api import Api from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity -from openpilot.system.hardware import TICI from openpilot.system.hardware.hw import Paths from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.common.swaglog import cloudlog @@ -43,8 +41,6 @@ class FakeResponse: self.request = FakeRequest() -UploadResponse = Union[requests.Response, FakeResponse] - def get_directory_sort(d: str) -> List[str]: return [s.rjust(10, '0') for s in d.rsplit('--', 1)] @@ -74,32 +70,16 @@ class Uploader: self.api = Api(dongle_id) self.root = root - self.last_resp: Optional[UploadResponse] = None - self.last_exc: Optional[Tuple[Exception, str]] = None - - self.immediate_size = 0 - self.immediate_count = 0 - # stats for last successfully uploaded file - self.last_time = 0.0 - self.last_speed = 0.0 self.last_filename = "" self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} - def get_upload_sort(self, name: str) -> int: - if name in self.immediate_priority: - return self.immediate_priority[name] - return 1000 - def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: if not os.path.isdir(self.root): return - self.immediate_size = 0 - self.immediate_count = 0 - for logname in listdir_by_creation(self.root): path = os.path.join(self.root, logname) try: @@ -110,25 +90,18 @@ class Uploader: if any(name.endswith(".lock") for name in names): continue - for name in sorted(names, key=self.get_upload_sort): + for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): key = os.path.join(logname, name) fn = os.path.join(path, name) # skip files already uploaded try: is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE except OSError: - cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn) + cloudlog.event("uploader_getxattr_failed", key=key, fn=fn) is_uploaded = True # deleter could have deleted if is_uploaded: continue - try: - if name in self.immediate_priority: - self.immediate_count += 1 - self.immediate_size += os.path.getsize(fn) - except OSError: - pass - yield name, key, fn def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: @@ -144,45 +117,28 @@ class Uploader: return None - def do_upload(self, key: str, fn: str) -> None: - try: - url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) - if url_resp.status_code == 412: - self.last_resp = url_resp - return - - url_resp_json = json.loads(url_resp.text) - url = url_resp_json['url'] - headers = url_resp_json['headers'] - cloudlog.debug("upload_url v1.4 %s %s", url, str(headers)) - - if fake_upload: - cloudlog.debug(f"*** WARNING, THIS IS A FAKE UPLOAD TO {url} ***") - self.last_resp = FakeResponse() - else: - with open(fn, "rb") as f: - data: BinaryIO - if key.endswith('.bz2') and not fn.endswith('.bz2'): - compressed = bz2.compress(f.read()) - data = io.BytesIO(compressed) - else: - data = f - - self.last_resp = requests.put(url, data=data, headers=headers, timeout=10) - except Exception as e: - self.last_exc = (e, traceback.format_exc()) - raise - - def normal_upload(self, key: str, fn: str) -> Optional[UploadResponse]: - self.last_resp = None - self.last_exc = None + def do_upload(self, key: str, fn: str): + url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) + if url_resp.status_code == 412: + return url_resp - try: - self.do_upload(key, fn) - except Exception: - pass + url_resp_json = json.loads(url_resp.text) + url = url_resp_json['url'] + headers = url_resp_json['headers'] + cloudlog.debug("upload_url v1.4 %s %s", url, str(headers)) + + if fake_upload: + return FakeResponse() - return self.last_resp + with open(fn, "rb") as f: + data: BinaryIO + if key.endswith('.bz2') and not fn.endswith('.bz2'): + compressed = bz2.compress(f.read()) + data = io.BytesIO(compressed) + else: + data = f + + return requests.put(url, data=data, headers=headers, timeout=10) def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: try: @@ -201,44 +157,57 @@ class Uploader: success = True else: start_time = time.monotonic() - stat = self.normal_upload(key, fn) + + stat = None + last_exc = None + try: + stat = self.do_upload(key, fn) + except Exception as e: + last_exc = (e, traceback.format_exc()) + if stat is not None and stat.status_code in (200, 201, 401, 403, 412): self.last_filename = fn - self.last_time = time.monotonic() - start_time + dt = time.monotonic() - start_time if stat.status_code == 412: - self.last_speed = 0 cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) else: content_length = int(stat.request.headers.get("Content-Length", 0)) - self.last_speed = (content_length / 1e6) / self.last_time + speed = (content_length / 1e6) / dt cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, - network_type=network_type, metered=metered, speed=self.last_speed) + network_type=network_type, metered=metered, speed=speed) success = True else: success = False - cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) + cloudlog.event("upload_failed", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) if success: # tag file as uploaded try: setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) except OSError: - cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) + cloudlog.event("uploader_setxattr_failed", exc=last_exc, key=key, fn=fn, sz=sz) return success - def get_msg(self): - msg = messaging.new_message("uploaderState", valid=True) - us = msg.uploaderState - us.immediateQueueSize = int(self.immediate_size / 1e6) - us.immediateQueueCount = self.immediate_count - us.lastTime = self.last_time - us.lastSpeed = self.last_speed - us.lastFilename = self.last_filename - return msg + + def step(self, network_type: int, metered: bool) -> bool: + d = self.next_file_to_upload() + if d is None: + return True + + name, key, fn = d + + # qlogs and bootlogs need to be compressed before uploading + if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): + key += ".bz2" + + return self.upload(name, key, fn, network_type, metered) -def uploader_fn(exit_event: threading.Event) -> None: +def main(exit_event: Optional[threading.Event] = None) -> None: + if exit_event is None: + exit_event = threading.Event() + try: set_core_affinity([0, 1, 2, 3]) except Exception: @@ -253,11 +222,7 @@ def uploader_fn(exit_event: threading.Event) -> None: cloudlog.info("uploader missing dongle_id") raise Exception("uploader can't start without dongle id") - if TICI and not Path("/data/media").is_mount(): - cloudlog.warning("NVME not mounted") - sm = messaging.SubMaster(['deviceState']) - pm = messaging.PubMaster(['uploaderState']) uploader = Uploader(dongle_id, Paths.log_root()) backoff = 0.1 @@ -270,31 +235,13 @@ def uploader_fn(exit_event: threading.Event) -> None: time.sleep(60 if offroad else 5) continue - d = uploader.next_file_to_upload() - if d is None: # Nothing to upload - if allow_sleep: - time.sleep(60 if offroad else 5) - continue - - name, key, fn = d - - # qlogs and bootlogs need to be compressed before uploading - if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): - key += ".bz2" - - success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) + success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) if success: backoff = 0.1 elif allow_sleep: cloudlog.info("upload backoff %r", backoff) - time.sleep(backoff + random.uniform(0, backoff)) backoff = min(backoff*2, 120) - - pm.send("uploaderState", uploader.get_msg()) - - -def main() -> None: - uploader_fn(threading.Event()) + time.sleep(backoff + random.uniform(0, backoff)) if __name__ == "__main__":