From bbd1648f0561b7b3270a2bc7b416841ac10fd9db Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Tue, 23 Apr 2024 10:21:42 +0800 Subject: [PATCH] replay: optimize memory usage with `MonotonicBuffer` (#32278) Optimize Memory Usage with MonotonicBuffe --- tools/cabana/streams/abstractstream.h | 1 + tools/cabana/utils/util.cc | 23 ---------------------- tools/cabana/utils/util.h | 16 --------------- tools/replay/logreader.cc | 28 +++++++++++++++++---------- tools/replay/logreader.h | 4 ++++ tools/replay/replay.cc | 7 ++++++- tools/replay/replay.h | 1 + tools/replay/route.cc | 5 +++-- tools/replay/route.h | 4 +++- tools/replay/util.cc | 24 +++++++++++++++++++++++ tools/replay/util.h | 16 +++++++++++++++ 11 files changed, 76 insertions(+), 53 deletions(-) diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index 10ffcb184a..18c00cb8b6 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -13,6 +13,7 @@ #include "cereal/messaging/messaging.h" #include "tools/cabana/dbc/dbcmanager.h" #include "tools/cabana/utils/util.h" +#include "tools/replay/util.h" struct CanData { void compute(const MessageId &msg_id, const uint8_t *dat, const int size, double current_sec, diff --git a/tools/cabana/utils/util.cc b/tools/cabana/utils/util.cc index a5f6cf0f5e..f85ea6d105 100644 --- a/tools/cabana/utils/util.cc +++ b/tools/cabana/utils/util.cc @@ -263,26 +263,3 @@ QString signalToolTip(const cabana::Signal *sig) { )").arg(sig->name).arg(sig->start_bit).arg(sig->size).arg(sig->msb).arg(sig->lsb) .arg(sig->is_little_endian ? "Y" : "N").arg(sig->is_signed ? "Y" : "N"); } - -// MonotonicBuffer - -void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) { - assert(bytes > 0); - void *p = std::align(alignment, bytes, current_buf, available); - if (p == nullptr) { - available = next_buffer_size = std::max(next_buffer_size, bytes); - current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size)); - next_buffer_size *= growth_factor; - p = current_buf; - } - - current_buf = (char *)current_buf + bytes; - available -= bytes; - return p; -} - -MonotonicBuffer::~MonotonicBuffer() { - for (auto buf : buffers) { - free(buf); - } -} diff --git a/tools/cabana/utils/util.h b/tools/cabana/utils/util.h index 158321f784..218b8eeb51 100644 --- a/tools/cabana/utils/util.h +++ b/tools/cabana/utils/util.h @@ -2,7 +2,6 @@ #include #include -#include #include #include @@ -160,20 +159,5 @@ private: QSocketNotifier *sn; }; -class MonotonicBuffer { -public: - MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {} - ~MonotonicBuffer(); - void *allocate(size_t bytes, size_t alignment = 16ul); - void deallocate(void *p) {} - -private: - void *current_buf = nullptr; - size_t next_buffer_size = 0; - size_t available = 0; - std::deque buffers; - static constexpr float growth_factor = 1.5; -}; - int num_decimals(double num); QString signalToolTip(const cabana::Signal *sig); diff --git a/tools/replay/logreader.cc b/tools/replay/logreader.cc index f52ef4a4eb..0f1638145f 100644 --- a/tools/replay/logreader.cc +++ b/tools/replay/logreader.cc @@ -1,18 +1,19 @@ #include "tools/replay/logreader.h" #include +#include #include "tools/replay/filereader.h" #include "tools/replay/util.h" bool LogReader::load(const std::string &url, std::atomic *abort, bool local_cache, int chunk_size, int retries) { - raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort); - if (raw_.empty()) return false; + std::string data = FileReader(local_cache, chunk_size, retries).read(url, abort); + if (!data.empty() && url.find(".bz2") != std::string::npos) + data = decompressBZ2(data, abort); - if (url.find(".bz2") != std::string::npos) { - raw_ = decompressBZ2(raw_, abort); - if (raw_.empty()) return false; - } - return load(raw_.data(), raw_.size(), abort); + bool success = !data.empty() && load(data.data(), data.size(), abort); + if (filters_.empty()) + raw_ = std::move(data); + return success; } bool LogReader::load(const char *data, size_t size, std::atomic *abort) { @@ -23,9 +24,18 @@ bool LogReader::load(const char *data, size_t size, std::atomic *abort) { capnp::FlatArrayMessageReader reader(words); auto event = reader.getRoot(); auto which = event.which(); - uint64_t mono_time = event.getLogMonoTime(); auto event_data = kj::arrayPtr(words.begin(), reader.getEnd()); + words = kj::arrayPtr(reader.getEnd(), words.end()); + + if (!filters_.empty()) { + if (which >= filters_.size() || !filters_[which]) + continue; + auto buf = buffer_.allocate(event_data.size() * sizeof(capnp::word)); + memcpy(buf, event_data.begin(), event_data.size() * sizeof(capnp::word)); + event_data = kj::arrayPtr((const capnp::word *)buf, event_data.size()); + } + uint64_t mono_time = event.getLogMonoTime(); const Event &evt = events.emplace_back(which, mono_time, event_data); // Add encodeIdx packet again as a frame packet for the video stream if (evt.which == cereal::Event::ROAD_ENCODE_IDX || @@ -37,8 +47,6 @@ bool LogReader::load(const char *data, size_t size, std::atomic *abort) { } events.emplace_back(which, mono_time, event_data, idx.getSegmentNum()); } - - words = kj::arrayPtr(reader.getEnd(), words.end()); } } catch (const kj::Exception &e) { rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size()); diff --git a/tools/replay/logreader.h b/tools/replay/logreader.h index 56633c191b..782c00b90a 100644 --- a/tools/replay/logreader.h +++ b/tools/replay/logreader.h @@ -5,6 +5,7 @@ #include "cereal/gen/cpp/log.capnp.h" #include "system/camerad/cameras/camera_common.h" +#include "tools/replay/util.h" const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const int MAX_CAMERAS = std::size(ALL_CAMERAS); @@ -26,6 +27,7 @@ public: class LogReader { public: + LogReader(const std::vector &filters = {}) { filters_ = filters; } bool load(const std::string &url, std::atomic *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0); bool load(const char *data, size_t size, std::atomic *abort = nullptr); @@ -33,4 +35,6 @@ public: private: std::string raw_; + std::vector filters_; + MonotonicBuffer buffer_{1024 * 1024}; }; diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index f5339050a8..43aef7b881 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -27,6 +27,11 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s sockets_[which] = name.c_str(); } } + if (!allow.isEmpty()) { + for (int i = 0; i < sockets_.size(); ++i) { + filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]); + } + } std::vector s; std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s), @@ -259,7 +264,7 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); if (it != end && !it->second) { rDebug("loading segment %d...", it->first); - it->second = std::make_unique(it->first, route_->at(it->first), flags_); + it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_); QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); return true; } diff --git a/tools/replay/replay.h b/tools/replay/replay.h index b8f7852e4f..e3f321e1d7 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -135,6 +135,7 @@ protected: SubMaster *sm = nullptr; std::unique_ptr pm; std::vector sockets_; + std::vector filters_; std::unique_ptr route_; std::unique_ptr camera_server_; std::atomic flags_ = REPLAY_FLAG_NONE; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index f2a0754da1..1c7010eb8a 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -131,7 +131,8 @@ void Route::addFileToSegment(int n, const QString &file) { // class Segment -Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) { +Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters) + : seg_num(n), flags(flags), filters_(filters) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const std::array file_list = { (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam, @@ -161,7 +162,7 @@ void Segment::loadFile(int id, const std::string file) { frames[id] = std::make_unique(); success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3); } else { - log = std::make_unique(); + log = std::make_unique(filters_); success = log->load(file, &abort_, local_cache, 0, 3); } diff --git a/tools/replay/route.h b/tools/replay/route.h index 654c084ff2..f956497804 100644 --- a/tools/replay/route.h +++ b/tools/replay/route.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -55,7 +56,7 @@ class Segment : public QObject { Q_OBJECT public: - Segment(int n, const SegmentFile &files, uint32_t flags); + Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters = {}); ~Segment(); inline bool isLoaded() const { return !loading_ && !abort_; } @@ -73,4 +74,5 @@ protected: std::atomic loading_ = 0; QFutureSynchronizer synchronizer_; uint32_t flags; + std::vector filters_; }; diff --git a/tools/replay/util.cc b/tools/replay/util.cc index 3ecd4297fd..c8203fd79d 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -354,3 +355,26 @@ std::string sha256(const std::string &str) { SHA256_Final(hash, &sha256); return util::hexdump(hash, SHA256_DIGEST_LENGTH); } + +// MonotonicBuffer + +void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) { + assert(bytes > 0); + void *p = std::align(alignment, bytes, current_buf, available); + if (p == nullptr) { + available = next_buffer_size = std::max(next_buffer_size, bytes); + current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size)); + next_buffer_size *= growth_factor; + p = current_buf; + } + + current_buf = (char *)current_buf + bytes; + available -= bytes; + return p; +} + +MonotonicBuffer::~MonotonicBuffer() { + for (auto buf : buffers) { + free(buf); + } +} diff --git a/tools/replay/util.h b/tools/replay/util.h index fdb1dbf0f8..750c133555 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -20,6 +21,21 @@ void logMessage(ReplyMsgType type, const char* fmt, ...); #define rWarning(fmt, ...) ::logMessage(ReplyMsgType::Warning, fmt, ## __VA_ARGS__) #define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__) +class MonotonicBuffer { +public: + MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {} + ~MonotonicBuffer(); + void *allocate(size_t bytes, size_t alignment = 16ul); + void deallocate(void *p) {} + +private: + void *current_buf = nullptr; + size_t next_buffer_size = 0; + size_t available = 0; + std::deque buffers; + static constexpr float growth_factor = 1.5; +}; + std::string sha256(const std::string &str); void precise_nano_sleep(int64_t nanoseconds); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr);