loggerd: switch logging from raw file format to zstd compressed files (#34549)

* switch logging from raw file format to zstd compressed files

* more zst suffix

* compress bootlog

* remove class RawFile

* Optimize ZstdFileWriter by adding input caching

* use ZSTD_compressStream2

* cleanup

* LOG_COMPRESSION_LEVEL=10

* space

* add zst suffix to LOGS_SIZE_RATE
pull/34551/head
Dean Lee 3 months ago committed by GitHub
parent 4066d49d70
commit 9ec54f59c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      selfdrive/test/test_onroad.py
  2. 2
      system/athena/tests/test_athenad.py
  3. 6
      system/loggerd/SConscript
  4. 5
      system/loggerd/bootlog.cc
  5. 43
      system/loggerd/logger.cc
  6. 26
      system/loggerd/logger.h
  7. 2
      system/loggerd/tests/test_encoder.py
  8. 5
      system/loggerd/tests/test_logger.cc
  9. 6
      system/loggerd/tests/test_loggerd.py
  10. 38
      system/loggerd/tests/test_zstd_writer.cc
  11. 65
      system/loggerd/zstd_writer.cc
  12. 24
      system/loggerd/zstd_writer.h
  13. 10
      tools/lib/logreader.py
  14. 2
      tools/longitudinal_maneuvers/generate_report.py

@ -8,7 +8,6 @@ import shutil
import subprocess import subprocess
import time import time
import numpy as np import numpy as np
import zstandard as zstd
from collections import Counter, defaultdict from collections import Counter, defaultdict
from pathlib import Path from pathlib import Path
from tabulate import tabulate from tabulate import tabulate
@ -23,7 +22,6 @@ from openpilot.selfdrive.selfdrived.events import EVENTS, ET
from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.selfdrive.test.helpers import set_params_enabled, release_only
from openpilot.system.hardware import HARDWARE from openpilot.system.hardware import HARDWARE
from openpilot.system.hardware.hw import Paths from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL
from openpilot.tools.lib.logreader import LogReader from openpilot.tools.lib.logreader import LogReader
""" """
@ -102,8 +100,8 @@ TIMINGS = {
} }
LOGS_SIZE_RATE = { LOGS_SIZE_RATE = {
"qlog": 0.0083, "qlog.zst": 0.0083,
"rlog": 0.135, "rlog.zst": 0.135,
"qcamera.ts": 0.03828, "qcamera.ts": 0.03828,
} }
LOGS_SIZE_RATE.update(dict.fromkeys(['ecamera.hevc', 'fcamera.hevc'], 1.2740)) LOGS_SIZE_RATE.update(dict.fromkeys(['ecamera.hevc', 'fcamera.hevc'], 1.2740))
@ -119,10 +117,10 @@ class TestOnroad:
@classmethod @classmethod
def setup_class(cls): def setup_class(cls):
if "DEBUG" in os.environ: if "DEBUG" in os.environ:
segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(Paths.log_root()).iterdir()) segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.zst")), Path(Paths.log_root()).iterdir())
segs = sorted(segs, key=lambda x: x.stat().st_mtime) segs = sorted(segs, key=lambda x: x.stat().st_mtime)
print(segs[-3]) print(segs[-3])
cls.lr = list(LogReader(os.path.join(segs[-3], "rlog"))) cls.lr = list(LogReader(os.path.join(segs[-3], "rlog.zst")))
return return
# setup env # setup env
@ -173,18 +171,15 @@ class TestOnroad:
if proc.wait(60) is None: if proc.wait(60) is None:
proc.kill() proc.kill()
cls.lrs = [list(LogReader(os.path.join(str(s), "rlog"))) for s in cls.segments] cls.lrs = [list(LogReader(os.path.join(str(s), "rlog.zst"))) for s in cls.segments]
cls.lr = list(LogReader(os.path.join(str(cls.segments[0]), "rlog"))) cls.lr = list(LogReader(os.path.join(str(cls.segments[0]), "rlog.zst")))
cls.log_path = cls.segments[0] cls.log_path = cls.segments[0]
cls.log_sizes = {} cls.log_sizes = {}
for f in cls.log_path.iterdir(): for f in cls.log_path.iterdir():
assert f.is_file() assert f.is_file()
cls.log_sizes[f] = f.stat().st_size / 1e6 cls.log_sizes[f] = f.stat().st_size / 1e6
if f.name in ("qlog", "rlog"):
with open(f, 'rb') as ff:
cls.log_sizes[f] = len(zstd.compress(ff.read(), LOG_COMPRESSION_LEVEL)) / 1e6
cls.msgs = defaultdict(list) cls.msgs = defaultdict(list)
for m in cls.lr: for m in cls.lr:

@ -138,7 +138,7 @@ class TestAthenadMethods:
route = '2021-03-29--13-32-47' route = '2021-03-29--13-32-47'
segments = [0, 1, 2, 3, 11] segments = [0, 1, 2, 3, 11]
filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc'] filenames = ['qlog.zst', 'qcamera.ts', 'rlog.zst', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc']
files = [f'{route}--{s}/{f}' for s in segments for f in filenames] files = [f'{route}--{s}/{f}' for s in segments for f in filenames]
for file in files: for file in files:
self._create_file(file) self._create_file(file)

@ -2,9 +2,9 @@ Import('env', 'arch', 'messaging', 'common', 'visionipc')
libs = [common, messaging, visionipc, libs = [common, messaging, visionipc,
'z', 'avformat', 'avcodec', 'swscale', 'z', 'avformat', 'avcodec', 'swscale',
'avutil', 'yuv', 'OpenCL', 'pthread'] 'avutil', 'yuv', 'OpenCL', 'pthread', 'zstd']
src = ['logger.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc'] src = ['logger.cc', 'zstd_writer.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc']
if arch != "larch64": if arch != "larch64":
src += ['encoder/ffmpeg_encoder.cc'] src += ['encoder/ffmpeg_encoder.cc']
@ -23,4 +23,4 @@ env.Program('encoderd', ['encoderd.cc'], LIBS=libs)
env.Program('bootlog.cc', LIBS=libs) env.Program('bootlog.cc', LIBS=libs)
if GetOption('extras'): if GetOption('extras'):
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto']) env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', 'tests/test_zstd_writer.cc'], LIBS=libs + ['curl', 'crypto'])

@ -5,6 +5,7 @@
#include "common/params.h" #include "common/params.h"
#include "common/swaglog.h" #include "common/swaglog.h"
#include "system/loggerd/logger.h" #include "system/loggerd/logger.h"
#include "system/loggerd/zstd_writer.h"
static kj::Array<capnp::word> build_boot_log() { static kj::Array<capnp::word> build_boot_log() {
@ -50,14 +51,14 @@ static kj::Array<capnp::word> build_boot_log() {
int main(int argc, char** argv) { int main(int argc, char** argv) {
const std::string id = logger_get_identifier("BootCount"); const std::string id = logger_get_identifier("BootCount");
const std::string path = Path::log_root() + "/boot/" + id; const std::string path = Path::log_root() + "/boot/" + id + ".zst";
LOGW("bootlog to %s", path.c_str()); LOGW("bootlog to %s", path.c_str());
// Open bootlog // Open bootlog
bool r = util::create_directories(Path::log_root() + "/boot/", 0775); bool r = util::create_directories(Path::log_root() + "/boot/", 0775);
assert(r); assert(r);
RawFile file(path.c_str()); ZstdFileWriter file(path, LOG_COMPRESSION_LEVEL);
// Write initdata // Write initdata
file.write(logger_build_init_data().asBytes()); file.write(logger_build_init_data().asBytes());
// Write bootlog // Write bootlog

@ -113,6 +113,42 @@ std::string logger_get_identifier(std::string key) {
return util::string_format("%08x--%s", cnt, ss.str().c_str()); return util::string_format("%08x--%s", cnt, ss.str().c_str());
} }
std::string zstd_decompress(const std::string &in) {
ZSTD_DCtx *dctx = ZSTD_createDCtx();
assert(dctx != nullptr);
// Initialize input and output buffers
ZSTD_inBuffer input = {in.data(), in.size(), 0};
// Estimate and reserve memory for decompressed data
size_t estimatedDecompressedSize = ZSTD_getFrameContentSize(in.data(), in.size());
if (estimatedDecompressedSize == ZSTD_CONTENTSIZE_ERROR || estimatedDecompressedSize == ZSTD_CONTENTSIZE_UNKNOWN) {
estimatedDecompressedSize = in.size() * 2; // Use a fallback size
}
std::string decompressedData;
decompressedData.reserve(estimatedDecompressedSize);
const size_t bufferSize = ZSTD_DStreamOutSize(); // Recommended output buffer size
std::string outputBuffer(bufferSize, '\0');
while (input.pos < input.size) {
ZSTD_outBuffer output = {outputBuffer.data(), bufferSize, 0};
size_t result = ZSTD_decompressStream(dctx, &output, &input);
if (ZSTD_isError(result)) {
break;
}
decompressedData.append(outputBuffer.data(), output.pos);
}
ZSTD_freeDCtx(dctx);
decompressedData.shrink_to_fit();
return decompressedData;
}
static void log_sentinel(LoggerState *log, SentinelType type, int exit_signal = 0) { static void log_sentinel(LoggerState *log, SentinelType type, int exit_signal = 0) {
MessageBuilder msg; MessageBuilder msg;
auto sen = msg.initEvent().initSentinel(); auto sen = msg.initEvent().initSentinel();
@ -144,12 +180,11 @@ bool LoggerState::next() {
bool ret = util::create_directories(segment_path, 0775); bool ret = util::create_directories(segment_path, 0775);
assert(ret == true); assert(ret == true);
const std::string rlog_path = segment_path + "/rlog"; lock_file = segment_path + "/rlog.lock";
lock_file = rlog_path + ".lock";
std::ofstream{lock_file}; std::ofstream{lock_file};
rlog.reset(new RawFile(rlog_path)); rlog.reset(new ZstdFileWriter(segment_path + "/rlog.zst", LOG_COMPRESSION_LEVEL));
qlog.reset(new RawFile(segment_path + "/qlog")); qlog.reset(new ZstdFileWriter(segment_path + "/qlog.zst", LOG_COMPRESSION_LEVEL));
// log init data & sentinel type. // log init data & sentinel type.
write(init_data.asBytes(), true); write(init_data.asBytes(), true);

@ -7,31 +7,12 @@
#include "cereal/messaging/messaging.h" #include "cereal/messaging/messaging.h"
#include "common/util.h" #include "common/util.h"
#include "system/hardware/hw.h" #include "system/hardware/hw.h"
#include "system/loggerd/zstd_writer.h"
class RawFile { constexpr int LOG_COMPRESSION_LEVEL = 10;
public:
RawFile(const std::string &path) {
file = util::safe_fopen(path.c_str(), "wb");
assert(file != nullptr);
}
~RawFile() {
util::safe_fflush(file);
int err = fclose(file);
assert(err == 0);
}
inline void write(void* data, size_t size) {
int written = util::safe_fwrite(data, 1, size, file);
assert(written == size);
}
inline void write(kj::ArrayPtr<capnp::byte> array) { write(array.begin(), array.size()); }
private:
FILE* file = nullptr;
};
typedef cereal::Sentinel::SentinelType SentinelType; typedef cereal::Sentinel::SentinelType SentinelType;
class LoggerState { class LoggerState {
public: public:
LoggerState(const std::string& log_root = Path::log_root()); LoggerState(const std::string& log_root = Path::log_root());
@ -48,8 +29,9 @@ protected:
int part = -1, exit_signal = 0; int part = -1, exit_signal = 0;
std::string route_path, route_name, segment_path, lock_file; std::string route_path, route_name, segment_path, lock_file;
kj::Array<capnp::word> init_data; kj::Array<capnp::word> init_data;
std::unique_ptr<RawFile> rlog, qlog; std::unique_ptr<ZstdFileWriter> rlog, qlog;
}; };
kj::Array<capnp::word> logger_build_init_data(); kj::Array<capnp::word> logger_build_init_data();
std::string logger_get_identifier(std::string key); std::string logger_get_identifier(std::string key);
std::string zstd_decompress(const std::string &in);

@ -106,7 +106,7 @@ class TestEncoder:
# Check encodeIdx # Check encodeIdx
if encode_idx_name is not None: if encode_idx_name is not None:
rlog_path = f"{route_prefix_path}--{i}/rlog" rlog_path = f"{route_prefix_path}--{i}/rlog.zst"
msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name] msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name]
encode_msgs = [getattr(m, encode_idx_name) for m in msgs] encode_msgs = [getattr(m, encode_idx_name) for m in msgs]

@ -9,12 +9,13 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT; SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT;
REQUIRE(!util::file_exists(segment_path + "/rlog.lock")); REQUIRE(!util::file_exists(segment_path + "/rlog.lock"));
for (const char *fn : {"/rlog", "/qlog"}) { for (const char *fn : {"/rlog.zst", "/qlog.zst"}) {
const std::string log_file = segment_path + fn; const std::string log_file = segment_path + fn;
std::string log = util::read_file(log_file); std::string log = util::read_file(log_file);
REQUIRE(!log.empty()); REQUIRE(!log.empty());
std::string decompressed_log = zstd_decompress(log);
int event_cnt = 0, i = 0; int event_cnt = 0, i = 0;
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); kj::ArrayPtr<const capnp::word> words((capnp::word *)decompressed_log.data(), decompressed_log.size() / sizeof(capnp::word));
while (words.size() > 0) { while (words.size() > 0) {
try { try {
capnp::FlatArrayMessageReader reader(words); capnp::FlatArrayMessageReader reader(words);

@ -142,7 +142,7 @@ class TestLoggerd:
Params().put("RecordFront", "1") Params().put("RecordFront", "1")
d = DEVICE_CAMERAS[("tici", "ar0231")] d = DEVICE_CAMERAS[("tici", "ar0231")]
expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} expected_files = {"rlog.zst", "qlog.zst", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"), streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"), (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")] (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]
@ -229,7 +229,7 @@ class TestLoggerd:
random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services)))) random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services))))
sent_msgs = self._publish_random_messages(services) sent_msgs = self._publish_random_messages(services)
qlog_path = os.path.join(self._get_latest_log_dir(), "qlog") qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.zst")
lr = list(LogReader(qlog_path)) lr = list(LogReader(qlog_path))
# check initData and sentinel # check initData and sentinel
@ -255,7 +255,7 @@ class TestLoggerd:
services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
sent_msgs = self._publish_random_messages(services) sent_msgs = self._publish_random_messages(services)
lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog"))) lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.zst")))
# check initData and sentinel # check initData and sentinel
self._check_init_data(lr) self._check_init_data(lr)

