uploader cleanup (#31035)

* Reapply "uploader cleanup" (#31033)

This reverts commit a2723989bced503a4506684a4a6fdc6c8101f7b2.

* always sleep

* more cleanup

* little more

* fix linter

* little more
old-commit-hash: 0b5fd7287e
chrysler-long2
Adeeb Shihadeh 1 year ago committed by GitHub
parent 09b95b715c
commit d6a8213c80
  1. 28
      system/loggerd/tests/loggerd_tests_common.py
  2. 4
      system/loggerd/tests/test_uploader.py
  3. 163
      system/loggerd/uploader.py

@ -3,10 +3,12 @@ import random
import unittest import unittest
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from openpilot.system.hardware.hw import Paths
import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader import openpilot.system.loggerd.uploader as uploader
from openpilot.common.params import Params
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import setxattr from openpilot.system.loggerd.xattr_cache import setxattr
@ -53,25 +55,6 @@ class MockApiIgnore():
def get_token(self): def get_token(self):
return "fake-token" return "fake-token"
class MockParams():
def __init__(self):
self.params = {
"DongleId": b"0000000000000000",
"IsOffroad": b"1",
}
def get(self, k, block=False, encoding=None):
val = self.params[k]
if encoding is not None:
return val.decode(encoding)
else:
return val
def get_bool(self, k):
val = self.params[k]
return (val == b'1')
class UploaderTestCase(unittest.TestCase): class UploaderTestCase(unittest.TestCase):
f_type = "UNKNOWN" f_type = "UNKNOWN"
@ -86,7 +69,6 @@ class UploaderTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
uploader.Api = MockApi uploader.Api = MockApi
uploader.Params = MockParams
uploader.fake_upload = True uploader.fake_upload = True
uploader.force_wifi = True uploader.force_wifi = True
uploader.allow_sleep = False uploader.allow_sleep = False
@ -95,6 +77,10 @@ class UploaderTestCase(unittest.TestCase):
self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_format2 = "2019-05-18--11-22-33--{}"
self.seg_dir = self.seg_format.format(self.seg_num) self.seg_dir = self.seg_format.format(self.seg_num)
self.params = Params()
self.params.put("IsOffroad", "1")
self.params.put("DongleId", "0000000000000000")
def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path: upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
file_path = Path(Paths.log_root()) / f_dir / fn file_path = Path(Paths.log_root()) / f_dir / fn

@ -10,7 +10,7 @@ from typing import List, Optional
from openpilot.system.hardware.hw import Paths from openpilot.system.hardware.hw import Paths
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.system.loggerd.uploader import uploader_fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE from openpilot.system.loggerd.uploader import main, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE
from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase from openpilot.system.loggerd.tests.loggerd_tests_common import UploaderTestCase
@ -45,7 +45,7 @@ class TestUploader(UploaderTestCase):
def start_thread(self): def start_thread(self):
self.end_event = threading.Event() self.end_event = threading.Event()
self.up_thread = threading.Thread(target=uploader_fn, args=[self.end_event]) self.up_thread = threading.Thread(target=main, args=[self.end_event])
self.up_thread.daemon = True self.up_thread.daemon = True
self.up_thread.start() self.up_thread.start()

@ -8,15 +8,13 @@ import requests
import threading import threading
import time import time
import traceback import traceback
from pathlib import Path from typing import BinaryIO, Iterator, List, Optional, Tuple
from typing import BinaryIO, Iterator, List, Optional, Tuple, Union
from cereal import log from cereal import log
import cereal.messaging as messaging import cereal.messaging as messaging
from openpilot.common.api import Api from openpilot.common.api import Api
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import TICI
from openpilot.system.hardware.hw import Paths from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
@ -43,8 +41,6 @@ class FakeResponse:
self.request = FakeRequest() self.request = FakeRequest()
UploadResponse = Union[requests.Response, FakeResponse]
def get_directory_sort(d: str) -> List[str]: def get_directory_sort(d: str) -> List[str]:
return [s.rjust(10, '0') for s in d.rsplit('--', 1)] return [s.rjust(10, '0') for s in d.rsplit('--', 1)]
@ -74,32 +70,16 @@ class Uploader:
self.api = Api(dongle_id) self.api = Api(dongle_id)
self.root = root self.root = root
self.last_resp: Optional[UploadResponse] = None
self.last_exc: Optional[Tuple[Exception, str]] = None
self.immediate_size = 0
self.immediate_count = 0
# stats for last successfully uploaded file # stats for last successfully uploaded file
self.last_time = 0.0
self.last_speed = 0.0
self.last_filename = "" self.last_filename = ""
self.immediate_folders = ["crash/", "boot/"] self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
def get_upload_sort(self, name: str) -> int:
if name in self.immediate_priority:
return self.immediate_priority[name]
return 1000
def list_upload_files(self) -> Iterator[Tuple[str, str, str]]: def list_upload_files(self) -> Iterator[Tuple[str, str, str]]:
if not os.path.isdir(self.root): if not os.path.isdir(self.root):
return return
self.immediate_size = 0
self.immediate_count = 0
for logname in listdir_by_creation(self.root): for logname in listdir_by_creation(self.root):
path = os.path.join(self.root, logname) path = os.path.join(self.root, logname)
try: try:
@ -110,25 +90,18 @@ class Uploader:
if any(name.endswith(".lock") for name in names): if any(name.endswith(".lock") for name in names):
continue continue
for name in sorted(names, key=self.get_upload_sort): for name in sorted(names, key=lambda n: self.immediate_priority.get(n, 1000)):
key = os.path.join(logname, name) key = os.path.join(logname, name)
fn = os.path.join(path, name) fn = os.path.join(path, name)
# skip files already uploaded # skip files already uploaded
try: try:
is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE is_uploaded = getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
except OSError: except OSError:
cloudlog.event("uploader_getxattr_failed", exc=self.last_exc, key=key, fn=fn) cloudlog.event("uploader_getxattr_failed", key=key, fn=fn)
is_uploaded = True # deleter could have deleted is_uploaded = True # deleter could have deleted
if is_uploaded: if is_uploaded:
continue continue
try:
if name in self.immediate_priority:
self.immediate_count += 1
self.immediate_size += os.path.getsize(fn)
except OSError:
pass
yield name, key, fn yield name, key, fn
def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]: def next_file_to_upload(self) -> Optional[Tuple[str, str, str]]:
@ -144,45 +117,28 @@ class Uploader:
return None return None
def do_upload(self, key: str, fn: str) -> None: def do_upload(self, key: str, fn: str):
try: url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token())
url_resp = self.api.get("v1.4/" + self.dongle_id + "/upload_url/", timeout=10, path=key, access_token=self.api.get_token()) if url_resp.status_code == 412:
if url_resp.status_code == 412: return url_resp
self.last_resp = url_resp
return
url_resp_json = json.loads(url_resp.text)
url = url_resp_json['url']
headers = url_resp_json['headers']
cloudlog.debug("upload_url v1.4 %s %s", url, str(headers))
if fake_upload:
cloudlog.debug(f"*** WARNING, THIS IS A FAKE UPLOAD TO {url} ***")
self.last_resp = FakeResponse()
else:
with open(fn, "rb") as f:
data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'):
compressed = bz2.compress(f.read())
data = io.BytesIO(compressed)
else:
data = f
self.last_resp = requests.put(url, data=data, headers=headers, timeout=10)
except Exception as e:
self.last_exc = (e, traceback.format_exc())
raise
def normal_upload(self, key: str, fn: str) -> Optional[UploadResponse]:
self.last_resp = None
self.last_exc = None
try: url_resp_json = json.loads(url_resp.text)
self.do_upload(key, fn) url = url_resp_json['url']
except Exception: headers = url_resp_json['headers']
pass cloudlog.debug("upload_url v1.4 %s %s", url, str(headers))
if fake_upload:
return FakeResponse()
return self.last_resp with open(fn, "rb") as f:
data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'):
compressed = bz2.compress(f.read())
data = io.BytesIO(compressed)
else:
data = f
return requests.put(url, data=data, headers=headers, timeout=10)
def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool:
try: try:
@ -201,44 +157,57 @@ class Uploader:
success = True success = True
else: else:
start_time = time.monotonic() start_time = time.monotonic()
stat = self.normal_upload(key, fn)
stat = None
last_exc = None
try:
stat = self.do_upload(key, fn)
except Exception as e:
last_exc = (e, traceback.format_exc())
if stat is not None and stat.status_code in (200, 201, 401, 403, 412): if stat is not None and stat.status_code in (200, 201, 401, 403, 412):
self.last_filename = fn self.last_filename = fn
self.last_time = time.monotonic() - start_time dt = time.monotonic() - start_time
if stat.status_code == 412: if stat.status_code == 412:
self.last_speed = 0
cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) cloudlog.event("upload_ignored", key=key, fn=fn, sz=sz, network_type=network_type, metered=metered)
else: else:
content_length = int(stat.request.headers.get("Content-Length", 0)) content_length = int(stat.request.headers.get("Content-Length", 0))
self.last_speed = (content_length / 1e6) / self.last_time speed = (content_length / 1e6) / dt
cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length, cloudlog.event("upload_success", key=key, fn=fn, sz=sz, content_length=content_length,
network_type=network_type, metered=metered, speed=self.last_speed) network_type=network_type, metered=metered, speed=speed)
success = True success = True
else: else:
success = False success = False
cloudlog.event("upload_failed", stat=stat, exc=self.last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered) cloudlog.event("upload_failed", stat=stat, exc=last_exc, key=key, fn=fn, sz=sz, network_type=network_type, metered=metered)
if success: if success:
# tag file as uploaded # tag file as uploaded
try: try:
setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE) setxattr(fn, UPLOAD_ATTR_NAME, UPLOAD_ATTR_VALUE)
except OSError: except OSError:
cloudlog.event("uploader_setxattr_failed", exc=self.last_exc, key=key, fn=fn, sz=sz) cloudlog.event("uploader_setxattr_failed", exc=last_exc, key=key, fn=fn, sz=sz)
return success return success
def get_msg(self):
msg = messaging.new_message("uploaderState", valid=True) def step(self, network_type: int, metered: bool) -> bool:
us = msg.uploaderState d = self.next_file_to_upload()
us.immediateQueueSize = int(self.immediate_size / 1e6) if d is None:
us.immediateQueueCount = self.immediate_count return True
us.lastTime = self.last_time
us.lastSpeed = self.last_speed name, key, fn = d
us.lastFilename = self.last_filename
return msg # qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
return self.upload(name, key, fn, network_type, metered)
def uploader_fn(exit_event: threading.Event) -> None: def main(exit_event: Optional[threading.Event] = None) -> None:
if exit_event is None:
exit_event = threading.Event()
try: try:
set_core_affinity([0, 1, 2, 3]) set_core_affinity([0, 1, 2, 3])
except Exception: except Exception:
@ -253,11 +222,7 @@ def uploader_fn(exit_event: threading.Event) -> None:
cloudlog.info("uploader missing dongle_id") cloudlog.info("uploader missing dongle_id")
raise Exception("uploader can't start without dongle id") raise Exception("uploader can't start without dongle id")
if TICI and not Path("/data/media").is_mount():
cloudlog.warning("NVME not mounted")
sm = messaging.SubMaster(['deviceState']) sm = messaging.SubMaster(['deviceState'])
pm = messaging.PubMaster(['uploaderState'])
uploader = Uploader(dongle_id, Paths.log_root()) uploader = Uploader(dongle_id, Paths.log_root())
backoff = 0.1 backoff = 0.1
@ -270,31 +235,13 @@ def uploader_fn(exit_event: threading.Event) -> None:
time.sleep(60 if offroad else 5) time.sleep(60 if offroad else 5)
continue continue
d = uploader.next_file_to_upload() success = uploader.step(sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
if d is None: # Nothing to upload
if allow_sleep:
time.sleep(60 if offroad else 5)
continue
name, key, fn = d
# qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
success = uploader.upload(name, key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
if success: if success:
backoff = 0.1 backoff = 0.1
elif allow_sleep: elif allow_sleep:
cloudlog.info("upload backoff %r", backoff) cloudlog.info("upload backoff %r", backoff)
time.sleep(backoff + random.uniform(0, backoff))
backoff = min(backoff*2, 120) backoff = min(backoff*2, 120)
time.sleep(backoff + random.uniform(0, backoff))
pm.send("uploaderState", uploader.get_msg())
def main() -> None:
uploader_fn(threading.Event())
if __name__ == "__main__": if __name__ == "__main__":

Loading…
Cancel
Save