From 27c11eb597682ba158454d1973af666372d02925 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Tue, 11 Feb 2025 05:57:31 +0800 Subject: [PATCH] athena upload: streaming file upload (#34559) streaming file upload --- common/file_helpers.py | 21 +++++++++++++++++++++ selfdrive/debug/qlog_size.py | 2 +- system/athena/athenad.py | 26 ++++++++++++-------------- system/loggerd/uploader.py | 20 ++++++++++---------- 4 files changed, 44 insertions(+), 25 deletions(-) diff --git a/common/file_helpers.py b/common/file_helpers.py index 29ad219c07..b0d889f163 100644 --- a/common/file_helpers.py +++ b/common/file_helpers.py @@ -1,6 +1,10 @@ +import io import os import tempfile import contextlib +import zstandard as zstd + +LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change class CallbackReader: @@ -35,3 +39,20 @@ def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encodin yield tmp_file tmp_file_name = tmp_file.name os.replace(tmp_file_name, path) + + +def get_upload_stream(filepath: str, should_compress: bool) -> tuple[io.BufferedIOBase, int]: + if not should_compress: + file_size = os.path.getsize(filepath) + file_stream = open(filepath, "rb") + return file_stream, file_size + + # Compress the file on the fly + compressed_stream = io.BytesIO() + compressor = zstd.ZstdCompressor(level=LOG_COMPRESSION_LEVEL) + + with open(filepath, "rb") as f: + compressor.copy_stream(f, compressed_stream) + compressed_size = compressed_stream.tell() + compressed_stream.seek(0) + return compressed_stream, compressed_size diff --git a/selfdrive/debug/qlog_size.py b/selfdrive/debug/qlog_size.py index 11606c7589..6d494b6f75 100755 --- a/selfdrive/debug/qlog_size.py +++ b/selfdrive/debug/qlog_size.py @@ -6,7 +6,7 @@ from collections import defaultdict import matplotlib.pyplot as plt from cereal.services import SERVICE_LIST -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL +from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader from tqdm import tqdm diff --git a/system/athena/athenad.py b/system/athena/athenad.py index c926de9784..27440e84a6 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -14,7 +14,6 @@ import sys import tempfile import threading import time -import zstandard as zstd from dataclasses import asdict, dataclass, replace from datetime import datetime from functools import partial @@ -31,11 +30,10 @@ import cereal.messaging as messaging from cereal import log from cereal.services import SERVICE_LIST from openpilot.common.api import Api -from openpilot.common.file_helpers import CallbackReader +from openpilot.common.file_helpers import CallbackReader, get_upload_stream from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware import HARDWARE, PC -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.common.swaglog import cloudlog from openpilot.system.version import get_build_metadata @@ -294,17 +292,17 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R path = strip_zst_extension(path) compress = True - with open(path, "rb") as f: - content = f.read() - if compress: - cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) - content = zstd.compress(content, LOG_COMPRESSION_LEVEL) - - with io.BytesIO(content) as data: - return requests.put(upload_item.url, - data=CallbackReader(data, callback, len(content)) if callback else data, - headers={**upload_item.headers, 'Content-Length': str(len(content))}, - timeout=30) + stream = None + try: + stream, content_length = get_upload_stream(path, compress) + response = requests.put(upload_item.url, + data=CallbackReader(stream, callback, content_length) if callback else stream, + headers={**upload_item.headers, 'Content-Length': str(content_length)}, + timeout=30) + return response + finally: + if stream: + stream.close() # security: user should be able to request any message from their car diff --git a/system/loggerd/uploader.py b/system/loggerd/uploader.py index 0a21712096..c1bf804880 100755 --- a/system/loggerd/uploader.py +++ b/system/loggerd/uploader.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -import io import json import os import random @@ -8,12 +7,12 @@ import threading import time import traceback import datetime -import zstandard as zstd from collections.abc import Iterator from cereal import log import cereal.messaging as messaging from openpilot.common.api import Api +from openpilot.common.file_helpers import get_upload_stream from openpilot.common.params import Params from openpilot.common.realtime import set_core_affinity from openpilot.system.hardware.hw import Paths @@ -29,7 +28,6 @@ MAX_UPLOAD_SIZES = { # bugs, including ones that can cause massive log sizes "qcam": 5*1e6, } -LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) force_wifi = os.getenv("FORCEWIFI") is not None @@ -154,13 +152,15 @@ class Uploader: if fake_upload: return FakeResponse() - with open(fn, "rb") as f: - content = f.read() - if key.endswith('.zst') and not fn.endswith('.zst'): - content = zstd.compress(content, LOG_COMPRESSION_LEVEL) - - with io.BytesIO(content) as data: - return requests.put(url, data=data, headers=headers, timeout=10) + stream = None + try: + compress = key.endswith('.zst') and not fn.endswith('.zst') + stream, _ = get_upload_stream(fn, compress) + response = requests.put(url, data=stream, headers=headers, timeout=10) + return response + finally: + if stream: + stream.close() def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: try: