athena upload: streaming file upload (#34559)

streaming file upload
pull/34462/head
Dean Lee 2 months ago committed by GitHub
parent 4cca971888
commit 27c11eb597
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 21
      common/file_helpers.py
  2. 2
      selfdrive/debug/qlog_size.py
  3. 26
      system/athena/athenad.py
  4. 20
      system/loggerd/uploader.py

@ -1,6 +1,10 @@
import io
import os import os
import tempfile import tempfile
import contextlib import contextlib
import zstandard as zstd
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change
class CallbackReader: class CallbackReader:
@ -35,3 +39,20 @@ def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encodin
yield tmp_file yield tmp_file
tmp_file_name = tmp_file.name tmp_file_name = tmp_file.name
os.replace(tmp_file_name, path) os.replace(tmp_file_name, path)
def get_upload_stream(filepath: str, should_compress: bool) -> tuple[io.BufferedIOBase, int]:
if not should_compress:
file_size = os.path.getsize(filepath)
file_stream = open(filepath, "rb")
return file_stream, file_size
# Compress the file on the fly
compressed_stream = io.BytesIO()
compressor = zstd.ZstdCompressor(level=LOG_COMPRESSION_LEVEL)
with open(filepath, "rb") as f:
compressor.copy_stream(f, compressed_stream)
compressed_size = compressed_stream.tell()
compressed_stream.seek(0)
return compressed_stream, compressed_size

@ -6,7 +6,7 @@ from collections import defaultdict
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from cereal.services import SERVICE_LIST from cereal.services import SERVICE_LIST
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.common.file_helpers import LOG_COMPRESSION_LEVEL
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogReader
from tqdm import tqdm from tqdm import tqdm

@ -14,7 +14,6 @@ import sys
import tempfile import tempfile
import threading import threading
import time import time
import zstandard as zstd
from dataclasses import asdict, dataclass, replace from dataclasses import asdict, dataclass, replace
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
@ -31,11 +30,10 @@ import cereal.messaging as messaging
from cereal import log from cereal import log
from cereal.services import SERVICE_LIST from cereal.services import SERVICE_LIST
from openpilot.common.api import Api from openpilot.common.api import Api
from openpilot.common.file_helpers import CallbackReader from openpilot.common.file_helpers import CallbackReader, get_upload_stream
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_build_metadata from openpilot.system.version import get_build_metadata
@ -294,17 +292,17 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
path = strip_zst_extension(path) path = strip_zst_extension(path)
compress = True compress = True
with open(path, "rb") as f: stream = None
content = f.read() try:
if compress: stream, content_length = get_upload_stream(path, compress)
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path) response = requests.put(upload_item.url,
content = zstd.compress(content, LOG_COMPRESSION_LEVEL) data=CallbackReader(stream, callback, content_length) if callback else stream,
headers={**upload_item.headers, 'Content-Length': str(content_length)},
with io.BytesIO(content) as data: timeout=30)
return requests.put(upload_item.url, return response
data=CallbackReader(data, callback, len(content)) if callback else data, finally:
headers={**upload_item.headers, 'Content-Length': str(len(content))}, if stream:
timeout=30) stream.close()
# security: user should be able to request any message from their car # security: user should be able to request any message from their car

@ -1,5 +1,4 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import io
import json import json
import os import os
import random import random
@ -8,12 +7,12 @@ import threading
import time import time
import traceback import traceback
import datetime import datetime
import zstandard as zstd
from collections.abc import Iterator from collections.abc import Iterator
from cereal import log from cereal import log
import cereal.messaging as messaging import cereal.messaging as messaging
from openpilot.common.api import Api from openpilot.common.api import Api
from openpilot.common.file_helpers import get_upload_stream
from openpilot.common.params import Params from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware.hw import Paths from openpilot.system.hardware.hw import Paths
@ -29,7 +28,6 @@ MAX_UPLOAD_SIZES = {
# bugs, including ones that can cause massive log sizes # bugs, including ones that can cause massive log sizes
"qcam": 5*1e6, "qcam": 5*1e6,
} }
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change
allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1")) allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
force_wifi = os.getenv("FORCEWIFI") is not None force_wifi = os.getenv("FORCEWIFI") is not None
@ -154,13 +152,15 @@ class Uploader:
if fake_upload: if fake_upload:
return FakeResponse() return FakeResponse()
with open(fn, "rb") as f: stream = None
content = f.read() try:
if key.endswith('.zst') and not fn.endswith('.zst'): compress = key.endswith('.zst') and not fn.endswith('.zst')
content = zstd.compress(content, LOG_COMPRESSION_LEVEL) stream, _ = get_upload_stream(fn, compress)
response = requests.put(url, data=stream, headers=headers, timeout=10)
with io.BytesIO(content) as data: return response
return requests.put(url, data=data, headers=headers, timeout=10) finally:
if stream:
stream.close()
def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool: def upload(self, name: str, key: str, fn: str, network_type: int, metered: bool) -> bool:
try: try:

Loading…
Cancel
Save