replay: optimize memory usage with `MonotonicBuffer` (#32278)

Optimize Memory Usage with MonotonicBuffe
pull/32255/head
Dean Lee 1 year ago committed by GitHub
parent 7f916f2e9d
commit bbd1648f05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      tools/cabana/streams/abstractstream.h
  2. 23
      tools/cabana/utils/util.cc
  3. 16
      tools/cabana/utils/util.h
  4. 28
      tools/replay/logreader.cc
  5. 4
      tools/replay/logreader.h
  6. 7
      tools/replay/replay.cc
  7. 1
      tools/replay/replay.h
  8. 5
      tools/replay/route.cc
  9. 4
      tools/replay/route.h
  10. 24
      tools/replay/util.cc
  11. 16
      tools/replay/util.h

@ -13,6 +13,7 @@
#include "cereal/messaging/messaging.h"
#include "tools/cabana/dbc/dbcmanager.h"
#include "tools/cabana/utils/util.h"
#include "tools/replay/util.h"
struct CanData {
void compute(const MessageId &msg_id, const uint8_t *dat, const int size, double current_sec,

@ -263,26 +263,3 @@ QString signalToolTip(const cabana::Signal *sig) {
)").arg(sig->name).arg(sig->start_bit).arg(sig->size).arg(sig->msb).arg(sig->lsb)
.arg(sig->is_little_endian ? "Y" : "N").arg(sig->is_signed ? "Y" : "N");
}
// MonotonicBuffer
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
assert(bytes > 0);
void *p = std::align(alignment, bytes, current_buf, available);
if (p == nullptr) {
available = next_buffer_size = std::max(next_buffer_size, bytes);
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
next_buffer_size *= growth_factor;
p = current_buf;
}
current_buf = (char *)current_buf + bytes;
available -= bytes;
return p;
}
MonotonicBuffer::~MonotonicBuffer() {
for (auto buf : buffers) {
free(buf);
}
}

@ -2,7 +2,6 @@
#include <array>
#include <cmath>
#include <deque>
#include <vector>
#include <utility>
@ -160,20 +159,5 @@ private:
QSocketNotifier *sn;
};
class MonotonicBuffer {
public:
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
~MonotonicBuffer();
void *allocate(size_t bytes, size_t alignment = 16ul);
void deallocate(void *p) {}
private:
void *current_buf = nullptr;
size_t next_buffer_size = 0;
size_t available = 0;
std::deque<void *> buffers;
static constexpr float growth_factor = 1.5;
};
int num_decimals(double num);
QString signalToolTip(const cabana::Signal *sig);

@ -1,18 +1,19 @@
#include "tools/replay/logreader.h"
#include <algorithm>
#include <utility>
#include "tools/replay/filereader.h"
#include "tools/replay/util.h"
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (raw_.empty()) return false;
std::string data = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (!data.empty() && url.find(".bz2") != std::string::npos)
data = decompressBZ2(data, abort);
if (url.find(".bz2") != std::string::npos) {
raw_ = decompressBZ2(raw_, abort);
if (raw_.empty()) return false;
}
return load(raw_.data(), raw_.size(), abort);
bool success = !data.empty() && load(data.data(), data.size(), abort);
if (filters_.empty())
raw_ = std::move(data);
return success;
}
bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
@ -23,9 +24,18 @@ bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
capnp::FlatArrayMessageReader reader(words);
auto event = reader.getRoot<cereal::Event>();
auto which = event.which();
uint64_t mono_time = event.getLogMonoTime();
auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
words = kj::arrayPtr(reader.getEnd(), words.end());
if (!filters_.empty()) {
if (which >= filters_.size() || !filters_[which])
continue;
auto buf = buffer_.allocate(event_data.size() * sizeof(capnp::word));
memcpy(buf, event_data.begin(), event_data.size() * sizeof(capnp::word));
event_data = kj::arrayPtr((const capnp::word *)buf, event_data.size());
}
uint64_t mono_time = event.getLogMonoTime();
const Event &evt = events.emplace_back(which, mono_time, event_data);
// Add encodeIdx packet again as a frame packet for the video stream
if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
@ -37,8 +47,6 @@ bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
}
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
}
words = kj::arrayPtr(reader.getEnd(), words.end());
}
} catch (const kj::Exception &e) {
rWarning("Failed to parse log : %s.\nRetrieved %zu events from corrupt log", e.getDescription().cStr(), events.size());

@ -5,6 +5,7 @@
#include "cereal/gen/cpp/log.capnp.h"
#include "system/camerad/cameras/camera_common.h"
#include "tools/replay/util.h"
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
@ -26,6 +27,7 @@ public:
class LogReader {
public:
LogReader(const std::vector<bool> &filters = {}) { filters_ = filters; }
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const char *data, size_t size, std::atomic<bool> *abort = nullptr);
@ -33,4 +35,6 @@ public:
private:
std::string raw_;
std::vector<bool> filters_;
MonotonicBuffer buffer_{1024 * 1024};
};