@ -0,0 +1,38 @@
#include <zstd.h>
#include <catch2/catch.hpp>
#include <cstring>
#include <vector>
#include "common/util.h"
#include "system/loggerd/logger.h"
#include "system/loggerd/zstd_writer.h"
TEST_CASE("ZstdFileWriter writes and compresses data correctly in loops", "[ZstdFileWriter]") {
const std::string filename = "test_zstd_file.zst";
const int iterations = 100;
const size_t dataSize = 1024;
std::string totalTestData;
// Step 1: Write compressed data to file in a loop
{
ZstdFileWriter writer(filename, LOG_COMPRESSION_LEVEL);
for (int i = 0; i < iterations; ++i) {
std::string testData = util::random_string(dataSize);
totalTestData.append(testData);
writer.write((void *)testData.c_str(), testData.size());
}
}
// Step 2: Decompress the file and verify the data
auto compressedContent = util::read_file(filename);
std::string decompressedData = zstd_decompress(compressedContent);
// Step 3: Verify that the decompressed data matches the original accumulated data
REQUIRE(decompressedData.size() == totalTestData.size());
REQUIRE(std::memcmp(decompressedData.data(), totalTestData.c_str(), totalTestData.size()) == 0);
// Clean up the test file
std::remove(filename.c_str());
}

