cabana: use a monotonic buffer to allocate CanEvent (#29652)

* use a monotonic buffer to allocate CanEvent

* set the next buffer size to 6MB

* static

* cleanup

* use const iter

* rename to insert_pos
pull/29660/head
Dean Lee 2 years ago committed by GitHub
parent 0767a6dee5
commit 57ad4f02f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      tools/cabana/streams/abstractstream.cc
  2. 2
      tools/cabana/streams/abstractstream.h
  3. 26
      tools/cabana/util.cc
  4. 16
      tools/cabana/util.h

@ -5,6 +5,8 @@
#include <QTimer>
static const int EVENT_NEXT_BUFFER_SIZE = 6 * 1024 * 1024; // 6MB
AbstractStream *can = nullptr;
StreamNotifier *StreamNotifier::instance() {
@ -12,8 +14,11 @@ StreamNotifier *StreamNotifier::instance() {
return &notifier;
}
AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId, CanData>()), QObject(parent) {
AbstractStream::AbstractStream(QObject *parent) : QObject(parent) {
assert(parent != nullptr);
new_msgs = std::make_unique<QHash<MessageId, CanData>>();
event_buffer = std::make_unique<MonotonicBuffer>(EVENT_NEXT_BUFFER_SIZE);
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
QObject::connect(&settings, &Settings::changed, this, &AbstractStream::updateMasks);
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &AbstractStream::updateMasks);
@ -129,37 +134,25 @@ void AbstractStream::updateLastMsgsTo(double sec) {
}
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
size_t memory_size = 0;
size_t events_cnt = 0;
for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
for (const auto &c : (*it)->event.getCan()) {
memory_size += sizeof(CanEvent) + sizeof(uint8_t) * c.getDat().size();
++events_cnt;
}
}
}
if (memory_size == 0) return;
static std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events_map;
static std::vector<const CanEvent *> new_events;
new_events_map.clear();
new_events.clear();
char *ptr = memory_blocks.emplace_back(new char[memory_size]).get();
std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events_map;
std::vector<const CanEvent *> new_events;
new_events.reserve(events_cnt);
for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
uint64_t ts = (*it)->mono_time;
for (const auto &c : (*it)->event.getCan()) {
CanEvent *e = (CanEvent *)ptr;
auto dat = c.getDat();
CanEvent *e = (CanEvent *)event_buffer->allocate(sizeof(CanEvent) + sizeof(uint8_t) * dat.size());
e->src = c.getSrc();
e->address = c.getAddress();
e->mono_time = ts;
auto dat = c.getDat();
e->size = dat.size();
memcpy(e->dat, (uint8_t *)dat.begin(), e->size);
new_events_map[{.source = e->src, .address = e->address}].push_back(e);
new_events.push_back(e);
ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size;
}
}
}
@ -168,17 +161,14 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
return l->mono_time < r->mono_time;
};
bool append = new_events.front()->mono_time > lastest_event_ts;
for (auto &[id, new_e] : new_events_map) {
auto &e = events_[id];
auto pos = append ? e.end()
: std::upper_bound(e.cbegin(), e.cend(), new_e.front(), compare);
e.insert(pos, new_e.cbegin(), new_e.cend());
auto insert_pos = std::upper_bound(e.cbegin(), e.cend(), new_e.front(), compare);
e.insert(insert_pos, new_e.cbegin(), new_e.cend());
}
auto pos = append ? all_events_.end()
: std::upper_bound(all_events_.begin(), all_events_.end(), new_events.front(), compare);
all_events_.insert(pos, new_events.cbegin(), new_events.cend());
auto insert_pos = std::upper_bound(all_events_.cbegin(), all_events_.cend(), new_events.front(), compare);
all_events_.insert(insert_pos, new_events.cbegin(), new_events.cend());
lastest_event_ts = all_events_.back()->mono_time;
emit eventsMerged();

@ -101,7 +101,7 @@ protected:
QHash<MessageId, CanData> all_msgs;
std::unordered_map<MessageId, std::vector<const CanEvent *>> events_;
std::vector<const CanEvent *> all_events_;
std::deque<std::unique_ptr<char[]>> memory_blocks;
std::unique_ptr<MonotonicBuffer> event_buffer;
std::mutex mutex;
std::unordered_map<MessageId, std::vector<uint8_t>> masks;
};

@ -4,12 +4,13 @@
#include <array>
#include <csignal>
#include <limits>
#include <memory>
#include <string>
#include <sys/socket.h>
#include <unistd.h>
#include <QDebug>
#include <QColor>
#include <QDebug>
#include <QFontDatabase>
#include <QLocale>
#include <QPainter>
@ -264,3 +265,26 @@ QString signalToolTip(const cabana::Signal *sig) {
)").arg(sig->name).arg(sig->start_bit).arg(sig->size).arg(sig->msb).arg(sig->lsb)
.arg(sig->is_little_endian ? "Y" : "N").arg(sig->is_signed ? "Y" : "N");
}
// MonotonicBuffer
void *MonotonicBuffer::allocate(size_t bytes, size_t alignment) {
assert(bytes > 0);
void *p = std::align(alignment, bytes, current_buf, available);
if (p == nullptr) {
available = next_buffer_size = std::max(next_buffer_size, bytes);
current_buf = buffers.emplace_back(std::aligned_alloc(alignment, next_buffer_size));
next_buffer_size *= growth_factor;
p = current_buf;
}
current_buf = (char *)current_buf + bytes;
available -= bytes;
return p;
}
MonotonicBuffer::~MonotonicBuffer() {
for (auto buf : buffers) {
free(buf);
}
}

@ -1,6 +1,7 @@
#pragma once
#include <cmath>
#include <deque>
#include <vector>
#include <utility>
@ -153,5 +154,20 @@ private:
QSocketNotifier *sn;
};
class MonotonicBuffer {
public:
MonotonicBuffer(size_t initial_size) : next_buffer_size(initial_size) {}
~MonotonicBuffer();
void *allocate(size_t bytes, size_t alignment = 16ul);
void deallocate(void *p) {}
private:
void *current_buf = nullptr;
size_t next_buffer_size = 0;
size_t available = 0;
std::deque<void *> buffers;
static constexpr float growth_factor = 1.5;
};
int num_decimals(double num);
QString signalToolTip(const cabana::Signal *sig);

Loading…
Cancel
Save