@ -27,6 +27,11 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
sockets_[which] = name.c_str();
}
}
if (!allow.isEmpty()) {
for (int i = 0; i < sockets_.size(); ++i) {
filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
}
}
std::vector<const char *> s;
std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(s),
@ -259,7 +264,7 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator
auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != end && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
return true;
}

@ -135,6 +135,7 @@ protected:
SubMaster *sm = nullptr;
std::unique_ptr<PubMaster> pm;
std::vector<const char*> sockets_;
std::vector<bool> filters_;
std::unique_ptr<Route> route_;
std::unique_ptr<CameraServer> camera_server_;
std::atomic<uint32_t> flags_ = REPLAY_FLAG_NONE;

@ -131,7 +131,8 @@ void Route::addFileToSegment(int n, const QString &file) {
// class Segment
Segment::Segment(int n, const SegmentFile &files, uint32_t flags) : seg_num(n), flags(flags) {
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters)
: seg_num(n), flags(flags), filters_(filters) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const std::array file_list = {
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
@ -161,7 +162,7 @@ void Segment::loadFile(int id, const std::string file) {
frames[id] = std::make_unique<FrameReader>();
success = frames[id]->load(file, flags & REPLAY_FLAG_NO_HW_DECODER, &abort_, local_cache, 20 * 1024 * 1024, 3);
} else {
log = std::make_unique<LogReader>();
log = std::make_unique<LogReader>(filters_);
success = log->load(file, &abort_, local_cache, 0, 3);
}

@ -3,6 +3,7 @@
#include <map>
#include <memory>
#include <string>
#include <vector>
#include <QDateTime>
#include <QFutureSynchronizer>
@ -55,7 +56,7 @@ class Segment : public QObject {
Q_OBJECT
public:
Segment(int n, const SegmentFile &files, uint32_t flags);
Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters = {});
~Segment();
inline bool isLoaded() const { return !loading_ && !abort_; }
@ -73,4 +74,5 @@ protected:
std::atomic<int> loading_ = 0;
QFutureSynchronizer<void> synchronizer_;
uint32_t flags;
std::vector<bool> filters_;
};

@ -5,6 +5,7 @@
#include <openssl/sha.h>
#include <cassert>
#include <algorithm>
#include <cmath>
#include <cstdarg>
#include <cstring>
@ -354,3 +355,26 @@ std::string sha256(const std::string &str) {
SHA256_Final(hash, &sha256);
return util::hexdump(hash, SHA256_DIGEST_LENGTH);
}
// MonotonicBuffer
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
assert(bytes > 0);
void *p = std::align(alignment, bytes, current_buf, available);
if (p == nullptr) {
available = next_buffer_size = std::max(next_buffer_size, bytes);
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
next_buffer_size *= growth_factor;
p = current_buf;
}
current_buf = (char *)current_buf + bytes;
available -= bytes;
return p;
}
MonotonicBuffer::~MonotonicBuffer() {
for (auto buf : buffers) {
free(buf);
}
}

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <deque>
#include <functional>
#include <string>
@ -20,6 +21,21 @@ void logMessage(ReplyMsgType type, const char* fmt, ...);
#define rWarning(fmt, ...) ::logMessage(ReplyMsgType::Warning, fmt, ## __VA_ARGS__)
#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__)
class MonotonicBuffer {
public:
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
~MonotonicBuffer();
void *allocate(size_t bytes, size_t alignment = 16ul);
void deallocate(void *p) {}
private:
void *current_buf = nullptr;
size_t next_buffer_size = 0;
size_t available = 0;
std::deque<void *> buffers;
static constexpr float growth_factor = 1.5;
};
std::string sha256(const std::string &str);
void precise_nano_sleep(int64_t nanoseconds);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);

Loading…
Cancel
Save