|
|
|
@ -4,8 +4,11 @@ LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), |
|
|
|
|
if (!zmq_address.isEmpty()) { |
|
|
|
|
setenv("ZMQ", "1", 1); |
|
|
|
|
} |
|
|
|
|
updateCachedNS(); |
|
|
|
|
QObject::connect(&settings, &Settings::changed, this, &LiveStream::updateCachedNS); |
|
|
|
|
|
|
|
|
|
timer = new QTimer(this); |
|
|
|
|
timer->callOnTimeout(this, &LiveStream::removeExpiredEvents); |
|
|
|
|
timer->start(3 * 1000); |
|
|
|
|
|
|
|
|
|
stream_thread = new QThread(this); |
|
|
|
|
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); |
|
|
|
|
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); |
|
|
|
@ -37,23 +40,17 @@ void LiveStream::streamThread() { |
|
|
|
|
AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); |
|
|
|
|
Event *evt = ::new Event(buf->align(msg)); |
|
|
|
|
delete msg; |
|
|
|
|
|
|
|
|
|
{ |
|
|
|
|
std::lock_guard lk(lock); |
|
|
|
|
can_events.push_back(evt); |
|
|
|
|
if ((evt->mono_time - can_events.front()->mono_time) > cache_ns) { |
|
|
|
|
::delete can_events.front(); |
|
|
|
|
delete messages.front(); |
|
|
|
|
can_events.pop_front(); |
|
|
|
|
messages.pop_front(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (start_ts == 0) { |
|
|
|
|
start_ts = evt->mono_time; |
|
|
|
|
emit streamStarted(); |
|
|
|
|
} |
|
|
|
|
current_ts = evt->mono_time; |
|
|
|
|
if (start_ts > current_ts) { |
|
|
|
|
if (current_ts < start_ts) { |
|
|
|
|
qDebug() << "stream is looping back to old time stamp"; |
|
|
|
|
start_ts = current_ts.load(); |
|
|
|
|
} |
|
|
|
@ -62,9 +59,23 @@ void LiveStream::streamThread() { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const std::vector<Event *> *LiveStream::events() const { |
|
|
|
|
void LiveStream::removeExpiredEvents() { |
|
|
|
|
std::lock_guard lk(lock); |
|
|
|
|
if (can_events.size() > 0) { |
|
|
|
|
const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9; |
|
|
|
|
const uint64_t last_ns = can_events.back()->mono_time; |
|
|
|
|
while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) { |
|
|
|
|
::delete can_events.front(); |
|
|
|
|
delete messages.front(); |
|
|
|
|
can_events.pop_front(); |
|
|
|
|
messages.pop_front(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const std::vector<Event *> *LiveStream::events() const { |
|
|
|
|
events_vector.clear(); |
|
|
|
|
std::lock_guard lk(lock); |
|
|
|
|
events_vector.reserve(can_events.size()); |
|
|
|
|
std::copy(can_events.begin(), can_events.end(), std::back_inserter(events_vector)); |
|
|
|
|
return &events_vector; |
|
|
|
|