diff --git a/tools/cabana/streams/livestream.cc b/tools/cabana/streams/livestream.cc index bc5b811f44..b0af9f3281 100644 --- a/tools/cabana/streams/livestream.cc +++ b/tools/cabana/streams/livestream.cc @@ -4,8 +4,11 @@ LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), if (!zmq_address.isEmpty()) { setenv("ZMQ", "1", 1); } - updateCachedNS(); - QObject::connect(&settings, &Settings::changed, this, &LiveStream::updateCachedNS); + + timer = new QTimer(this); + timer->callOnTimeout(this, &LiveStream::removeExpiredEvents); + timer->start(3 * 1000); + stream_thread = new QThread(this); QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); @@ -37,23 +40,17 @@ void LiveStream::streamThread() { AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); Event *evt = ::new Event(buf->align(msg)); delete msg; - { std::lock_guard lk(lock); can_events.push_back(evt); - if ((evt->mono_time - can_events.front()->mono_time) > cache_ns) { - ::delete can_events.front(); - delete messages.front(); - can_events.pop_front(); - messages.pop_front(); - } } + if (start_ts == 0) { start_ts = evt->mono_time; emit streamStarted(); } current_ts = evt->mono_time; - if (start_ts > current_ts) { + if (current_ts < start_ts) { qDebug() << "stream is looping back to old time stamp"; start_ts = current_ts.load(); } @@ -62,9 +59,23 @@ void LiveStream::streamThread() { } } -const std::vector *LiveStream::events() const { +void LiveStream::removeExpiredEvents() { std::lock_guard lk(lock); + if (can_events.size() > 0) { + const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9; + const uint64_t last_ns = can_events.back()->mono_time; + while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) { + ::delete can_events.front(); + delete messages.front(); + can_events.pop_front(); + messages.pop_front(); + } + } +} + +const std::vector *LiveStream::events() const { events_vector.clear(); + std::lock_guard lk(lock); events_vector.reserve(can_events.size()); std::copy(can_events.begin(), can_events.end(), std::back_inserter(events_vector)); return &events_vector; diff --git a/tools/cabana/streams/livestream.h b/tools/cabana/streams/livestream.h index 95c6e3811c..db1638d761 100644 --- a/tools/cabana/streams/livestream.h +++ b/tools/cabana/streams/livestream.h @@ -1,5 +1,6 @@ #pragma once +#include #include "tools/cabana/streams/abstractstream.h" class LiveStream : public AbstractStream { @@ -17,7 +18,7 @@ public: protected: void streamThread(); - void updateCachedNS() { cache_ns = (settings.max_cached_minutes * 60) * 1e9; } + void removeExpiredEvents(); mutable std::mutex lock; mutable std::vector events_vector; @@ -25,7 +26,7 @@ protected: std::deque messages; std::atomic start_ts = 0; std::atomic current_ts = 0; - std::atomic cache_ns = 0; const QString zmq_address; QThread *stream_thread; + QTimer *timer; };