@ -0,0 +1,65 @@
#include "system/loggerd/zstd_writer.h"
#include <cassert>
#include "common/util.h"
// Constructor: Initializes compression stream and opens file
ZstdFileWriter::ZstdFileWriter(const std::string& filename, int compression_level) {
// Create the compression stream
cstream_ = ZSTD_createCStream();
assert(cstream_);
size_t initResult = ZSTD_initCStream(cstream_, compression_level);
assert(!ZSTD_isError(initResult));
input_cache_capacity_ = ZSTD_CStreamInSize();
input_cache_.reserve(input_cache_capacity_);
output_buffer_.resize(ZSTD_CStreamOutSize());
file_ = util::safe_fopen(filename.c_str(), "wb");
assert(file_ != nullptr);
}
// Destructor: Finalizes compression and closes file
ZstdFileWriter::~ZstdFileWriter() {
flushCache(true);
util::safe_fflush(file_);
int err = fclose(file_);
assert(err == 0);
ZSTD_freeCStream(cstream_);
}
// Compresses and writes data to file
void ZstdFileWriter::write(void* data, size_t size) {
// Add data to the input cache
input_cache_.insert(input_cache_.end(), (uint8_t*)data, (uint8_t*)data + size);
// If the cache is full, compress and write to the file
if (input_cache_.size() >= input_cache_capacity_) {
flushCache(false);
}
}
// Compress and flush the input cache to the file
void ZstdFileWriter::flushCache(bool last_chunk) {
ZSTD_inBuffer input = {input_cache_.data(), input_cache_.size(), 0};
ZSTD_EndDirective mode = !last_chunk ? ZSTD_e_continue : ZSTD_e_end;
int finished = 0;
do {
ZSTD_outBuffer output = {output_buffer_.data(), output_buffer_.size(), 0};
size_t remaining = ZSTD_compressStream2(cstream_, &output, &input, mode);
assert(!ZSTD_isError(remaining));
size_t written = util::safe_fwrite(output_buffer_.data(), 1, output.pos, file_);
assert(written == output.pos);
finished = last_chunk ? (remaining == 0) : (input.pos == input.size);
} while (!finished);
input_cache_.clear(); // Clear cache after compression
}

@ -0,0 +1,24 @@
#pragma once
#include <zstd.h>
#include <string>
#include <vector>
#include <capnp/common.h>
class ZstdFileWriter {
public:
ZstdFileWriter(const std::string &filename, int compression_level);
~ZstdFileWriter();
void write(void* data, size_t size);
inline void write(kj::ArrayPtr<capnp::byte> array) { write(array.begin(), array.size()); }
private:
void flushCache(bool last_chunk);
size_t input_cache_capacity_ = 0;
std::vector<char> input_cache_;
std::vector<char> output_buffer_;
ZSTD_CStream *cstream_;
FILE* file_ = nullptr;
};

@ -38,6 +38,14 @@ def save_log(dest, log_msgs, compress=True):
with open(dest, "wb") as f: with open(dest, "wb") as f:
f.write(dat) f.write(dat)
def decompress_stream(data: bytes):
dctx = zstd.ZstdDecompressor()
decompressed_data = b""
with dctx.stream_reader(data) as reader:
decompressed_data = reader.read()
return decompressed_data
class _LogFileReader: class _LogFileReader:
def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None): def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None):
@ -58,7 +66,7 @@ class _LogFileReader:
dat = bz2.decompress(dat) dat = bz2.decompress(dat)
elif ext == ".zst" or dat.startswith(b'\x28\xB5\x2F\xFD'): elif ext == ".zst" or dat.startswith(b'\x28\xB5\x2F\xFD'):
# https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames # https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames
dat = zstd.decompress(dat) dat = decompress_stream(dat)
ents = capnp_log.Event.read_multiple_bytes(dat) ents = capnp_log.Event.read_multiple_bytes(dat)

@ -159,7 +159,7 @@ if __name__ == '__main__':
lr = LogReader(args.route) lr = LogReader(args.route)
else: else:
segs = [seg for seg in os.listdir(Paths.log_root()) if args.route in seg] segs = [seg for seg in os.listdir(Paths.log_root()) if args.route in seg]
lr = LogReader([os.path.join(Paths.log_root(), seg, 'rlog') for seg in segs]) lr = LogReader([os.path.join(Paths.log_root(), seg, 'rlog.zst') for seg in segs])
CP = lr.first('carParams') CP = lr.first('carParams')
ID = lr.first('initData') ID = lr.first('initData')

Loading…
Cancel
Save