diff --git a/system/loggerd/tests/loggerd_tests_common.py b/system/loggerd/tests/loggerd_tests_common.py index 8bfb571861..3aa9e40531 100644 --- a/system/loggerd/tests/loggerd_tests_common.py +++ b/system/loggerd/tests/loggerd_tests_common.py @@ -3,10 +3,12 @@ import random import unittest from pathlib import Path from typing import Optional -from openpilot.system.hardware.hw import Paths + import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.uploader as uploader +from openpilot.common.params import Params +from openpilot.system.hardware.hw import Paths from openpilot.system.loggerd.xattr_cache import setxattr @@ -53,25 +55,6 @@ class MockApiIgnore(): def get_token(self): return "fake-token" -class MockParams(): - def __init__(self): - self.params = { - "DongleId": b"0000000000000000", - "IsOffroad": b"1", - } - - def get(self, k, block=False, encoding=None): - val = self.params[k] - - if encoding is not None: - return val.decode(encoding) - else: - return val - - def get_bool(self, k): - val = self.params[k] - return (val == b'1') - class UploaderTestCase(unittest.TestCase): f_type = "UNKNOWN" @@ -86,7 +69,6 @@ class UploaderTestCase(unittest.TestCase): def setUp(self): uploader.Api = MockApi - uploader.Params = MockParams uploader.fake_upload = True uploader.force_wifi = True uploader.allow_sleep = False @@ -95,6 +77,10 @@ class UploaderTestCase(unittest.TestCase): self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_dir = self.seg_format.format(self.seg_num) + self.params = Params() + self.params.put("IsOffroad", "1") + self.params.put("DongleId", "0000000000000000") + def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: file_path = Path(Paths.log_root()) / f_dir / fn diff --git a/system/loggerd/tests/test_uploader.py b/system/loggerd/tests/test_uploader.py index 538d99f66f..b674de5438 100755 --- a/system/loggerd/tests/test_uploader.py +++ b/system/loggerd/tests/test_uploader.py @@ -10,7 +10,7 @@ from typing import List, Optional from openpilot.system.hardware.hw import Paths from openpilot.common.swaglog import cloudlog -from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE +from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase @@ -45,7 +45,7 @@ class TestUploader(UploaderTestCase): def start_thread(self): self.end_event = threading.Event() - self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event]) + self.up_thread = threading.Thread(target=main, args=[self.end_event]) self.up_thread.daemon = True self.up_thread.start() diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 8f27d4763d..aac5c07e53 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -77,9 +77,6 @@ class Uploader: self.last_resp: Optional[UploadResponse] = None self.last_exc: Optional[Tuple[Exception, str]] = None - self.immediate_size = 0 - self.immediate_count = 0 - # stats for last successfully uploaded file self.last_time = 0.0 self.last_speed = 0.0 @@ -88,18 +85,10 @@ class Uploader: self.immediate_folders = ["crash/", "boot/"] self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} - def get_upload_sort(self, name: str) -> int: - if name in self.immediate_priority: - return self.immediate_priority[name] - return 1000 - def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: if not os.path.isdir(self.root): return - self.immediate_size = 0 - self.immediate_count = 0 - for logname in listdir_by_creation(self.root): path = os.path.join(self.root, logname) try: @@ -110,7 +99,7 @@ class Uploader: if any(name.endswith(".lock") for name in names): continue - for name in sorted(names, key=self.get_upload_sort): + for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): key = os.path.join(logname, name) fn = os.path.join(path, name) # skip files already uploaded @@ -122,13 +111,6 @@ class Uploader: if is_uploaded: continue - try: - if name in self.immediate_priority: - self.immediate_count += 1 - self.immediate_size += os.path.getsize(fn) - except OSError: - pass - yield name, key, fn def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: @@ -227,18 +209,25 @@ class Uploader: return success - def get_msg(self): - msg = messaging.new_message("uploaderState", valid=True) - us = msg.uploaderState - us.immediateQueueSize = int(self.immediate_size / 1e6) - us.immediateQueueCount = self.immediate_count - us.lastTime = self.last_time - us.lastSpeed = self.last_speed - us.lastFilename = self.last_filename - return msg + + def step(self, network_type: int, metered: bool) -> bool: + d = self.next_file_to_upload() + if d is None: + return True + + name, key, fn = d + + # qlogs and bootlogs need to be compressed before uploading + if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): + key += ".bz2" + + return self.upload(name, key, fn, network_type, metered) -def uploader_fn(exit_event: threading.Event) -> None: +def main(exit_event: Optional[threading.Event] = None) -> None: + if exit_event is None: + exit_event = threading.Event() + try: set_core_affinity([0, 1, 2, 3]) except Exception: @@ -257,7 +246,6 @@ def uploader_fn(exit_event: threading.Event) -> None: cloudlog.warning("NVME not mounted") sm = messaging.SubMaster(['deviceState']) - pm = messaging.PubMaster(['uploaderState']) uploader = Uploader(dongle_id, Paths.log_root()) backoff = 0.1 @@ -270,19 +258,8 @@ def uploader_fn(exit_event: threading.Event) -> None: time.sleep(60 if offroad else 5) continue - d = uploader.next_file_to_upload() - if d is None: # Nothing to upload - if allow_sleep: - time.sleep(60 if offroad else 5) - continue + success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) - name, key, fn = d - - # qlogs and bootlogs need to be compressed before uploading - if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): - key += ".bz2" - - success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) if success: backoff = 0.1 elif allow_sleep: @@ -290,12 +267,6 @@ def uploader_fn(exit_event: threading.Event) -> None: time.sleep(backoff + random.uniform(0, backoff)) backoff = min(backoff*2, 120) - pm.send("uploaderState", uploader.get_msg()) - - -def main() -> None: - uploader_fn(threading.Event()) - if __name__ == "__main__": main()