delay bz2 compression from logging to uploading (#24392)

* remove log_name

* log without compression

* fix tests

* remove extension for bootlog

* another test fix

* uploader compresses

* also compress in athena

* only compress qlog

* more generic check in do_upload

* fix bootlog compression

* lower loggerd cpu usage

* dont link against bz2

* set core affinity to little cluster

* handle old files
old-commit-hash: 77a6f3d034
Willem Melching authored 3 years ago, committed by GitHub
parent b35ac8b532
commit be08693d5c
14 changed files (changed line counts in parentheses):
  1. selfdrive/athena/athenad.py (41)
  2. selfdrive/athena/tests/test_athenad.py (3)
  3. selfdrive/loggerd/SConscript (4)
  4. selfdrive/loggerd/bootlog.cc (4)
  5. selfdrive/loggerd/logger.cc (11)
  6. selfdrive/loggerd/logger.h (32)
  7. selfdrive/loggerd/loggerd.cc (2)
  8. selfdrive/loggerd/tests/test_encoder.py (2)
  9. selfdrive/loggerd/tests/test_logger.cc (10)
  10. selfdrive/loggerd/tests/test_loggerd.py (6)
  11. selfdrive/loggerd/tests/test_uploader.py (13)
  12. selfdrive/loggerd/uploader.py (21)
  13. selfdrive/test/process_replay/regen.py (6)
  14. selfdrive/test/test_onroad.py (10)

selfdrive/athena/athenad.py
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import base64
import bz2
import hashlib
import io
import json
@@ -30,7 +31,7 @@ from common.api import Api
from common.basedir import PERSIST
from common.file_helpers import CallbackReader
from common.params import Params
from common.realtime import sec_since_boot
from common.realtime import sec_since_boot, set_core_affinity
from selfdrive.hardware import HARDWARE, PC, TICI
from selfdrive.loggerd.config import ROOT
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
@@ -64,6 +65,13 @@ UploadItem = namedtuple('UploadItem', ['path', 'url', 'headers', 'created_at', '
cur_upload_items: Dict[int, Any] = {}
def strip_bz2_extension(fn):
if fn.endswith('.bz2'):
return fn[:-4]
return fn
class AbortTransferException(Exception):
pass
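
For reference (not part of the diff), strip_bz2_extension only drops a trailing .bz2 and leaves any other filename untouched:

    assert strip_bz2_extension("qlog.bz2") == "qlog"
    assert strip_bz2_extension("qlog") == "qlog"
    assert strip_bz2_extension("fcamera.hevc") == "fcamera.hevc"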
@@ -227,14 +235,29 @@ def upload_handler(end_event: threading.Event) -> None:
def _do_upload(upload_item, callback=None):
with open(upload_item.path, "rb") as f:
size = os.fstat(f.fileno()).st_size
path = upload_item.path
compress = False
# If file does not exist, but does exist without the .bz2 extension we will compress on the fly
if not os.path.exists(path) and os.path.exists(strip_bz2_extension(path)):
path = strip_bz2_extension(path)
compress = True
with open(path, "rb") as f:
if compress:
cloudlog.event("athena.upload_handler.compress", fn=path, fn_orig=upload_item.path)
data = bz2.compress(f.read())
size = len(data)
data = io.BytesIO(data)
else:
size = os.fstat(f.fileno()).st_size
data = f
if callback:
f = CallbackReader(f, callback, size)
data = CallbackReader(data, callback, size)
return requests.put(upload_item.url,
data=f,
data=data,
headers={**upload_item.headers, 'Content-Length': str(size)},
timeout=30)
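
The reworked _do_upload compresses on the fly only when the requested .bz2 path is missing but the uncompressed file still exists; the bz2 payload is built in memory so the correct Content-Length can be sent. A minimal standalone sketch of the same idea (upload_maybe_compress and its arguments are illustrative names, not part of the diff):

    import bz2
    import io
    import os

    import requests

    def upload_maybe_compress(path, url, headers, timeout=30):
      raw = path[:-4] if path.endswith('.bz2') else path
      if not os.path.exists(path) and os.path.exists(raw):
        # the uncompressed log is still on disk; compress it in memory before uploading
        with open(raw, 'rb') as f:
          payload = bz2.compress(f.read())
      else:
        with open(path, 'rb') as f:
          payload = f.read()
      headers = {**headers, 'Content-Length': str(len(payload))}
      return requests.put(url, data=io.BytesIO(payload), headers=headers, timeout=timeout)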
@@ -335,8 +358,9 @@ def uploadFilesToUrls(files_data):
if len(fn) == 0 or fn[0] == '/' or '..' in fn or 'url' not in file:
failed.append(fn)
continue
path = os.path.join(ROOT, fn)
if not os.path.exists(path):
if not os.path.exists(path) and not os.path.exists(strip_bz2_extension(path)):
failed.append(fn)
continue
@@ -680,6 +704,11 @@ def backoff(retries):
def main():
try:
set_core_affinity([0, 1, 2, 3])
except Exception:
cloudlog.exception("failed to set core affinity")
params = Params()
dongle_id = params.get("DongleId", encoding='utf-8')
UploadQueueCache.initialize(upload_queue)
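
set_core_affinity is imported from common.realtime so athenad (and the uploader below) runs on the little CPU cluster and upload work does not contend with real-time processes. A rough sketch of what such a helper can look like on Linux; this is an assumption about the shape of the API, not the actual common.realtime implementation:

    import os

    def set_core_affinity(cores):
      # pin the current process (pid 0) to the given CPU indices, e.g. [0, 1, 2, 3]
      os.sched_setaffinity(0, cores)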

selfdrive/athena/tests/test_athenad.py
@@ -81,7 +81,8 @@ class TestAthenadMethods(unittest.TestCase):
def test_listDataDirectory(self):
route = '2021-03-29--13-32-47'
segments = [0, 1, 2, 3, 11]
filenames = ['qlog.bz2', 'qcamera.ts', 'rlog.bz2', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
for file in files:
fn = os.path.join(athenad.ROOT, file)

selfdrive/loggerd/SConscript
@@ -3,7 +3,7 @@ Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc')
libs = [common, cereal, messaging, visionipc,
'zmq', 'capnp', 'kj', 'z',
'avformat', 'avcodec', 'swscale', 'avutil',
'yuv', 'bz2', 'OpenCL', 'pthread']
'yuv', 'OpenCL', 'pthread']
src = ['logger.cc', 'loggerd.cc', 'video_writer.cc', 'remote_encoder.cc']
if arch == "larch64":
@@ -25,4 +25,4 @@ if arch == "larch64":
env.Program('bootlog.cc', LIBS=libs)
if GetOption('test'):
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_loggerd.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=libs + ['curl', 'crypto'])
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_loggerd.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto'])

selfdrive/loggerd/bootlog.cc
@@ -49,14 +49,14 @@ static kj::Array<capnp::word> build_boot_log() {
}
int main(int argc, char** argv) {
const std::string path = LOG_ROOT + "/boot/" + logger_get_route_name() + ".bz2";
const std::string path = LOG_ROOT + "/boot/" + logger_get_route_name();
LOGW("bootlog to %s", path.c_str());
// Open bootlog
bool r = util::create_directories(LOG_ROOT + "/boot/", 0775);
assert(r);
BZFile bz_file(path.c_str());
RawFile bz_file(path.c_str());
// Write initdata
bz_file.write(logger_build_init_data().asBytes());

selfdrive/loggerd/logger.cc
@@ -109,13 +109,12 @@ static void lh_log_sentinel(LoggerHandle *h, SentinelType type) {
// ***** logging functions *****
void logger_init(LoggerState *s, const char* log_name, bool has_qlog) {
void logger_init(LoggerState *s, bool has_qlog) {
pthread_mutex_init(&s->lock, NULL);
s->part = -1;
s->has_qlog = has_qlog;
s->route_name = logger_get_route_name();
snprintf(s->log_name, sizeof(s->log_name), "%s", log_name);
s->init_data = logger_build_init_data();
}
@@ -132,8 +131,8 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
snprintf(h->segment_path, sizeof(h->segment_path),
"%s/%s--%d", root_path, s->route_name.c_str(), s->part);
snprintf(h->log_path, sizeof(h->log_path), "%s/%s.bz2", h->segment_path, s->log_name);
snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog.bz2", h->segment_path);
snprintf(h->log_path, sizeof(h->log_path), "%s/rlog", h->segment_path);
snprintf(h->qlog_path, sizeof(h->qlog_path), "%s/qlog", h->segment_path);
snprintf(h->lock_path, sizeof(h->lock_path), "%s.lock", h->log_path);
h->end_sentinel_type = SentinelType::END_OF_SEGMENT;
h->exit_signal = 0;
@@ -144,9 +143,9 @@ static LoggerHandle* logger_open(LoggerState *s, const char* root_path) {
if (lock_file == NULL) return NULL;
fclose(lock_file);
h->log = std::make_unique<BZFile>(h->log_path);
h->log = std::make_unique<RawFile>(h->log_path);
if (s->has_qlog) {
h->q_log = std::make_unique<BZFile>(h->qlog_path);
h->q_log = std::make_unique<RawFile>(h->qlog_path);
}
pthread_mutex_init(&h->lock, NULL);

selfdrive/loggerd/logger.h
@@ -7,7 +7,6 @@
#include <cstdio>
#include <memory>
#include <bzlib.h>
#include <capnp/serialize.h>
#include <kj/array.h>
@@ -20,42 +19,25 @@ const std::string LOG_ROOT = Path::log_root();
#define LOGGER_MAX_HANDLES 16
class BZFile {
class RawFile {
public:
BZFile(const char* path) {
RawFile(const char* path) {
file = util::safe_fopen(path, "wb");
assert(file != nullptr);
int bzerror;
bz_file = BZ2_bzWriteOpen(&bzerror, file, 9, 0, 30);
assert(bzerror == BZ_OK);
}
~BZFile() {
int bzerror;
BZ2_bzWriteClose(&bzerror, bz_file, 0, nullptr, nullptr);
if (bzerror != BZ_OK) {
LOGE("BZ2_bzWriteClose error, bzerror=%d", bzerror);
}
~RawFile() {
util::safe_fflush(file);
int err = fclose(file);
assert(err == 0);
}
inline void write(void* data, size_t size) {
int bzerror;
do {
BZ2_bzWrite(&bzerror, bz_file, data, size);
} while (bzerror == BZ_IO_ERROR && errno == EINTR);
if (bzerror != BZ_OK && !error_logged) {
LOGE("BZ2_bzWrite error, bzerror=%d", bzerror);
error_logged = true;
}
int written = util::safe_fwrite(data, 1, size, file);
assert(written == size);
}
inline void write(kj::ArrayPtr<capnp::byte> array) { write(array.begin(), array.size()); }
private:
bool error_logged = false;
FILE* file = nullptr;
BZFILE* bz_file = nullptr;
};
typedef cereal::Sentinel::SentinelType SentinelType;
@@ -69,7 +51,7 @@ typedef struct LoggerHandle {
char log_path[4096];
char qlog_path[4096];
char lock_path[4096];
std::unique_ptr<BZFile> log, q_log;
std::unique_ptr<RawFile> log, q_log;
} LoggerHandle;
typedef struct LoggerState {
@@ -86,7 +68,7 @@ typedef struct LoggerState {
kj::Array<capnp::word> logger_build_init_data();
std::string logger_get_route_name();
void logger_init(LoggerState *s, const char* log_name, bool has_qlog);
void logger_init(LoggerState *s, bool has_qlog);
int logger_next(LoggerState *s, const char* root_path,
char* out_segment_path, size_t out_segment_path_len,
int* out_part);

selfdrive/loggerd/loggerd.cc
@@ -217,7 +217,7 @@ void loggerd_thread() {
LoggerdState s;
// init logger
logger_init(&s.logger, "rlog", true);
logger_init(&s.logger, true);
logger_rotate(&s);
Params().put("CurrentRoute", s.logger.route_name);

selfdrive/loggerd/tests/test_encoder.py
@@ -112,7 +112,7 @@ class TestEncoder(unittest.TestCase):
# Check encodeIdx
if encode_idx_name is not None:
rlog_path = f"{route_prefix_path}--{i}/rlog.bz2"
rlog_path = f"{route_prefix_path}--{i}/rlog"
msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name]
encode_msgs = [getattr(m, encode_idx_name) for m in msgs]

selfdrive/loggerd/tests/test_logger.cc
@@ -18,10 +18,10 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;
REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock"));
for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
REQUIRE(!util::file_exists(segment_path + "/rlog.lock"));
for (const char *fn : {"/rlog", "/qlog"}) {
const std::string log_file = segment_path + fn;
std::string log = decompressBZ2(util::read_file(log_file));
std::string log = util::read_file(log_file);
REQUIRE(!log.empty());
int event_cnt = 0, i = 0;
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
@@ -70,7 +70,7 @@ TEST_CASE("logger") {
ExitHandler do_exit;
LoggerState logger = {};
logger_init(&logger, "rlog", true);
logger_init(&logger, true);
char segment_path[PATH_MAX] = {};
int segment = -1;
@@ -78,7 +78,7 @@ TEST_CASE("logger") {
const int segment_cnt = 100;
for (int i = 0; i < segment_cnt; ++i) {
REQUIRE(logger_next(&logger, log_root.c_str(), segment_path, sizeof(segment_path), &segment) == 0);
REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.bz2.lock"));
REQUIRE(util::file_exists(std::string(segment_path) + "/rlog.lock"));
REQUIRE(segment == i);
write_msg(logger.cur_handle);
}

selfdrive/loggerd/tests/test_loggerd.py
@@ -108,7 +108,7 @@ class TestLoggerd(unittest.TestCase):
os.environ["LOGGERD_TEST"] = "1"
Params().put("RecordFront", "1")
expected_files = {"rlog.bz2", "qlog.bz2", "qcamera.ts", "fcamera.hevc", "dcamera.hevc"}
expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc"}
streams = [(VisionStreamType.VISION_STREAM_ROAD, tici_f_frame_size if TICI else eon_f_frame_size, "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, tici_d_frame_size if TICI else eon_d_frame_size, "driverCameraState")]
if TICI:
@@ -208,7 +208,7 @@ class TestLoggerd(unittest.TestCase):
time.sleep(1)
managed_processes["loggerd"].stop()
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.bz2")
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
lr = list(LogReader(qlog_path))
# check initData and sentinel
@@ -254,7 +254,7 @@ class TestLoggerd(unittest.TestCase):
time.sleep(2)
managed_processes["loggerd"].stop()
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.bz2")))
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))
# check initData and sentinel
self._check_init_data(lr)
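
With compression deferred, on-device rlog/qlog files are plain capnp streams and only uploaded copies carry the .bz2 extension. A hedged sketch of reading either form (assuming cereal's pycapnp bindings expose Event.read_multiple_bytes, which is what LogReader builds on; read_events is an illustrative name):

    import bz2
    from cereal import log as capnp_log

    def read_events(path):
      with open(path, 'rb') as f:
        dat = f.read()
      if path.endswith('.bz2'):
        dat = bz2.decompress(dat)
      return list(capnp_log.Event.read_multiple_bytes(dat))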

selfdrive/loggerd/tests/test_uploader.py
@@ -54,11 +54,11 @@ class TestUploader(UploaderTestCase):
def gen_files(self, lock=False, boot=True):
f_paths = list()
for t in ["qlog.bz2", "rlog.bz2", "dcamera.hevc", "fcamera.hevc"]:
for t in ["qlog", "rlog", "dcamera.hevc", "fcamera.hevc"]:
f_paths.append(self.make_file_with_data(self.seg_dir, t, 1, lock=lock))
if boot:
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}.bz2", 1, lock=lock))
f_paths.append(self.make_file_with_data("boot", f"{self.seg_dir}", 1, lock=lock))
return f_paths
def gen_order(self, seg1, seg2, boot=True):
@@ -84,7 +84,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@@ -103,7 +103,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_ignored) < len(exp_order), "Some files failed to ignore")
self.assertFalse(len(log_handler.upload_ignored) > len(exp_order), "Some files were ignored twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not ignored")
self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not ignored")
self.assertTrue(log_handler.upload_ignored == exp_order, "Files ignored in wrong order")
@@ -128,7 +128,7 @@ class TestUploader(UploaderTestCase):
self.assertFalse(len(log_handler.upload_order) < len(exp_order), "Some files failed to upload")
self.assertFalse(len(log_handler.upload_order) > len(exp_order), "Some files were uploaded twice")
for f_path in exp_order:
self.assertTrue(getxattr(os.path.join(self.root, f_path), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(getxattr(os.path.join(self.root, f_path.replace('.bz2', '')), uploader.UPLOAD_ATTR_NAME), "All files not uploaded")
self.assertTrue(log_handler.upload_order == exp_order, "Files uploaded in wrong order")
@@ -143,8 +143,7 @@ class TestUploader(UploaderTestCase):
self.join_thread()
for f_path in f_paths:
self.assertFalse(getxattr(f_path, uploader.UPLOAD_ATTR_NAME), "File upload when locked")
self.assertFalse(getxattr(f_path.replace('.bz2', ''), uploader.UPLOAD_ATTR_NAME), "File upload when locked")
def test_clear_locks_on_startup(self):
f_paths = self.gen_files(lock=True, boot=False)

selfdrive/loggerd/uploader.py
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import bz2
import json
import os
import random
@@ -12,6 +13,7 @@ from cereal import log
import cereal.messaging as messaging
from common.api import Api
from common.params import Params
from common.realtime import set_core_affinity
from selfdrive.hardware import TICI
from selfdrive.loggerd.xattr_cache import getxattr, setxattr
from selfdrive.loggerd.config import ROOT
@@ -69,7 +71,7 @@ class Uploader():
self.last_filename = ""
self.immediate_folders = ["crash/", "boot/"]
self.immediate_priority = {"qlog.bz2": 0, "qcamera.ts": 1}
self.immediate_priority = {"qlog": 0, "qlog.bz2": 0, "qcamera.ts": 1}
def get_upload_sort(self, name):
if name in self.immediate_priority:
@@ -149,7 +151,12 @@ class Uploader():
self.last_resp = FakeResponse()
else:
with open(fn, "rb") as f:
self.last_resp = requests.put(url, data=f, headers=headers, timeout=10)
data = f.read()
if key.endswith('.bz2') and not fn.endswith('.bz2'):
data = bz2.compress(data)
self.last_resp = requests.put(url, data=data, headers=headers, timeout=10)
except Exception as e:
self.last_exc = (e, traceback.format_exc())
raise
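
Inside Uploader.upload, the compress-before-PUT decision keys off a mismatch: the remote key still ends in .bz2 while the local filename no longer does. The same rule reduced to a standalone helper (build_payload is an illustrative name):

    import bz2

    def build_payload(key, fn):
      # key: remote object name (e.g. ".../qlog.bz2"); fn: local path (e.g. ".../qlog")
      with open(fn, 'rb') as f:
        data = f.read()
      if key.endswith('.bz2') and not fn.endswith('.bz2'):
        data = bz2.compress(data)
      return data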
@@ -212,7 +219,13 @@ class Uploader():
us.lastFilename = self.last_filename
return msg
def uploader_fn(exit_event):
try:
set_core_affinity([0, 1, 2, 3])
except Exception:
cloudlog.exception("failed to set core affinity")
clear_locks(ROOT)
params = Params()
@@ -247,6 +260,10 @@ def uploader_fn(exit_event):
key, fn = d
# qlogs and bootlogs need to be compressed before uploading
if key.endswith('qlog') or (key.startswith('boot/') and not key.endswith('.bz2')):
key += ".bz2"
success = uploader.upload(key, fn, sm['deviceState'].networkType.raw, sm['deviceState'].networkMetered)
if success:
backoff = 0.1
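
Only qlogs and bootlogs have their key renamed before upload; camera files and anything already ending in .bz2 are left alone. The same naming rule as a small standalone function with illustrative paths (remote_key is a made-up name):

    def remote_key(key):
      if key.endswith('qlog') or (key.startswith('boot/') and not key.endswith('.bz2')):
        key += '.bz2'
      return key

    assert remote_key("2021-03-29--13-32-47--0/qlog") == "2021-03-29--13-32-47--0/qlog.bz2"
    assert remote_key("boot/2021-03-29--13-32-47") == "boot/2021-03-29--13-32-47.bz2"
    assert remote_key("2021-03-29--13-32-47--0/fcamera.hevc") == "2021-03-29--13-32-47--0/fcamera.hevc"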

selfdrive/test/process_replay/regen.py
@@ -256,7 +256,7 @@ def regen_segment(lr, frs=None, outdir=FAKEDATA):
segment = params.get("CurrentRoute", encoding='utf-8') + "--0"
seg_path = os.path.join(outdir, segment)
# check to make sure openpilot is engaged in the route
if not check_enabled(LogReader(os.path.join(seg_path, "rlog.bz2"))):
if not check_enabled(LogReader(os.path.join(seg_path, "rlog"))):
raise Exception(f"Route never enabled: {segment}")
return seg_path
@@ -268,11 +268,11 @@ def regen_and_save(route, sidx, upload=False, use_route_meta=False):
lr = LogReader(r.log_paths()[args.seg])
fr = FrameReader(r.camera_paths()[args.seg])
else:
lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog.bz2")
lr = LogReader(f"cd:/{route.replace('|', '/')}/{sidx}/rlog")
fr = FrameReader(f"cd:/{route.replace('|', '/')}/{sidx}/fcamera.hevc")
rpath = regen_segment(lr, {'roadCameraState': fr})
lr = LogReader(os.path.join(rpath, 'rlog.bz2'))
lr = LogReader(os.path.join(rpath, 'rlog'))
controls_state_active = [m.controlsState.active for m in lr if m.which() == 'controlsState']
assert any(controls_state_active), "Segment did not engage"

selfdrive/test/test_onroad.py
@@ -22,7 +22,7 @@ from tools.lib.logreader import LogReader
# Baseline CPU usage by process
PROCS = {
"selfdrive.controls.controlsd": 31.0,
"./loggerd": 70.0,
"./loggerd": 50.0,
"./camerad": 26.0,
"./locationd": 9.1,
"selfdrive.controls.plannerd": 11.7,
@@ -108,9 +108,9 @@ class TestOnroad(unittest.TestCase):
@classmethod
def setUpClass(cls):
if "DEBUG" in os.environ:
segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.bz2")), Path(ROOT).iterdir())
segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(ROOT).iterdir())
segs = sorted(segs, key=lambda x: x.stat().st_mtime)
cls.lr = list(LogReader(os.path.join(segs[-1], "rlog.bz2")))
cls.lr = list(LogReader(os.path.join(segs[-1], "rlog")))
return
# setup env
@@ -160,10 +160,10 @@ class TestOnroad(unittest.TestCase):
if proc.wait(60) is None:
proc.kill()
cls.lrs = [list(LogReader(os.path.join(str(s), "rlog.bz2"))) for s in cls.segments]
cls.lrs = [list(LogReader(os.path.join(str(s), "rlog"))) for s in cls.segments]
# use the second segment by default as it's the first full segment
cls.lr = list(LogReader(os.path.join(str(cls.segments[1]), "rlog.bz2")))
cls.lr = list(LogReader(os.path.join(str(cls.segments[1]), "rlog")))
def test_cloudlog_size(self):
msgs = [m for m in self.lr if m.which() == 'logMessage']
