From 9bc35c091959dbca6a02fc6d9f6dca6b2d41b7d9 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Thu, 6 Feb 2025 11:50:04 -0800 Subject: [PATCH] Revert "athena upload: reduce memory usage and improve efficiency with streaming (#34528)" This reverts commit 4c65f51a55144d4ad6d1233b8730f4cac98b2fbf. --- common/file_helpers.py | 16 ---------------- selfdrive/debug/qlog_size.py | 2 +- selfdrive/test/test_onroad.py | 2 +- system/athena/athenad.py | 19 ++++++++++++------- system/loggerd/uploader.py | 14 ++++++++++---- 5 files changed, 24 insertions(+), 29 deletions(-) diff --git a/common/file_helpers.py b/common/file_helpers.py index a8ff410779..29ad219c07 100644 --- a/common/file_helpers.py +++ b/common/file_helpers.py @@ -1,10 +1,7 @@ -import io import os import tempfile import contextlib -import zstandard as zstd -LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change class CallbackReader: """Wraps a file, but overrides the read method to also @@ -38,16 +35,3 @@ def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encodin yield tmp_file tmp_file_name = tmp_file.name os.replace(tmp_file_name, path) - - -def get_upload_stream(filepath: str, compress: bool) -> io.BufferedReader | io.BytesIO: - if not compress: - return open(filepath, "rb") - - # Compress the file on the fly and return a BytesIO stream - stream = io.BytesIO() - compressor = zstd.ZstdCompressor(level=LOG_COMPRESSION_LEVEL) - with open(filepath, "rb") as f: - compressor.copy_stream(f, stream) - stream.seek(0) - return stream diff --git a/selfdrive/debug/qlog_size.py b/selfdrive/debug/qlog_size.py index 6d494b6f75..11606c7589 100755 --- a/selfdrive/debug/qlog_size.py +++ b/selfdrive/debug/qlog_size.py @@ -6,7 +6,7 @@ from collections import defaultdict import matplotlib.pyplot as plt from cereal.services import SERVICE_LIST -from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL +from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader from tqdm import tqdm diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index cba485c801..1279a505dd 100644 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -23,7 +23,7 @@ from openpilot.selfdrive.selfdrived.events import EVENTS, ET from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.system.hardware import HARDWARE from openpilot.system.hardware.hw import Paths -from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL +from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader """ diff --git a/system/athena/athenad.py b/system/athena/athenad.py index 54e474f6e0..c926de9784 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -14,6 +14,7 @@ import sys import tempfile import threading import time +import zstandard as zstd from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial @@ -30,10 +31,11 @@ import cereal.messaging as messaging from cereal import log from cereal.services import SERVICE_LIST from openpilot.common.api import Api -from openpilot.common.file_helpers import CallbackReader, get_upload_stream +from openpilot.common.file_helpers import CallbackReader from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware import HARDWARE, PC +from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.common.swaglog import cloudlog from openpilot.system.version import get_build_metadata @@ -292,13 +294,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R path = strip_zst_extension(path) compress = True - with get_upload_stream(path, compress) as stream: - stream.seek(0, os.SEEK_END) - content_length = stream.tell() - stream.seek(0) + with open(path, "rb") as f: + content = f.read() + if compress: + cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) + content = zstd.compress(content, LOG_COMPRESSION_LEVEL) + + with io.BytesIO(content) as data: return requests.put(upload_item.url, - data=CallbackReader(stream, callback, content_length) if callback else stream, - headers={**upload_item.headers, 'Content-Length': str(content_length)}, + data=CallbackReader(data, callback, len(content)) if callback else data, + headers={**upload_item.headers, 'Content-Length': str(len(content))}, timeout=30) diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 4e1799c3df..0a21712096 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +import io import json import os import random @@ -7,12 +8,12 @@ import threading import time import traceback import datetime +import zstandard as zstd from collections.abc import Iterator from cereal import log import cereal.messaging as messaging from openpilot.common.api import Api -from openpilot.common.file_helpers import get_upload_stream from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware.hw import Paths @@ -28,6 +29,7 @@ MAX_UPLOAD_SIZES = { # bugs, including ones that can cause massive log sizes "qcam": 5*1e6, } +LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) force_wifi = os.getenv("FORCEWIFI") is not None @@ -152,9 +154,13 @@ class Uploader: if fake_upload: return FakeResponse() - compress = key.endswith('.zst') and not fn.endswith('.zst') - with get_upload_stream(fn, compress) as stream: - return requests.put(url, data=stream, headers=headers, timeout=10) + with open(fn, "rb") as f: + content = f.read() + if key.endswith('.zst') and not fn.endswith('.zst'): + content = zstd.compress(content, LOG_COMPRESSION_LEVEL) + + with io.BytesIO(content) as data: + return requests.put(url, data=data, headers=headers, timeout=10) def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: try: