diff --git a/tools/cabana/chartswidget.cc b/tools/cabana/chartswidget.cc index 184d41ca0b..61afb871ae 100644 --- a/tools/cabana/chartswidget.cc +++ b/tools/cabana/chartswidget.cc @@ -108,14 +108,14 @@ ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) { void ChartsWidget::eventsMerged() { { - assert(!can->liveStreaming()); QFutureSynchronizer future_synchronizer; - const auto events = can->events(); for (auto c : charts) { - future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr, events, true)); + future_synchronizer.addFuture(QtConcurrent::run(c, &ChartView::updateSeries, nullptr)); } } - updateState(); + if (can->isPaused()) { + updateState(); + } } void ChartsWidget::zoomIn(double min, double max) { @@ -133,20 +133,13 @@ void ChartsWidget::zoomReset() { void ChartsWidget::updateState() { if (charts.isEmpty()) return; - const auto events = can->events(); - if (can->liveStreaming()) { - // appends incoming events to the end of series - for (auto c : charts) { - c->updateSeries(nullptr, events, false); - } - } - const double cur_sec = can->currentSec(); if (!is_zoomed) { double pos = (cur_sec - display_range.first) / std::max(1.0, (display_range.second - display_range.first)); if (pos < 0 || pos > 0.8) { display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1); } + auto events = can->events(); double max_event_sec = events->empty() ? 0 : (events->back()->mono_time / 1e9 - can->routeStartTime()); double max_sec = std::min(std::floor(display_range.first + max_chart_range), max_event_sec); display_range.first = std::max(0.0, max_sec - max_chart_range); @@ -502,11 +495,11 @@ void ChartView::updateSeriesPoints() { } } -void ChartView::updateSeries(const cabana::Signal *sig, const std::vector *events, bool clear) { - events = events ? events : can->events(); +void ChartView::updateSeries(const cabana::Signal *sig) { + const auto events = can->events(); for (auto &s : sigs) { if (!sig || s.sig == sig) { - if (clear) { + if (!can->liveStreaming()) { s.vals.clear(); s.step_vals.clear(); s.vals.reserve(settings.max_cached_minutes * 60 * 100); // [n]seconds * 100hz diff --git a/tools/cabana/chartswidget.h b/tools/cabana/chartswidget.h index bdc924ff83..c7470a6f12 100644 --- a/tools/cabana/chartswidget.h +++ b/tools/cabana/chartswidget.h @@ -32,7 +32,7 @@ public: ChartView(QWidget *parent = nullptr); void addSeries(const MessageId &msg_id, const cabana::Signal *sig); bool hasSeries(const MessageId &msg_id, const cabana::Signal *sig) const; - void updateSeries(const cabana::Signal *sig = nullptr, const std::vector *events = nullptr, bool clear = true); + void updateSeries(const cabana::Signal *sig = nullptr); void updatePlot(double cur, double min, double max); void setSeriesType(SeriesType type); void updatePlotArea(int left); diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index de7c394303..d1cf9d8037 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -59,7 +59,7 @@ public: QSet sources; protected: - void process(QHash *); + virtual void process(QHash *); bool updateEvent(const Event *event); void updateLastMsgsTo(double sec); diff --git a/tools/cabana/streams/livestream.cc b/tools/cabana/streams/livestream.cc index 592d883c4c..256cfc5a3e 100644 --- a/tools/cabana/streams/livestream.cc +++ b/tools/cabana/streams/livestream.cc @@ -1,10 +1,8 @@ #include "tools/cabana/streams/livestream.h" -LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { - timer = new QTimer(this); - timer->callOnTimeout(this, &LiveStream::removeExpiredEvents); - timer->start(3 * 1000); +#include +LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { stream_thread = new QThread(this); QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); @@ -15,8 +13,6 @@ LiveStream::~LiveStream() { stream_thread->requestInterruption(); stream_thread->quit(); stream_thread->wait(); - for (Event *e : can_events) ::delete e; - for (auto m : messages) delete m; } void LiveStream::streamThread() { @@ -35,11 +31,8 @@ void LiveStream::streamThread() { QThread::msleep(50); continue; } - AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); - Event *evt = ::new Event(buf->align(msg)); - delete msg; - - handleEvent(evt); + std::lock_guard lk(lock); + handleEvent(messages.emplace_back(msg).event); // TODO: write stream to log file to replay it with cabana --data_dir flag. } } @@ -53,11 +46,10 @@ void LiveStream::handleEvent(Event *evt) { emit streamStarted(); } - std::lock_guard lk(lock); - can_events.push_back(evt); + received.push_back(evt); if (!pause_) { if (speed_ < 1 && last_update_ts > 0) { - auto it = std::upper_bound(can_events.cbegin(), can_events.cend(), current_ts, [](uint64_t ts, auto &e) { + auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) { return ts < e->mono_time; }); if (it != can_events.cend()) { @@ -73,27 +65,20 @@ void LiveStream::handleEvent(Event *evt) { } } -void LiveStream::removeExpiredEvents() { - std::lock_guard lk(lock); - if (can_events.size() > 0) { - const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9; - const uint64_t last_ns = can_events.back()->mono_time; - while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) { - ::delete can_events.front(); - delete messages.front(); - can_events.pop_front(); - messages.pop_front(); +void LiveStream::process(QHash *last_messages) { + { + std::lock_guard lk(lock); + uint64_t last_ts = can_events.empty() ? 0 : can_events.back()->mono_time; + auto first = std::upper_bound(received.cbegin(), received.cend(), last_ts, [](uint64_t ts, auto &e) { + return ts < e->mono_time; + }); + can_events.insert(can_events.end(), first, received.cend()); + if (speed_ == 1) { + received.clear(); } } -} - -const std::vector *LiveStream::events() const { - std::lock_guard lk(lock); - if (events_vector.capacity() <= can_events.size()) { - events_vector.reserve(can_events.size() * 2); - } - events_vector.assign(can_events.begin(), can_events.end()); - return &events_vector; + emit eventsMerged(); + AbstractStream::process(last_messages); } void LiveStream::pause(bool pause) { diff --git a/tools/cabana/streams/livestream.h b/tools/cabana/streams/livestream.h index 43f67ae95f..657d7e407f 100644 --- a/tools/cabana/streams/livestream.h +++ b/tools/cabana/streams/livestream.h @@ -1,6 +1,5 @@ #pragma once -#include #include "tools/cabana/streams/abstractstream.h" class LiveStream : public AbstractStream { @@ -17,17 +16,29 @@ public: void setSpeed(float speed) override { speed_ = std::min(1.0, speed); } bool isPaused() const override { return pause_; } void pause(bool pause) override; - const std::vector *events() const override; + const std::vector *events() const override { return &can_events; } protected: + void process(QHash *) override; virtual void handleEvent(Event *evt); virtual void streamThread(); - virtual void removeExpiredEvents(); + + struct Msg { + Msg(Message *m) { + event = ::new Event(aligned_buf.align(m)); + delete m; + } + ~Msg() { + ::delete event; + } + Event *event; + AlignedBuffer aligned_buf; + }; mutable std::mutex lock; - mutable std::vector events_vector; - std::deque can_events; - std::deque messages; + std::vector can_events; + std::vector received; + std::deque messages; std::atomic start_ts = 0; std::atomic current_ts = 0; std::atomic speed_ = 1; @@ -36,5 +47,4 @@ protected: const QString zmq_address; QThread *stream_thread; - QTimer *timer; };