zstd uploader

pull/32736/head
Shane Smiskol 1 year ago
parent 4708332abe
commit 573e2d4c0c
  1. 17
      system/athena/athenad.py
  2. 13
      system/loggerd/uploader.py

@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
import base64 import base64
import bz2
import hashlib import hashlib
import io import io
import json import json
@ -15,6 +14,7 @@ import sys
import tempfile import tempfile
import threading import threading
import time import time
import zstd
from dataclasses import asdict, dataclass, replace from dataclasses import asdict, dataclass, replace
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
@ -35,6 +35,7 @@ from openpilot.common.file_helpers import CallbackReader
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_build_metadata from openpilot.system.version import get_build_metadata
@ -103,8 +104,8 @@ cancelled_uploads: set[str] = set()
cur_upload_items: dict[int, UploadItem | None] = {} cur_upload_items: dict[int, UploadItem | None] = {}
def strip_bz2_extension(fn: str) -> str: def strip_zstd_extension(fn: str) -> str:
if fn.endswith('.bz2'): if fn.endswith('.zstd'):
return fn[:-4] return fn[:-4]
return fn return fn
@ -283,16 +284,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
path = upload_item.path path = upload_item.path
compress = False compress = False
# If file does not exist, but does exist without the .bz2 extension we will compress on the fly # If file does not exist, but does exist without the .zstd extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)): if not os.path.exists(path) and os.path.exists(strip_zstd_extension(path)):
path = strip_bz2_extension(path) path = strip_zstd_extension(path)
compress = True compress = True
with open(path, "rb") as f: with open(path, "rb") as f:
content = f.read() content = f.read()
if compress: if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
content = bz2.compress(content) content = zstd.compress(content, LOG_COMPRESSION_LEVEL)
with io.BytesIO(content) as data: with io.BytesIO(content) as data:
return requests.put(upload_item.url, return requests.put(upload_item.url,
@ -388,7 +389,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
continue continue
path = os.path.join(Paths.log_root(), file.fn) path = os.path.join(Paths.log_root(), file.fn)
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)): if not os.path.exists(path) and not os.path.exists(strip_zstd_extension(path)):
failed.append(file.fn) failed.append(file.fn)
continue continue

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import bz2
import io import io
import json import json
import os import os
@ -9,6 +8,7 @@ import threading
import time import time
import traceback import traceback
import datetime import datetime
import zstd
from typing import BinaryIO from typing import BinaryIO
from collections.abc import Iterator from collections.abc import Iterator
@ -26,6 +26,7 @@ UPLOAD_ATTR_NAME = 'user.upload'
UPLOAD_ATTR_VALUE = b'1' UPLOAD_ATTR_VALUE = b'1'
UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB
LOG_COMPRESSION_LEVEL = 14
allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
force_wifi = os.getenv("FORCEWIFI") is not None force_wifi = os.getenv("FORCEWIFI") is not None
@ -83,7 +84,7 @@ class Uploader:
self.last_filename = "" self.last_filename = ""
self.immediate_folders = ["crash/", "boot/"] self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1} self.immediate_priority = {"qlog": 0, "qlog.zstd": 0, "qcamera.ts": 1}
def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]: def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8") r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
@ -152,8 +153,8 @@ class Uploader:
with open(fn, "rb") as f: with open(fn, "rb") as f:
data: BinaryIO data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'): if key.endswith('.zstd') and not fn.endswith('.zstd'):
compressed = bz2.compress(f.read()) compressed = zstd.compress(f.read(), LOG_COMPRESSION_LEVEL)
data = io.BytesIO(compressed) data = io.BytesIO(compressed)
else: else:
data = f data = f
@ -218,8 +219,8 @@ class Uploader:
name, key, fn = d name, key, fn = d
# qlogs and bootlogs need to be compressed before uploading # qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')): if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zstd')):
key += ".bz2" key += ".zstd"
return self.upload(name, key, fn, network_type, metered) return self.upload(name, key, fn, network_type, metered)

Loading…
Cancel
Save