cabana: eliminate deep copy of events in live stream mode (#27588)

old-commit-hash: f00c108acb
beeps
Dean Lee 2 years ago committed by GitHub
parent 9512d91684
commit ee692c5bec
  1. 23
      tools/cabana/chartswidget.cc
  2. 2
      tools/cabana/chartswidget.h
  3. 2
      tools/cabana/streams/abstractstream.h
  4. 51
      tools/cabana/streams/livestream.cc
  5. 24
      tools/cabana/streams/livestream.h

@ -108,14 +108,14 @@ ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) {
void ChartsWidget::eventsMerged() { void ChartsWidget::eventsMerged() {
{ {
assert(!can->liveStreaming());
QFutureSynchronizer<void> future_synchronizer; QFutureSynchronizer<void> future_synchronizer;
const auto events = can->events();
for (auto c : charts) { for (auto c : charts) {
future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr, events, true)); future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr));
} }
} }
updateState(); if (can->isPaused()) {
updateState();
}
} }
void ChartsWidget::zoomIn(double min, double max) { void ChartsWidget::zoomIn(double min, double max) {
@ -133,20 +133,13 @@ void ChartsWidget::zoomReset() {
void ChartsWidget::updateState() { void ChartsWidget::updateState() {
if (charts.isEmpty()) return; if (charts.isEmpty()) return;
const auto events = can->events();
if (can->liveStreaming()) {
// appends incoming events to the end of series
for (auto c : charts) {
c->updateSeries(nullptr, events, false);
}
}
const double cur_sec = can->currentSec(); const double cur_sec = can->currentSec();
if (!is_zoomed) { if (!is_zoomed) {
double pos = (cur_sec - display_range.first) / std::max(1.0, (display_range.second - display_range.first)); double pos = (cur_sec - display_range.first) / std::max(1.0, (display_range.second - display_range.first));
if (pos < 0 || pos > 0.8) { if (pos < 0 || pos > 0.8) {
display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1); display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1);
} }
auto events = can->events();
double max_event_sec = events->empty() ? 0 : (events->back()->mono_time / 1e9 - can->routeStartTime()); double max_event_sec = events->empty() ? 0 : (events->back()->mono_time / 1e9 - can->routeStartTime());
double max_sec = std::min(std::floor(display_range.first + max_chart_range), max_event_sec); double max_sec = std::min(std::floor(display_range.first + max_chart_range), max_event_sec);
display_range.first = std::max(0.0, max_sec - max_chart_range); display_range.first = std::max(0.0, max_sec - max_chart_range);
@ -502,11 +495,11 @@ void ChartView::updateSeriesPoints() {
} }
} }
void ChartView::updateSeries(const cabana::Signal *sig, const std::vector<Event *> *events, bool clear) { void ChartView::updateSeries(const cabana::Signal *sig) {
events = events ? events : can->events(); const auto events = can->events();
for (auto &s : sigs) { for (auto &s : sigs) {
if (!sig || s.sig == sig) { if (!sig || s.sig == sig) {
if (clear) { if (!can->liveStreaming()) {
s.vals.clear(); s.vals.clear();
s.step_vals.clear(); s.step_vals.clear();
s.vals.reserve(settings.max_cached_minutes * 60 * 100); // [n]seconds * 100hz s.vals.reserve(settings.max_cached_minutes * 60 * 100); // [n]seconds * 100hz

@ -32,7 +32,7 @@ public:
ChartView(QWidget *parent = nullptr); ChartView(QWidget *parent = nullptr);
void addSeries(const MessageId &msg_id, const cabana::Signal *sig); void addSeries(const MessageId &msg_id, const cabana::Signal *sig);
bool hasSeries(const MessageId &msg_id, const cabana::Signal *sig) const; bool hasSeries(const MessageId &msg_id, const cabana::Signal *sig) const;
void updateSeries(const cabana::Signal *sig = nullptr, const std::vector<Event*> *events = nullptr, bool clear = true); void updateSeries(const cabana::Signal *sig = nullptr);
void updatePlot(double cur, double min, double max); void updatePlot(double cur, double min, double max);
void setSeriesType(SeriesType type); void setSeriesType(SeriesType type);
void updatePlotArea(int left); void updatePlotArea(int left);

@ -59,7 +59,7 @@ public:
QSet<uint8_t> sources; QSet<uint8_t> sources;
protected: protected:
void process(QHash<MessageId, CanData> *); virtual void process(QHash<MessageId, CanData> *);
bool updateEvent(const Event *event); bool updateEvent(const Event *event);
void updateLastMsgsTo(double sec); void updateLastMsgsTo(double sec);

@ -1,10 +1,8 @@
#include "tools/cabana/streams/livestream.h" #include "tools/cabana/streams/livestream.h"
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { #include <QTimer>
timer = new QTimer(this);
timer->callOnTimeout(this, &LiveStream::removeExpiredEvents);
timer->start(3 * 1000);
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) {
stream_thread = new QThread(this); stream_thread = new QThread(this);
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); });
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
@ -15,8 +13,6 @@ LiveStream::~LiveStream() {
stream_thread->requestInterruption(); stream_thread->requestInterruption();
stream_thread->quit(); stream_thread->quit();
stream_thread->wait(); stream_thread->wait();
for (Event *e : can_events) ::delete e;
for (auto m : messages) delete m;
} }
void LiveStream::streamThread() { void LiveStream::streamThread() {
@ -35,11 +31,8 @@ void LiveStream::streamThread() {
QThread::msleep(50); QThread::msleep(50);
continue; continue;
} }
AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); std::lock_guard lk(lock);
Event *evt = ::new Event(buf->align(msg)); handleEvent(messages.emplace_back(msg).event);
delete msg;
handleEvent(evt);
// TODO: write stream to log file to replay it with cabana --data_dir flag. // TODO: write stream to log file to replay it with cabana --data_dir flag.
} }
} }
@ -53,11 +46,10 @@ void LiveStream::handleEvent(Event *evt) {
emit streamStarted(); emit streamStarted();
} }
std::lock_guard lk(lock); received.push_back(evt);
can_events.push_back(evt);
if (!pause_) { if (!pause_) {
if (speed_ < 1 && last_update_ts > 0) { if (speed_ < 1 && last_update_ts > 0) {
auto it = std::upper_bound(can_events.cbegin(), can_events.cend(), current_ts, [](uint64_t ts, auto &e) { auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) {
return ts < e->mono_time; return ts < e->mono_time;
}); });
if (it != can_events.cend()) { if (it != can_events.cend()) {
@ -73,27 +65,20 @@ void LiveStream::handleEvent(Event *evt) {
} }
} }
void LiveStream::removeExpiredEvents() { void LiveStream::process(QHash<MessageId, CanData> *last_messages) {
std::lock_guard lk(lock); {
if (can_events.size() > 0) { std::lock_guard lk(lock);
const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9; uint64_t last_ts = can_events.empty() ? 0 : can_events.back()->mono_time;
const uint64_t last_ns = can_events.back()->mono_time; auto first = std::upper_bound(received.cbegin(), received.cend(), last_ts, [](uint64_t ts, auto &e) {
while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) { return ts < e->mono_time;
::delete can_events.front(); });
delete messages.front(); can_events.insert(can_events.end(), first, received.cend());
can_events.pop_front(); if (speed_ == 1) {
messages.pop_front(); received.clear();
} }
} }
} emit eventsMerged();
AbstractStream::process(last_messages);
const std::vector<Event *> *LiveStream::events() const {
std::lock_guard lk(lock);
if (events_vector.capacity() <= can_events.size()) {
events_vector.reserve(can_events.size() * 2);
}
events_vector.assign(can_events.begin(), can_events.end());
return &events_vector;
} }
void LiveStream::pause(bool pause) { void LiveStream::pause(bool pause) {

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <QTimer>
#include "tools/cabana/streams/abstractstream.h" #include "tools/cabana/streams/abstractstream.h"
class LiveStream : public AbstractStream { class LiveStream : public AbstractStream {
@ -17,17 +16,29 @@ public:
void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); } void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); }
bool isPaused() const override { return pause_; } bool isPaused() const override { return pause_; }
void pause(bool pause) override; void pause(bool pause) override;
const std::vector<Event *> *events() const override; const std::vector<Event *> *events() const override { return &can_events; }
protected: protected:
void process(QHash<MessageId, CanData> *) override;
virtual void handleEvent(Event *evt); virtual void handleEvent(Event *evt);
virtual void streamThread(); virtual void streamThread();
virtual void removeExpiredEvents();
struct Msg {
Msg(Message *m) {
event = ::new Event(aligned_buf.align(m));
delete m;
}
~Msg() {
::delete event;
}
Event *event;
AlignedBuffer aligned_buf;
};
mutable std::mutex lock; mutable std::mutex lock;
mutable std::vector<Event *> events_vector; std::vector<Event *> can_events;
std::deque<Event *> can_events; std::vector<Event *> received;
std::deque<AlignedBuffer *> messages; std::deque<Msg> messages;
std::atomic<uint64_t> start_ts = 0; std::atomic<uint64_t> start_ts = 0;
std::atomic<uint64_t> current_ts = 0; std::atomic<uint64_t> current_ts = 0;
std::atomic<float> speed_ = 1; std::atomic<float> speed_ = 1;
@ -36,5 +47,4 @@ protected:
const QString zmq_address; const QString zmq_address;
QThread *stream_thread; QThread *stream_thread;
QTimer *timer;
}; };

Loading…
Cancel
Save