diff --git a/common/file_helpers.py b/common/file_helpers.py index 29ad219c07..a8ff410779 100644 --- a/common/file_helpers.py +++ b/common/file_helpers.py @@ -1,7 +1,10 @@ +import io import os import tempfile import contextlib +import zstandard as zstd +LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change class CallbackReader: """Wraps a file, but overrides the read method to also @@ -35,3 +38,16 @@ def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encodin yield tmp_file tmp_file_name = tmp_file.name os.replace(tmp_file_name, path) + + +def get_upload_stream(filepath: str, compress: bool) -> io.BufferedReader | io.BytesIO: + if not compress: + return open(filepath, "rb") + + # Compress the file on the fly and return a BytesIO stream + stream = io.BytesIO() + compressor = zstd.ZstdCompressor(level=LOG_COMPRESSION_LEVEL) + with open(filepath, "rb") as f: + compressor.copy_stream(f, stream) + stream.seek(0) + return stream diff --git a/selfdrive/debug/qlog_size.py b/selfdrive/debug/qlog_size.py index 11606c7589..6d494b6f75 100755 --- a/selfdrive/debug/qlog_size.py +++ b/selfdrive/debug/qlog_size.py @@ -6,7 +6,7 @@ from collections import defaultdict import matplotlib.pyplot as plt from cereal.services import SERVICE_LIST -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL +from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader from tqdm import tqdm diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 1279a505dd..cba485c801 100644 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -23,7 +23,7 @@ from openpilot.selfdrive.selfdrived.events import EVENTS, ET from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.system.hardware import HARDWARE from openpilot.system.hardware.hw import Paths -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL +from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader """ diff --git a/system/athena/athenad.py b/system/athena/athenad.py index c926de9784..54e474f6e0 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -14,7 +14,6 @@ import sys import tempfile import threading import time -import zstandard as zstd from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial @@ -31,11 +30,10 @@ import cereal.messaging as messaging from cereal import log from cereal.services import SERVICE_LIST from openpilot.common.api import Api -from openpilot.common.file_helpers import CallbackReader +from openpilot.common.file_helpers import CallbackReader, get_upload_stream from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware import HARDWARE, PC -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.common.swaglog import cloudlog from openpilot.system.version import get_build_metadata @@ -294,16 +292,13 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R path = strip_zst_extension(path) compress = True - with open(path, "rb") as f: - content = f.read() - if compress: - cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) - content = zstd.compress(content, LOG_COMPRESSION_LEVEL) - - with io.BytesIO(content) as data: + with get_upload_stream(path, compress) as stream: + stream.seek(0, os.SEEK_END) + content_length = stream.tell() + stream.seek(0) return requests.put(upload_item.url, - data=CallbackReader(data, callback, len(content)) if callback else data, - headers={**upload_item.headers, 'Content-Length': str(len(content))}, + data=CallbackReader(stream, callback, content_length) if callback else stream, + headers={**upload_item.headers, 'Content-Length': str(content_length)}, timeout=30) diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 0a21712096..4e1799c3df 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import io import json import os import random @@ -8,12 +7,12 @@ import threading import time import traceback import datetime -import zstandard as zstd from collections.abc import Iterator from cereal import log import cereal.messaging as messaging from openpilot.common.api import Api +from openpilot.common.file_helpers import get_upload_stream from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware.hw import Paths @@ -29,7 +28,6 @@ MAX_UPLOAD_SIZES = { # bugs, including ones that can cause massive log sizes "qcam": 5*1e6, } -LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) force_wifi = os.getenv("FORCEWIFI") is not None @@ -154,13 +152,9 @@ class Uploader: if fake_upload: return FakeResponse() - with open(fn, "rb") as f: - content = f.read() - if key.endswith('.zst') and not fn.endswith('.zst'): - content = zstd.compress(content, LOG_COMPRESSION_LEVEL) - - with io.BytesIO(content) as data: - return requests.put(url, data=data, headers=headers, timeout=10) + compress = key.endswith('.zst') and not fn.endswith('.zst') + with get_upload_stream(fn, compress) as stream: + return requests.put(url, data=stream, headers=headers, timeout=10) def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: try: