diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py index 3aa9e40531..8bfb571861 100644 --- a/system/loggerd/tests/loggerd_tests_common.py +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -3,12 +3,10 @@ import random import unittest from pathlib import Path from typing import Optional - +from openpilot.system.hardware.hw import Paths import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.uploader as uploader -from openpilot.common.params import Params -from openpilot.system.hardware.hw import Paths from openpilot.system.loggerd.xattr_cache import setxattr @@ -55,6 +53,25 @@ class MockApiIgnore(): def get_token(self): return "fake-token" +class MockParams(): + def __init__(self): + self.params = { + "DongleId": b"0000000000000000", + "IsOffroad": b"1", + } + + def get(self, k, block=False, encoding=None): + val = self.params[k] + + if encoding is not None: + return val.decode(encoding) + else: + return val + + def get_bool(self, k): + val = self.params[k] + return (val == b'1') + class UploaderTestCase(unittest.TestCase): f_type = "UNKNOWN" @@ -69,6 +86,7 @@ class UploaderTestCase(unittest.TestCase): def setUp(self): uploader.Api = MockApi + uploader.Params = MockParams uploader.fake_upload = True uploader.force_wifi = True uploader.allow_sleep = False @@ -77,10 +95,6 @@ class UploaderTestCase(unittest.TestCase): self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_dir = self.seg_format.format(self.seg_num) - self.params = Params() - self.params.put("IsOffroad", "1") - self.params.put("DongleId", "0000000000000000") - def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: file_path = Path(Paths.log_root()) / f_dir / fn diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py index b674de5438..538d99f66f 100755 --- a/system/loggerd/tests/test_uploader.py +++ b/system/loggerd/tests/test_uploader.py @@ -10,7 +10,7 @@ from typing import List, Optional from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog -from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE +from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase @@ -45,7 +45,7 @@ class TestUploader(UploaderTestCase): def start_thread(self): self.end_event = threading.Event() - self.up_thread = threading.Thread(target=main, args=[self.end_event]) + self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event]) self.up_thread.daemon = True self.up_thread.start() diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index aac5c07e53..8f27d4763d 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -77,6 +77,9 @@ class Uploader: self.last_resp: Optional[UploadResponse] = None self.last_exc: Optional[Tuple[Exception, str]] = None + self.immediate_size = 0 + self.immediate_count = 0 + # stats for last successfully uploaded file self.last_time = 0.0 self.last_speed = 0.0 @@ -85,10 +88,18 @@ class Uploader: self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} + def get_upload_sort(self, name: str) -> int: + if name in self.immediate_priority: + return self.immediate_priority[name] + return 1000 + def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: if not os.path.isdir(self.root): return + self.immediate_size = 0 + self.immediate_count = 0 + for logname in listdir_by_creation(self.root): path = os.path.join(self.root, logname) try: @@ -99,7 +110,7 @@ class Uploader: if any(name.endswith(".lock") for name in names): continue - for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): + for name in sorted(names, key=self.get_upload_sort): key = os.path.join(logname, name) fn = os.path.join(path, name) # skip files already uploaded @@ -111,6 +122,13 @@ class Uploader: if is_uploaded: continue + try: + if name in self.immediate_priority: + self.immediate_count += 1 + self.immediate_size += os.path.getsize(fn) + except OSError: + pass + yield name, key, fn def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: @@ -209,25 +227,18 @@ class Uploader: return success - - def step(self, network_type: int, metered: bool) -> bool: - d = self.next_file_to_upload() - if d is None: - return True - - name, key, fn = d - - # qlogs and bootlogs need to be compressed before uploading - if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): - key += ".bz2" - - return self.upload(name, key, fn, network_type, metered) + def get_msg(self): + msg = messaging.new_message("uploaderState", valid=True) + us = msg.uploaderState + us.immediateQueueSize = int(self.immediate_size / 1e6) + us.immediateQueueCount = self.immediate_count + us.lastTime = self.last_time + us.lastSpeed = self.last_speed + us.lastFilename = self.last_filename + return msg -def main(exit_event: Optional[threading.Event] = None) -> None: - if exit_event is None: - exit_event = threading.Event() - +def uploader_fn(exit_event: threading.Event) -> None: try: set_core_affinity([0, 1, 2, 3]) except Exception: @@ -246,6 +257,7 @@ def main(exit_event: Optional[threading.Event] = None) -> None: cloudlog.warning("NVME not mounted") sm = messaging.SubMaster(['deviceState']) + pm = messaging.PubMaster(['uploaderState']) uploader = Uploader(dongle_id, Paths.log_root()) backoff = 0.1 @@ -258,8 +270,19 @@ def main(exit_event: Optional[threading.Event] = None) -> None: time.sleep(60 if offroad else 5) continue - success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) + d = uploader.next_file_to_upload() + if d is None: # Nothing to upload + if allow_sleep: + time.sleep(60 if offroad else 5) + continue + name, key, fn = d + + # qlogs and bootlogs need to be compressed before uploading + if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): + key += ".bz2" + + success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) if success: backoff = 0.1 elif allow_sleep: @@ -267,6 +290,12 @@ def main(exit_event: Optional[threading.Event] = None) -> None: time.sleep(backoff + random.uniform(0, backoff)) backoff = min(backoff*2, 120) + pm.send("uploaderState", uploader.get_msg()) + + +def main() -> None: + uploader_fn(threading.Event()) + if __name__ == "__main__": main()