uploader: compress with zstd (#32736)

* zstd uploader

* fix that

* fix name of function

* comment

* log failed

* fix comma_api_source for routes with both bz2 and zst rlogs

* TODO

* 10-14 achieves almost no benefit on qlogs in a few cases, but takes 2x the time

* these aren't written out

* regen: specify any list of sources

ooh this is pretty nice

* regen and process replay

* damn, actually we don't need all this (cool tho)

Revert "regen: specify any list of sources"

This reverts commit ceb0b4abed.

* just let it auto resolve

* fix athenad/uploader tests

* zst here too

* TODOs

* yes

* Revert "TODOs"

This reverts commit 8c7da1dbd0.

* Revert "zst here too"

This reverts commit 23b0023ddf.

* Revert "just let it auto resolve"

This reverts commit f296d62424.

* Revert "regen and process replay"

This reverts commit 0768330e96.

* revert readme

* not in save_log either

* lfg

* Revert "lfg"

This reverts commit 3718559c6c.
pull/33104/head
Shane Smiskol 9 months ago committed by GitHub
parent 6f1ea5a1fd
commit 7dec7c39be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      system/athena/athenad.py
  2. 59
      system/athena/tests/test_athenad.py
  3. 10
      system/loggerd/tests/test_uploader.py
  4. 13
      system/loggerd/uploader.py

@ -2,7 +2,6 @@
from __future__ import annotations
import base64
import bz2
import hashlib
import io
import json
@ -15,6 +14,7 @@ import sys
import tempfile
import threading
import time
import zstd
from dataclasses import asdict, dataclass, replace
from datetime import datetime
from functools import partial
@ -35,6 +35,7 @@ from openpilot.common.file_helpers import CallbackReader
from openpilot.common.params import Params
from openpilot.common.realtime import set_core_affinity
from openpilot.system.hardware import HARDWARE, PC
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
from openpilot.system.loggerd.xattr_cache import getxattr, setxattr
from openpilot.common.swaglog import cloudlog
from openpilot.system.version import get_build_metadata
@ -103,8 +104,8 @@ cancelled_uploads: set[str] = set()
cur_upload_items: dict[int, UploadItem | None] = {}
def strip_bz2_extension(fn: str) -> str:
if fn.endswith('.bz2'):
def strip_zst_extension(fn: str) -> str:
if fn.endswith('.zst'):
return fn[:-4]
return fn
@ -283,16 +284,16 @@ def _do_upload(upload_item: UploadItem, callback: Callable = None) -> requests.R
path = upload_item.path
compress = False
# If file does not exist, but does exist without the .bz2 extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
path = strip_bz2_extension(path)
# If file does not exist, but does exist without the .zst extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_zst_extension(path)):
path = strip_zst_extension(path)
compress = True
with open(path, "rb") as f:
content = f.read()
if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
content = bz2.compress(content)
content = zstd.compress(content, LOG_COMPRESSION_LEVEL)
with io.BytesIO(content) as data:
return requests.put(upload_item.url,
@ -375,7 +376,7 @@ def uploadFilesToUrls(files_data: list[UploadFileDict]) -> UploadFilesToUrlRespo
continue
path = os.path.join(Paths.log_root(), file.fn)
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
if not os.path.exists(path) and not os.path.exists(strip_zst_extension(path)):
failed.append(file.fn)
continue

@ -29,7 +29,7 @@ def seed_athena_server(host, port):
with Timeout(2, 'HTTP Server seeding failed'):
while True:
try:
requests.put(f'http://{host}:{port}/qlog.bz2', data='', timeout=10)
requests.put(f'http://{host}:{port}/qlog.zst', data='', timeout=10)
break
except requests.exceptions.ConnectionError:
time.sleep(0.1)
@ -174,54 +174,59 @@ class TestAthenadMethods:
assert resp, 'list empty!'
assert len(resp) == len(expected)
def test_strip_bz2_extension(self):
def test_strip_extension(self):
# any requested log file with an invalid extension won't return as existing
fn = self._create_file('qlog.bz2')
if fn.endswith('.bz2'):
assert athenad.strip_bz2_extension(fn) == fn[:-4]
assert athenad.strip_zst_extension(fn) == fn
fn = self._create_file('qlog.zst')
if fn.endswith('.zst'):
assert athenad.strip_zst_extension(fn) == fn[:-4]
@pytest.mark.parametrize("compress", [True, False])
def test_do_upload(self, host, compress):
# random bytes to ensure rather large object post-compression
fn = self._create_file('qlog', data=os.urandom(10000 * 1024))
upload_fn = fn + ('.bz2' if compress else '')
upload_fn = fn + ('.zst' if compress else '')
item = athenad.UploadItem(path=upload_fn, url="http://localhost:1238", headers={}, created_at=int(time.time()*1000), id='')
with pytest.raises(requests.exceptions.ConnectionError):
athenad._do_upload(item)
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='')
item = athenad.UploadItem(path=upload_fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='')
resp = athenad._do_upload(item)
assert resp.status_code == 201
def test_upload_file_to_url(self, host):
fn = self._create_file('qlog.bz2')
fn = self._create_file('qlog.zst')
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", f"{host}/qlog.bz2", {})
resp = dispatcher["uploadFileToUrl"]("qlog.zst", f"{host}/qlog.zst", {})
assert resp['enqueued'] == 1
assert 'failed' not in resp
assert {"path": fn, "url": f"{host}/qlog.bz2", "headers": {}}.items() <= resp['items'][0].items()
assert {"path": fn, "url": f"{host}/qlog.zst", "headers": {}}.items() <= resp['items'][0].items()
assert resp['items'][0].get('id') is not None
assert athenad.upload_queue.qsize() == 1
def test_upload_file_to_url_duplicate(self, host):
self._create_file('qlog.bz2')
self._create_file('qlog.zst')
url1 = f"{host}/qlog.bz2?sig=sig1"
dispatcher["uploadFileToUrl"]("qlog.bz2", url1, {})
url1 = f"{host}/qlog.zst?sig=sig1"
dispatcher["uploadFileToUrl"]("qlog.zst", url1, {})
# Upload same file again, but with different signature
url2 = f"{host}/qlog.bz2?sig=sig2"
resp = dispatcher["uploadFileToUrl"]("qlog.bz2", url2, {})
url2 = f"{host}/qlog.zst?sig=sig2"
resp = dispatcher["uploadFileToUrl"]("qlog.zst", url2, {})
assert resp == {'enqueued': 0, 'items': []}
def test_upload_file_to_url_does_not_exist(self, host):
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.bz2", "http://localhost:1238", {})
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.bz2']}
not_exists_resp = dispatcher["uploadFileToUrl"]("does_not_exist.zst", "http://localhost:1238", {})
assert not_exists_resp == {'enqueued': 0, 'items': [], 'failed': ['does_not_exist.zst']}
@with_upload_handler
def test_upload_handler(self, host):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
@ -236,8 +241,8 @@ class TestAthenadMethods:
def test_upload_handler_retry(self, mocker, host, status, retry):
mock_put = mocker.patch('requests.put')
mock_put.return_value.status_code = status
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
@ -251,8 +256,8 @@ class TestAthenadMethods:
@with_upload_handler
def test_upload_handler_timeout(self):
"""When an upload times out or fails to connect it should be placed back in the queue"""
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
item_no_retry = replace(item, retry_count=MAX_RETRY_COUNT)
athenad.upload_queue.put_nowait(item_no_retry)
@ -272,7 +277,7 @@ class TestAthenadMethods:
@with_upload_handler
def test_cancel_upload(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
dispatcher["cancelUpload"](item.id)
@ -291,8 +296,8 @@ class TestAthenadMethods:
ts = int(t_future.strftime("%s")) * 1000
# Item that would time out if actually uploaded
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.bz2", headers={}, created_at=ts, id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url="http://localhost:44444/qlog.zst", headers={}, created_at=ts, id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
@ -306,8 +311,8 @@ class TestAthenadMethods:
@with_upload_handler
def test_list_upload_queue_current(self, host: str):
fn = self._create_file('qlog.bz2')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.bz2", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
fn = self._create_file('qlog.zst')
item = athenad.UploadItem(path=fn, url=f"{host}/qlog.zst", headers={}, created_at=int(time.time()*1000), id='', allow_cellular=True)
athenad.upload_queue.put_nowait(item)
self._wait_for_upload()
@ -317,7 +322,7 @@ class TestAthenadMethods:
assert items[0]['current']
def test_list_upload_queue(self):
item = athenad.UploadItem(path="qlog.bz2", url="http://localhost:44444/qlog.bz2", headers={},
item = athenad.UploadItem(path="qlog.zst", url="http://localhost:44444/qlog.zst", headers={},
created_at=int(time.time()*1000), id='id', allow_cellular=True)
athenad.upload_queue.put_nowait(item)

@ -62,10 +62,10 @@ class TestUploader(UploaderTestCase):
def gen_order(self, seg1: list[int], seg2: list[int], boot=True) -> list[str]:
keys = []
if boot:
keys += [f"boot/{self.seg_format.format(i)}.bz2" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.bz2" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.bz2" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.bz2" for i in seg2]
keys += [f"boot/{self.seg_format.format(i)}.zst" for i in seg1]
keys += [f"boot/{self.seg_format2.format(i)}.zst" for i in seg2]
keys += [f"{self.seg_format.format(i)}/qlog.zst" for i in seg1]
keys += [f"{self.seg_format2.format(i)}/qlog.zst" for i in seg2]
return keys
def test_upload(self):
@ -159,7 +159,7 @@ class TestUploader(UploaderTestCase):
self.join_thread()
for f_path in f_paths:
fn = f_path.with_suffix(f_path.suffix.replace(".bz2", ""))
fn = f_path.with_suffix(f_path.suffix.replace(".zst", ""))
uploaded = UPLOAD_ATTR_NAME in os.listxattr(fn) and os.getxattr(fn, UPLOAD_ATTR_NAME) == UPLOAD_ATTR_VALUE
assert not uploaded, "File upload when locked"

@ -1,5 +1,4 @@
#!/usr/bin/env python3
import bz2
import io
import json
import os
@ -9,6 +8,7 @@ import threading
import time
import traceback
import datetime
import zstd
from typing import BinaryIO
from collections.abc import Iterator
@ -26,6 +26,7 @@ UPLOAD_ATTR_NAME = 'user.upload'
UPLOAD_ATTR_VALUE = b'1'
UPLOAD_QLOG_QCAM_MAX_SIZE = 5 * 1e6 # MB
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change
allow_sleep = bool(os.getenv("UPLOADER_SLEEP", "1"))
force_wifi = os.getenv("FORCEWIFI") is not None
@ -83,7 +84,7 @@ class Uploader:
self.last_filename = ""
self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
self.immediate_priority = {"qlog": 0, "qlog.zst": 0, "qcamera.ts": 1}
def list_upload_files(self, metered: bool) -> Iterator[tuple[str, str, str]]:
r = self.params.get("AthenadRecentlyViewedRoutes", encoding="utf8")
@ -152,8 +153,8 @@ class Uploader:
with open(fn, "rb") as f:
data: BinaryIO
if key.endswith('.bz2') and not fn.endswith('.bz2'):
compressed = bz2.compress(f.read())
if key.endswith('.zst') and not fn.endswith('.zst'):
compressed = zstd.compress(f.read(), LOG_COMPRESSION_LEVEL)
data = io.BytesIO(compressed)
else:
data = f
@ -218,8 +219,8 @@ class Uploader:
name, key, fn = d
# qlogs and bootlogs need to be compressed before uploading
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
if key.endswith(('qlog', 'rlog')) or (key.startswith('boot/') and not key.endswith('.zst')):
key += ".zst"
return self.upload(name, key, fn, network_type, metered)

Loading…
Cancel
Save