#include "tools/cabana/streams/livestream.h"

// Constructs a live stream and immediately starts receiving.
//
// parent:  Qt parent object, forwarded to AbstractStream.
// address: remote address to subscribe to. When non-empty, the "ZMQ"
//          environment variable is set to "1" (presumably selecting the ZMQ
//          messaging backend -- confirm against the messaging library).
//
// Side effects: starts a 3-second periodic timer that calls
// removeExpiredEvents(), and starts a QThread whose started() signal runs
// streamThread().
LiveStream::LiveStream(QObject *parent, QString address) : AbstractStream(parent, true), zmq_address(address) {
  if (!zmq_address.isEmpty()) {
    setenv("ZMQ", "1", 1);
  }

  timer = new QTimer(this);
  timer->callOnTimeout(this, &LiveStream::removeExpiredEvents);
  timer->start(3 * 1000);

  stream_thread = new QThread(this);
  // No context object is passed, so the functor is invoked directly by the
  // emitter of started() rather than being queued to this object's thread.
  QObject::connect(stream_thread, &QThread::started, [this]() { streamThread(); });
  QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
  stream_thread->start();
}

// Stops the receive thread, waits for it to finish, then frees every cached
// event and message buffer.
LiveStream::~LiveStream() {
  stream_thread->requestInterruption();
  stream_thread->quit();
  stream_thread->wait();

  // Events are allocated with ::new in streamThread(), so release them with
  // the matching global ::delete.
  for (Event *e : can_events) ::delete e;
  for (auto m : messages) delete m;
}

// Receive loop: subscribes to the "can" service and turns every received
// message into an Event until interruption is requested on the current thread.
void LiveStream::streamThread() {
  std::unique_ptr<Context> context(Context::create());
  const std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString();
  std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address));
  assert(sock != nullptr);
  sock->setTimeout(50);

  // run as fast as messages come in
  while (!QThread::currentThread()->isInterruptionRequested()) {
    Message *msg = sock->receive(true);
    if (!msg) {
      // Nothing received; back off for 50 ms before polling again.
      QThread::msleep(50);
      continue;
    }

    // The Event is built from the aligned copy held by `buf`, which is kept in
    // `messages` alongside the event; `msg` itself is released right away.
    AlignedBuffer *buf = new AlignedBuffer();
    Event *evt = ::new Event(buf->align(msg));
    delete msg;

    {
      // removeExpiredEvents() pops from both containers under this lock, so
      // both insertions must happen under it as well. Inserting them together
      // also keeps the two containers the same length for that function.
      std::lock_guard lk(lock);
      messages.push_back(buf);
      can_events.push_back(evt);
    }

    if (start_ts == 0) {
      start_ts = evt->mono_time;
      emit streamStarted();
    }
    current_ts = evt->mono_time;
    if (current_ts < start_ts) {
      qDebug() << "stream is looping back to old time stamp";
      start_ts = current_ts.load();
    }
    updateEvent(evt);
    // TODO: write stream to log file to replay it with cabana --data_dir flag.
  }
}

// Drops cached events (and their backing message buffers) from the front of
// the cache while they are more than settings.max_cached_minutes older than
// the newest cached event. Invoked periodically by `timer`.
void LiveStream::removeExpiredEvents() {
  std::lock_guard lk(lock);
  if (can_events.empty()) {
    return;
  }

  const uint64_t max_ns = settings.max_cached_minutes * 60 * 1e9;
  const uint64_t last_ns = can_events.back()->mono_time;
  // NOTE(review): if the front timestamp is larger than last_ns (stream looped
  // back to an older time), this unsigned subtraction presumably wraps and the
  // front event is treated as expired -- confirm that is the intended result.
  while (!can_events.empty() && (last_ns - can_events.front()->mono_time) > max_ns) {
    ::delete can_events.front();
    delete messages.front();
    can_events.pop_front();
    messages.pop_front();
  }
}

// Returns a pointer to a snapshot of the currently cached events.
//
// The snapshot lives in the member `events_vector` and is overwritten by the
// next call. It holds the same Event pointers as the cache, so an entry stops
// being valid once removeExpiredEvents() deletes that event.
const std::vector<Event *> *LiveStream::events() const {
  // Rebuild the snapshot entirely under the lock so the receive thread cannot
  // modify can_events while it is being copied.
  std::lock_guard lk(lock);
  events_vector.assign(can_events.begin(), can_events.end());
  return &events_vector;
}