#include "tools/cabana/streams/livestream.h" #include LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { stream_thread = new QThread(this); QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); QTimer::singleShot(0, [this]() { stream_thread->start(); }); } LiveStream::~LiveStream() { stream_thread->requestInterruption(); stream_thread->quit(); stream_thread->wait(); } void LiveStream::streamThread() { if (!zmq_address.isEmpty()) { setenv("ZMQ", "1", 1); } std::unique_ptr context(Context::create()); std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); std::unique_ptr sock(SubSocket::create(context.get(), "can", address)); assert(sock != NULL); sock->setTimeout(50); // run as fast as messages come in while (!QThread::currentThread()->isInterruptionRequested()) { Message *msg = sock->receive(true); if (!msg) { QThread::msleep(50); continue; } std::lock_guard lk(lock); handleEvent(messages.emplace_back(msg).event); // TODO: write stream to log file to replay it with cabana --data_dir flag. } } void LiveStream::handleEvent(Event *evt) { if (start_ts == 0 || evt->mono_time < start_ts) { if (evt->mono_time < start_ts) { qDebug() << "stream is looping back to old time stamp"; } start_ts = current_ts = evt->mono_time; emit streamStarted(); } received.push_back(evt); if (!pause_) { if (speed_ < 1 && last_update_ts > 0) { auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) { return ts < e->mono_time; }); if (it != received.cend()) { bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_; if (skip) return; evt = *it; } } current_ts = evt->mono_time; last_update_ts = nanos_since_boot(); updateEvent(evt); } } void LiveStream::process(QHash *last_messages) { { std::lock_guard lk(lock); auto first = std::upper_bound(received.cbegin(), received.cend(), last_event_ts, [](uint64_t ts, auto &e) { return ts < e->mono_time; }); mergeEvents(first, received.cend(), true); if (speed_ == 1) { received.clear(); messages.clear(); } } AbstractStream::process(last_messages); } void LiveStream::pause(bool pause) { pause_ = pause; emit paused(); }