Revert "uploader cleanup" (#31033)

pull/31040/head
Justin Newberry 1 year ago committed by GitHub
parent f6cd009c77
commit 5e4df41b2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 28
      system/loggerd/tests/loggerd_tests_common.py
  2. 4
      system/loggerd/tests/test_uploader.py
  3. 67
      system/loggerd/uploader.py

@ -3,12 +3,10 @@ import random
import unittest import unittest
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from openpilot.system.hardware.hw import Paths
import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader import openpilot.system.loggerd.uploader as uploader
from openpilot.common.params import Params
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import setxattr from openpilot.system.loggerd.xattr_cache import setxattr
@ -55,6 +53,25 @@ class MockApiIgnore():
def get_token(self): def get_token(self):
return "fake-token" return "fake-token"
class MockParams():
def __init__(self):
self.params = {
"DongleId": b"0000000000000000",
"IsOffroad": b"1",
}
def get(self, k, block=False, encoding=None):
val = self.params[k]
if encoding is not None:
return val.decode(encoding)
else:
return val
def get_bool(self, k):
val = self.params[k]
return (val == b'1')
class UploaderTestCase(unittest.TestCase): class UploaderTestCase(unittest.TestCase):
f_type = "UNKNOWN" f_type = "UNKNOWN"
@ -69,6 +86,7 @@ class UploaderTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
uploader.Api = MockApi uploader.Api = MockApi
uploader.Params = MockParams
uploader.fake_upload = True uploader.fake_upload = True
uploader.force_wifi = True uploader.force_wifi = True
uploader.allow_sleep = False uploader.allow_sleep = False
@ -77,10 +95,6 @@ class UploaderTestCase(unittest.TestCase):
self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_format2 = "2019-05-18--11-22-33--{}"
self.seg_dir = self.seg_format.format(self.seg_num) self.seg_dir = self.seg_format.format(self.seg_num)
self.params = Params()
self.params.put("IsOffroad", "1")
self.params.put("DongleId", "0000000000000000")
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
file_path = Path(Paths.log_root()) / f_dir / fn file_path = Path(Paths.log_root()) / f_dir / fn

@ -10,7 +10,7 @@ from typing import List, Optional
from openpilot.system.hardware.hw import Paths from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
@ -45,7 +45,7 @@ class TestUploader(UploaderTestCase):
def start_thread(self): def start_thread(self):
self.end_event = threading.Event() self.end_event = threading.Event()
self.up_thread = threading.Thread(target=main, args=[self.end_event]) self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event])
self.up_thread.daemon = True self.up_thread.daemon = True
self.up_thread.start() self.up_thread.start()

@ -77,6 +77,9 @@ class Uploader:
self.last_resp: Optional[UploadResponse] = None self.last_resp: Optional[UploadResponse] = None
self.last_exc: Optional[Tuple[Exception, str]] = None self.last_exc: Optional[Tuple[Exception, str]] = None
self.immediate_size = 0
self.immediate_count = 0
# stats for last successfully uploaded file # stats for last successfully uploaded file
self.last_time = 0.0 self.last_time = 0.0
self.last_speed = 0.0 self.last_speed = 0.0
@ -85,10 +88,18 @@ class Uploader:
self.immediate_folders = ["crash/", "boot/"] self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
def get_upload_sort(self, name: str) -> int:
if name in self.immediate_priority:
return self.immediate_priority[name]
return 1000
def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: def list_upload_files(self) -> Iterator[Tuple[str, str, str]]:
if not os.path.isdir(self.root): if not os.path.isdir(self.root):
return return
self.immediate_size = 0
self.immediate_count = 0
for logname in listdir_by_creation(self.root): for logname in listdir_by_creation(self.root):
path = os.path.join(self.root, logname) path = os.path.join(self.root, logname)
try: try:
@ -99,7 +110,7 @@ class Uploader:
if any(name.endswith(".lock") for name in names): if any(name.endswith(".lock") for name in names):
continue continue
for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)): for name in sorted(names, key=self.get_upload_sort):
key = os.path.join(logname, name) key = os.path.join(logname, name)
fn = os.path.join(path, name) fn = os.path.join(path, name)
# skip files already uploaded # skip files already uploaded
@ -111,6 +122,13 @@ class Uploader:
if is_uploaded: if is_uploaded:
continue continue
try:
if name in self.immediate_priority:
self.immediate_count += 1
self.immediate_size += os.path.getsize(fn)
except OSError:
pass
yield name, key, fn yield name, key, fn
def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]:
@ -209,25 +227,18 @@ class Uploader:
return success return success
def get_msg(self):
def step(self, network_type: int, metered: bool) -> bool: msg = messaging.new_message("uploaderState", valid=True)
d = self.next_file_to_upload() us = msg.uploaderState
if d is None: us.immediateQueueSize = int(self.immediate_size / 1e6)
return True us.immediateQueueCount = self.immediate_count
us.lastTime = self.last_time
name, key, fn = d us.lastSpeed = self.last_speed
us.lastFilename = self.last_filename
# qlogs and bootlogs need to be compressed before uploading return msg
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
return self.upload(name, key, fn, network_type, metered)
def main(exit_event: Optional[threading.Event] = None) -> None: def uploader_fn(exit_event: threading.Event) -> None:
if exit_event is None:
exit_event = threading.Event()
try: try:
set_core_affinity([0, 1, 2, 3]) set_core_affinity([0, 1, 2, 3])
except Exception: except Exception:
@ -246,6 +257,7 @@ def main(exit_event: Optional[threading.Event] = None) -> None:
cloudlog.warning("NVME not mounted") cloudlog.warning("NVME not mounted")
sm = messaging.SubMaster(['deviceState']) sm = messaging.SubMaster(['deviceState'])
pm = messaging.PubMaster(['uploaderState'])
uploader = Uploader(dongle_id, Paths.log_root()) uploader = Uploader(dongle_id, Paths.log_root())
backoff = 0.1 backoff = 0.1
@ -258,8 +270,19 @@ def main(exit_event: Optional[threading.Event] = None) -> None:
time.sleep(60 if offroad else 5) time.sleep(60 if offroad else 5)
continue continue
success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered) d = uploader.next_file_to_upload()
if d is None: # Nothing to upload
if allow_sleep:
time.sleep(60 if offroad else 5)
continue
name, key, fn = d
# qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
if success: if success:
backoff = 0.1 backoff = 0.1
elif allow_sleep: elif allow_sleep:
@ -267,6 +290,12 @@ def main(exit_event: Optional[threading.Event] = None) -> None:
time.sleep(backoff + random.uniform(0, backoff)) time.sleep(backoff + random.uniform(0, backoff))
backoff = min(backoff*2, 120) backoff = min(backoff*2, 120)
pm.send("uploaderState", uploader.get_msg())
def main() -> None:
uploader_fn(threading.Event())
if __name__ == "__main__": if __name__ == "__main__":
main() main()

Loading…
Cancel
Save