From 9ec54f59c666fdb285134c32555b7ff8352cf4e7 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sun, 9 Feb 2025 04:55:31 +0800 Subject: [PATCH] loggerd: switch logging from raw file format to zstd compressed files (#34549) * switch logging from raw file format to zstd compressed files * more zst suffix * compress bootlog * remove class RawFile * Optimize ZstdFileWriter by adding input caching * use ZSTD_compressStream2 * cleanup * LOG_COMPRESSION_LEVEL=10 * space * add zst suffix to LOGS_SIZE_RATE --- selfdrive/test/test_onroad.py | 17 ++--- system/athena/tests/test_athenad.py | 2 +- system/loggerd/SConscript | 6 +- system/loggerd/bootlog.cc | 5 +- system/loggerd/logger.cc | 43 ++++++++++-- system/loggerd/logger.h | 26 ++------ system/loggerd/tests/test_encoder.py | 2 +- system/loggerd/tests/test_logger.cc | 5 +- system/loggerd/tests/test_loggerd.py | 6 +- system/loggerd/tests/test_zstd_writer.cc | 38 +++++++++++ system/loggerd/zstd_writer.cc | 65 +++++++++++++++++++ system/loggerd/zstd_writer.h | 24 +++++++ tools/lib/logreader.py | 10 ++- .../longitudinal_maneuvers/generate_report.py | 2 +- 14 files changed, 200 insertions(+), 51 deletions(-) create mode 100644 system/loggerd/tests/test_zstd_writer.cc create mode 100644 system/loggerd/zstd_writer.cc create mode 100644 system/loggerd/zstd_writer.h diff --git a/selfdrive/test/test_onroad.py b/selfdrive/test/test_onroad.py index 1279a505dd..72f0b10a97 100644 --- a/selfdrive/test/test_onroad.py +++ b/selfdrive/test/test_onroad.py @@ -8,7 +8,6 @@ import shutil import subprocess import time import numpy as np -import zstandard as zstd from collections import Counter, defaultdict from pathlib import Path from tabulate import tabulate @@ -23,7 +22,6 @@ from openpilot.selfdrive.selfdrived.events import EVENTS, ET from openpilot.selfdrive.test.helpers import set_params_enabled, release_only from openpilot.system.hardware import HARDWARE from openpilot.system.hardware.hw import Paths -from openpilot.system.loggerd.uploader import LOG_COMPRESSION_LEVEL from openpilot.tools.lib.logreader import LogReader """ @@ -102,8 +100,8 @@ TIMINGS = { } LOGS_SIZE_RATE = { - "qlog": 0.0083, - "rlog": 0.135, + "qlog.zst": 0.0083, + "rlog.zst": 0.135, "qcamera.ts": 0.03828, } LOGS_SIZE_RATE.update(dict.fromkeys(['ecamera.hevc', 'fcamera.hevc'], 1.2740)) @@ -119,10 +117,10 @@ class TestOnroad: @classmethod def setup_class(cls): if "DEBUG" in os.environ: - segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog")), Path(Paths.log_root()).iterdir()) + segs = filter(lambda x: os.path.exists(os.path.join(x, "rlog.zst")), Path(Paths.log_root()).iterdir()) segs = sorted(segs, key=lambda x: x.stat().st_mtime) print(segs[-3]) - cls.lr = list(LogReader(os.path.join(segs[-3], "rlog"))) + cls.lr = list(LogReader(os.path.join(segs[-3], "rlog.zst"))) return # setup env @@ -173,18 +171,15 @@ class TestOnroad: if proc.wait(60) is None: proc.kill() - cls.lrs = [list(LogReader(os.path.join(str(s), "rlog"))) for s in cls.segments] + cls.lrs = [list(LogReader(os.path.join(str(s), "rlog.zst"))) for s in cls.segments] - cls.lr = list(LogReader(os.path.join(str(cls.segments[0]), "rlog"))) + cls.lr = list(LogReader(os.path.join(str(cls.segments[0]), "rlog.zst"))) cls.log_path = cls.segments[0] cls.log_sizes = {} for f in cls.log_path.iterdir(): assert f.is_file() cls.log_sizes[f] = f.stat().st_size / 1e6 - if f.name in ("qlog", "rlog"): - with open(f, 'rb') as ff: - cls.log_sizes[f] = len(zstd.compress(ff.read(), LOG_COMPRESSION_LEVEL)) / 1e6 cls.msgs = defaultdict(list) for m in cls.lr: diff --git a/system/athena/tests/test_athenad.py b/system/athena/tests/test_athenad.py index a6bfc68930..e16e73a7ea 100644 --- a/system/athena/tests/test_athenad.py +++ b/system/athena/tests/test_athenad.py @@ -138,7 +138,7 @@ class TestAthenadMethods: route = '2021-03-29--13-32-47' segments = [0, 1, 2, 3, 11] - filenames = ['qlog', 'qcamera.ts', 'rlog', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc'] + filenames = ['qlog.zst', 'qcamera.ts', 'rlog.zst', 'fcamera.hevc', 'ecamera.hevc', 'dcamera.hevc'] files = [f'{route}--{s}/{f}' for s in segments for f in filenames] for file in files: self._create_file(file) diff --git a/system/loggerd/SConscript b/system/loggerd/SConscript index 196d18476a..6ec0cc15ed 100644 --- a/system/loggerd/SConscript +++ b/system/loggerd/SConscript @@ -2,9 +2,9 @@ Import('env', 'arch', 'messaging', 'common', 'visionipc') libs = [common, messaging, visionipc, 'z', 'avformat', 'avcodec', 'swscale', - 'avutil', 'yuv', 'OpenCL', 'pthread'] + 'avutil', 'yuv', 'OpenCL', 'pthread', 'zstd'] -src = ['logger.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc'] +src = ['logger.cc', 'zstd_writer.cc', 'video_writer.cc', 'encoder/encoder.cc', 'encoder/v4l_encoder.cc'] if arch != "larch64": src += ['encoder/ffmpeg_encoder.cc'] @@ -23,4 +23,4 @@ env.Program('encoderd', ['encoderd.cc'], LIBS=libs) env.Program('bootlog.cc', LIBS=libs) if GetOption('extras'): - env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=libs + ['curl', 'crypto']) + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', 'tests/test_zstd_writer.cc'], LIBS=libs + ['curl', 'crypto']) diff --git a/system/loggerd/bootlog.cc b/system/loggerd/bootlog.cc index 85eeb369a1..a7b1ae8244 100644 --- a/system/loggerd/bootlog.cc +++ b/system/loggerd/bootlog.cc @@ -5,6 +5,7 @@ #include "common/params.h" #include "common/swaglog.h" #include "system/loggerd/logger.h" +#include "system/loggerd/zstd_writer.h" static kj::Array build_boot_log() { @@ -50,14 +51,14 @@ static kj::Array build_boot_log() { int main(int argc, char** argv) { const std::string id = logger_get_identifier("BootCount"); - const std::string path = Path::log_root() + "/boot/" + id; + const std::string path = Path::log_root() + "/boot/" + id + ".zst"; LOGW("bootlog to %s", path.c_str()); // Open bootlog bool r = util::create_directories(Path::log_root() + "/boot/", 0775); assert(r); - RawFile file(path.c_str()); + ZstdFileWriter file(path, LOG_COMPRESSION_LEVEL); // Write initdata file.write(logger_build_init_data().asBytes()); // Write bootlog diff --git a/system/loggerd/logger.cc b/system/loggerd/logger.cc index 213aa69d3e..1461ceaca6 100644 --- a/system/loggerd/logger.cc +++ b/system/loggerd/logger.cc @@ -113,6 +113,42 @@ std::string logger_get_identifier(std::string key) { return util::string_format("%08x--%s", cnt, ss.str().c_str()); } +std::string zstd_decompress(const std::string &in) { + ZSTD_DCtx *dctx = ZSTD_createDCtx(); + assert(dctx != nullptr); + + // Initialize input and output buffers + ZSTD_inBuffer input = {in.data(), in.size(), 0}; + + // Estimate and reserve memory for decompressed data + size_t estimatedDecompressedSize = ZSTD_getFrameContentSize(in.data(), in.size()); + if (estimatedDecompressedSize == ZSTD_CONTENTSIZE_ERROR || estimatedDecompressedSize == ZSTD_CONTENTSIZE_UNKNOWN) { + estimatedDecompressedSize = in.size() * 2; // Use a fallback size + } + + std::string decompressedData; + decompressedData.reserve(estimatedDecompressedSize); + + const size_t bufferSize = ZSTD_DStreamOutSize(); // Recommended output buffer size + std::string outputBuffer(bufferSize, '\0'); + + while (input.pos < input.size) { + ZSTD_outBuffer output = {outputBuffer.data(), bufferSize, 0}; + + size_t result = ZSTD_decompressStream(dctx, &output, &input); + if (ZSTD_isError(result)) { + break; + } + + decompressedData.append(outputBuffer.data(), output.pos); + } + + ZSTD_freeDCtx(dctx); + decompressedData.shrink_to_fit(); + return decompressedData; +} + + static void log_sentinel(LoggerState *log, SentinelType type, int exit_signal = 0) { MessageBuilder msg; auto sen = msg.initEvent().initSentinel(); @@ -144,12 +180,11 @@ bool LoggerState::next() { bool ret = util::create_directories(segment_path, 0775); assert(ret == true); - const std::string rlog_path = segment_path + "/rlog"; - lock_file = rlog_path + ".lock"; + lock_file = segment_path + "/rlog.lock"; std::ofstream{lock_file}; - rlog.reset(new RawFile(rlog_path)); - qlog.reset(new RawFile(segment_path + "/qlog")); + rlog.reset(new ZstdFileWriter(segment_path + "/rlog.zst", LOG_COMPRESSION_LEVEL)); + qlog.reset(new ZstdFileWriter(segment_path + "/qlog.zst", LOG_COMPRESSION_LEVEL)); // log init data & sentinel type. write(init_data.asBytes(), true); diff --git a/system/loggerd/logger.h b/system/loggerd/logger.h index 7a8490d57a..18d07b5f38 100644 --- a/system/loggerd/logger.h +++ b/system/loggerd/logger.h @@ -7,31 +7,12 @@ #include "cereal/messaging/messaging.h" #include "common/util.h" #include "system/hardware/hw.h" +#include "system/loggerd/zstd_writer.h" -class RawFile { - public: - RawFile(const std::string &path) { - file = util::safe_fopen(path.c_str(), "wb"); - assert(file != nullptr); - } - ~RawFile() { - util::safe_fflush(file); - int err = fclose(file); - assert(err == 0); - } - inline void write(void* data, size_t size) { - int written = util::safe_fwrite(data, 1, size, file); - assert(written == size); - } - inline void write(kj::ArrayPtr array) { write(array.begin(), array.size()); } - - private: - FILE* file = nullptr; -}; +constexpr int LOG_COMPRESSION_LEVEL = 10; typedef cereal::Sentinel::SentinelType SentinelType; - class LoggerState { public: LoggerState(const std::string& log_root = Path::log_root()); @@ -48,8 +29,9 @@ protected: int part = -1, exit_signal = 0; std::string route_path, route_name, segment_path, lock_file; kj::Array init_data; - std::unique_ptr rlog, qlog; + std::unique_ptr rlog, qlog; }; kj::Array logger_build_init_data(); std::string logger_get_identifier(std::string key); +std::string zstd_decompress(const std::string &in); diff --git a/system/loggerd/tests/test_encoder.py b/system/loggerd/tests/test_encoder.py index cf38c8bc31..b24bfbe168 100644 --- a/system/loggerd/tests/test_encoder.py +++ b/system/loggerd/tests/test_encoder.py @@ -106,7 +106,7 @@ class TestEncoder: # Check encodeIdx if encode_idx_name is not None: - rlog_path = f"{route_prefix_path}--{i}/rlog" + rlog_path = f"{route_prefix_path}--{i}/rlog.zst" msgs = [m for m in LogReader(rlog_path) if m.which() == encode_idx_name] encode_msgs = [getattr(m, encode_idx_name) for m in msgs] diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc index 2dae136e13..40a45a68d5 100644 --- a/system/loggerd/tests/test_logger.cc +++ b/system/loggerd/tests/test_logger.cc @@ -9,12 +9,13 @@ void verify_segment(const std::string &route_path, int segment, int max_segment, SentinelType end_sentinel = segment == max_segment - 1 ? SentinelType::END_OF_ROUTE : SentinelType::END_OF_SEGMENT; REQUIRE(!util::file_exists(segment_path + "/rlog.lock")); - for (const char *fn : {"/rlog", "/qlog"}) { + for (const char *fn : {"/rlog.zst", "/qlog.zst"}) { const std::string log_file = segment_path + fn; std::string log = util::read_file(log_file); REQUIRE(!log.empty()); + std::string decompressed_log = zstd_decompress(log); int event_cnt = 0, i = 0; - kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); + kj::ArrayPtr words((capnp::word *)decompressed_log.data(), decompressed_log.size() / sizeof(capnp::word)); while (words.size() > 0) { try { capnp::FlatArrayMessageReader reader(words); diff --git a/system/loggerd/tests/test_loggerd.py b/system/loggerd/tests/test_loggerd.py index 34abe553a1..179e237a65 100644 --- a/system/loggerd/tests/test_loggerd.py +++ b/system/loggerd/tests/test_loggerd.py @@ -142,7 +142,7 @@ class TestLoggerd: Params().put("RecordFront", "1") d = DEVICE_CAMERAS[("tici", "ar0231")] - expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} + expected_files = {"rlog.zst", "qlog.zst", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"), (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"), (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")] @@ -229,7 +229,7 @@ class TestLoggerd: random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services)))) sent_msgs = self._publish_random_messages(services) - qlog_path = os.path.join(self._get_latest_log_dir(), "qlog") + qlog_path = os.path.join(self._get_latest_log_dir(), "qlog.zst") lr = list(LogReader(qlog_path)) # check initData and sentinel @@ -255,7 +255,7 @@ class TestLoggerd: services = random.sample(CEREAL_SERVICES, random.randint(5, 10)) sent_msgs = self._publish_random_messages(services) - lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog"))) + lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog.zst"))) # check initData and sentinel self._check_init_data(lr) diff --git a/system/loggerd/tests/test_zstd_writer.cc b/system/loggerd/tests/test_zstd_writer.cc new file mode 100644 index 0000000000..f116bb2d5e --- /dev/null +++ b/system/loggerd/tests/test_zstd_writer.cc @@ -0,0 +1,38 @@ +#include + +#include +#include +#include + +#include "common/util.h" +#include "system/loggerd/logger.h" +#include "system/loggerd/zstd_writer.h" + +TEST_CASE("ZstdFileWriter writes and compresses data correctly in loops", "[ZstdFileWriter]") { + const std::string filename = "test_zstd_file.zst"; + const int iterations = 100; + const size_t dataSize = 1024; + + std::string totalTestData; + + // Step 1: Write compressed data to file in a loop + { + ZstdFileWriter writer(filename, LOG_COMPRESSION_LEVEL); + for (int i = 0; i < iterations; ++i) { + std::string testData = util::random_string(dataSize); + totalTestData.append(testData); + writer.write((void *)testData.c_str(), testData.size()); + } + } + + // Step 2: Decompress the file and verify the data + auto compressedContent = util::read_file(filename); + std::string decompressedData = zstd_decompress(compressedContent); + + // Step 3: Verify that the decompressed data matches the original accumulated data + REQUIRE(decompressedData.size() == totalTestData.size()); + REQUIRE(std::memcmp(decompressedData.data(), totalTestData.c_str(), totalTestData.size()) == 0); + + // Clean up the test file + std::remove(filename.c_str()); +} diff --git a/system/loggerd/zstd_writer.cc b/system/loggerd/zstd_writer.cc new file mode 100644 index 0000000000..69ca64479e --- /dev/null +++ b/system/loggerd/zstd_writer.cc @@ -0,0 +1,65 @@ + +#include "system/loggerd/zstd_writer.h" + +#include + +#include "common/util.h" + +// Constructor: Initializes compression stream and opens file +ZstdFileWriter::ZstdFileWriter(const std::string& filename, int compression_level) { + // Create the compression stream + cstream_ = ZSTD_createCStream(); + assert(cstream_); + + size_t initResult = ZSTD_initCStream(cstream_, compression_level); + assert(!ZSTD_isError(initResult)); + + input_cache_capacity_ = ZSTD_CStreamInSize(); + input_cache_.reserve(input_cache_capacity_); + output_buffer_.resize(ZSTD_CStreamOutSize()); + + file_ = util::safe_fopen(filename.c_str(), "wb"); + assert(file_ != nullptr); +} + +// Destructor: Finalizes compression and closes file +ZstdFileWriter::~ZstdFileWriter() { + flushCache(true); + util::safe_fflush(file_); + + int err = fclose(file_); + assert(err == 0); + + ZSTD_freeCStream(cstream_); +} + +// Compresses and writes data to file +void ZstdFileWriter::write(void* data, size_t size) { + // Add data to the input cache + input_cache_.insert(input_cache_.end(), (uint8_t*)data, (uint8_t*)data + size); + + // If the cache is full, compress and write to the file + if (input_cache_.size() >= input_cache_capacity_) { + flushCache(false); + } +} + +// Compress and flush the input cache to the file +void ZstdFileWriter::flushCache(bool last_chunk) { + ZSTD_inBuffer input = {input_cache_.data(), input_cache_.size(), 0}; + ZSTD_EndDirective mode = !last_chunk ? ZSTD_e_continue : ZSTD_e_end; + int finished = 0; + + do { + ZSTD_outBuffer output = {output_buffer_.data(), output_buffer_.size(), 0}; + size_t remaining = ZSTD_compressStream2(cstream_, &output, &input, mode); + assert(!ZSTD_isError(remaining)); + + size_t written = util::safe_fwrite(output_buffer_.data(), 1, output.pos, file_); + assert(written == output.pos); + + finished = last_chunk ? (remaining == 0) : (input.pos == input.size); + } while (!finished); + + input_cache_.clear(); // Clear cache after compression +} diff --git a/system/loggerd/zstd_writer.h b/system/loggerd/zstd_writer.h new file mode 100644 index 0000000000..b11deaab20 --- /dev/null +++ b/system/loggerd/zstd_writer.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include +#include +#include + +class ZstdFileWriter { +public: + ZstdFileWriter(const std::string &filename, int compression_level); + ~ZstdFileWriter(); + void write(void* data, size_t size); + inline void write(kj::ArrayPtr array) { write(array.begin(), array.size()); } + +private: + void flushCache(bool last_chunk); + + size_t input_cache_capacity_ = 0; + std::vector input_cache_; + std::vector output_buffer_; + ZSTD_CStream *cstream_; + FILE* file_ = nullptr; +}; diff --git a/tools/lib/logreader.py b/tools/lib/logreader.py index 5f7cdd3043..530091d70c 100755 --- a/tools/lib/logreader.py +++ b/tools/lib/logreader.py @@ -38,6 +38,14 @@ def save_log(dest, log_msgs, compress=True): with open(dest, "wb") as f: f.write(dat) +def decompress_stream(data: bytes): + dctx = zstd.ZstdDecompressor() + decompressed_data = b"" + + with dctx.stream_reader(data) as reader: + decompressed_data = reader.read() + + return decompressed_data class _LogFileReader: def __init__(self, fn, canonicalize=True, only_union_types=False, sort_by_time=False, dat=None): @@ -58,7 +66,7 @@ class _LogFileReader: dat = bz2.decompress(dat) elif ext == ".zst" or dat.startswith(b'\x28\xB5\x2F\xFD'): # https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames - dat = zstd.decompress(dat) + dat = decompress_stream(dat) ents = capnp_log.Event.read_multiple_bytes(dat) diff --git a/tools/longitudinal_maneuvers/generate_report.py b/tools/longitudinal_maneuvers/generate_report.py index e5972495b7..8c16e30d56 100755 --- a/tools/longitudinal_maneuvers/generate_report.py +++ b/tools/longitudinal_maneuvers/generate_report.py @@ -159,7 +159,7 @@ if __name__ == '__main__': lr = LogReader(args.route) else: segs = [seg for seg in os.listdir(Paths.log_root()) if args.route in seg] - lr = LogReader([os.path.join(Paths.log_root(), seg, 'rlog') for seg in segs]) + lr = LogReader([os.path.join(Paths.log_root(), seg, 'rlog.zst') for seg in segs]) CP = lr.first('carParams') ID = lr.first('initData')