diff --git a/tools/cabana/mainwin.cc b/tools/cabana/mainwin.cc index d2e4ff9ca7..e49148160d 100644 --- a/tools/cabana/mainwin.cc +++ b/tools/cabana/mainwin.cc @@ -126,15 +126,14 @@ void MainWindow::createDockWindows() { // splitter between video and charts video_splitter = new QSplitter(Qt::Vertical, this); - if (!can->liveStreaming()) { - video_widget = new VideoWidget(this); - video_splitter->addWidget(video_widget); - QObject::connect(charts_widget, &ChartsWidget::rangeChanged, video_widget, &VideoWidget::rangeChanged); - } + video_widget = new VideoWidget(this); + video_splitter->addWidget(video_widget); + QObject::connect(charts_widget, &ChartsWidget::rangeChanged, video_widget, &VideoWidget::rangeChanged); + video_splitter->addWidget(charts_container); video_splitter->setStretchFactor(1, 1); video_splitter->restoreState(settings.video_splitter_state); - if (!can->liveStreaming() && video_splitter->sizes()[0] == 0) { + if (can->liveStreaming() || video_splitter->sizes()[0] == 0) { // display video at minimum size. video_splitter->setSizes({1, 1}); } diff --git a/tools/cabana/streams/livestream.cc b/tools/cabana/streams/livestream.cc index b0af9f3281..3edd2a7758 100644 --- a/tools/cabana/streams/livestream.cc +++ b/tools/cabana/streams/livestream.cc @@ -1,10 +1,6 @@ #include "tools/cabana/streams/livestream.h" LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { - if (!zmq_address.isEmpty()) { - setenv("ZMQ", "1", 1); - } - timer = new QTimer(this); timer->callOnTimeout(this, &LiveStream::removeExpiredEvents); timer->start(3 * 1000); @@ -24,12 +20,14 @@ LiveStream::~LiveStream() { } void LiveStream::streamThread() { + if (!zmq_address.isEmpty()) { + setenv("ZMQ", "1", 1); + } std::unique_ptr context(Context::create()); std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); std::unique_ptr sock(SubSocket::create(context.get(), "can", address)); assert(sock != NULL); sock->setTimeout(50); - // run as fast as messages come in while (!QThread::currentThread()->isInterruptionRequested()) { Message *msg = sock->receive(true); @@ -40,22 +38,40 @@ void LiveStream::streamThread() { AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); Event *evt = ::new Event(buf->align(msg)); delete msg; - { - std::lock_guard lk(lock); - can_events.push_back(evt); - } - if (start_ts == 0) { - start_ts = evt->mono_time; - emit streamStarted(); - } - current_ts = evt->mono_time; + handleEvent(evt); + // TODO: write stream to log file to replay it with cabana --data_dir flag. + } +} + +void LiveStream::handleEvent(Event *evt) { + std::lock_guard lk(lock); + can_events.push_back(evt); + + current_ts = evt->mono_time; + if (start_ts == 0 || current_ts < start_ts) { if (current_ts < start_ts) { qDebug() << "stream is looping back to old time stamp"; - start_ts = current_ts.load(); + } + start_ts = current_ts.load(); + emit streamStarted(); + } + + if (!pause_) { + if (speed_ < 1 && last_update_ts > 0) { + auto it = std::upper_bound(can_events.begin(), can_events.end(), last_update_event_ts, [](uint64_t ts, auto &e) { + return ts < e->mono_time; + }); + + if (it != can_events.end() && + (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - last_update_event_ts) / speed_) { + return; + } + evt = (*it); } updateEvent(evt); - // TODO: write stream to log file to replay it with cabana --data_dir flag. + last_update_event_ts = evt->mono_time; + last_update_ts = nanos_since_boot(); } } @@ -80,3 +96,8 @@ const std::vector *LiveStream::events() const { std::copy(can_events.begin(), can_events.end(), std::back_inserter(events_vector)); return &events_vector; } + +void LiveStream::pause(bool pause) { + pause_ = pause; + emit paused(); +} diff --git a/tools/cabana/streams/livestream.h b/tools/cabana/streams/livestream.h index db1638d761..ba20032b67 100644 --- a/tools/cabana/streams/livestream.h +++ b/tools/cabana/streams/livestream.h @@ -8,17 +8,21 @@ class LiveStream : public AbstractStream { public: LiveStream(QObject *parent, QString address = {}); - ~LiveStream(); + virtual ~LiveStream(); inline QString routeName() const override { return QString("Live Streaming From %1").arg(zmq_address.isEmpty() ? "127.0.0.1" : zmq_address); } inline double routeStartTime() const override { return start_ts / (double)1e9; } inline double currentSec() const override { return (current_ts - start_ts) / (double)1e9; } + void setSpeed(float speed) override { speed_ = std::min(1.0, speed); } + bool isPaused() const override { return pause_; } + void pause(bool pause) override; const std::vector *events() const override; protected: - void streamThread(); - void removeExpiredEvents(); + virtual void handleEvent(Event *evt); + virtual void streamThread(); + virtual void removeExpiredEvents(); mutable std::mutex lock; mutable std::vector events_vector; @@ -26,6 +30,11 @@ protected: std::deque messages; std::atomic start_ts = 0; std::atomic current_ts = 0; + std::atomic speed_ = 1; + std::atomic pause_ = false; + uint64_t last_update_event_ts = 0; + uint64_t last_update_ts = 0; + const QString zmq_address; QThread *stream_thread; QTimer *timer; diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index 729302adaa..445a21c755 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -27,22 +27,9 @@ VideoWidget::VideoWidget(QWidget *parent) : QWidget(parent) { main_layout->addWidget(frame); QVBoxLayout *frame_layout = new QVBoxLayout(frame); - cam_widget = new CameraWidget("camerad", can->visionStreamType(), false); - frame_layout->addWidget(cam_widget); - - // slider controls - slider_layout = new QHBoxLayout(); - time_label = new ElidedLabel("00:00"); - time_label->setToolTip(tr("Click to set current time")); - slider_layout->addWidget(time_label); - - slider = new Slider(this); - slider->setSingleStep(0); - slider_layout->addWidget(slider); - - end_time_label = new QLabel(this); - slider_layout->addWidget(end_time_label); - frame_layout->addLayout(slider_layout); + if (!can->liveStreaming()) { + frame_layout->addWidget(createCameraWidget()); + } // btn controls QHBoxLayout *control_layout = new QHBoxLayout(); @@ -52,6 +39,8 @@ VideoWidget::VideoWidget(QWidget *parent) : QWidget(parent) { QButtonGroup *group = new QButtonGroup(this); group->setExclusive(true); for (float speed : {0.1, 0.5, 1., 2.}) { + if (can->liveStreaming() && speed > 1) continue; + QPushButton *btn = new QPushButton(QString("%1x").arg(speed), this); btn->setCheckable(true); QObject::connect(btn, &QPushButton::clicked, [=]() { can->setSpeed(speed); }); @@ -61,6 +50,33 @@ VideoWidget::VideoWidget(QWidget *parent) : QWidget(parent) { } frame_layout->addLayout(control_layout); + QObject::connect(play_btn, &QPushButton::clicked, []() { can->pause(!can->isPaused()); }); + QObject::connect(can, &AbstractStream::paused, this, &VideoWidget::updatePlayBtnState); + QObject::connect(can, &AbstractStream::resume, this, &VideoWidget::updatePlayBtnState); + updatePlayBtnState(); +} + +QWidget *VideoWidget::createCameraWidget() { + QWidget *w = new QWidget(this); + QVBoxLayout *l = new QVBoxLayout(w); + l->setContentsMargins(0, 0, 0, 0); + cam_widget = new CameraWidget("camerad", can->visionStreamType(), false); + l->addWidget(cam_widget); + + // slider controls + slider_layout = new QHBoxLayout(); + time_label = new ElidedLabel("00:00"); + time_label->setToolTip(tr("Click to set current time")); + slider_layout->addWidget(time_label); + + slider = new Slider(this); + slider->setSingleStep(0); + slider_layout->addWidget(slider); + + end_time_label = new QLabel(this); + slider_layout->addWidget(end_time_label); + l->addLayout(slider_layout); + cam_widget->setMinimumHeight(100); cam_widget->setSizePolicy(QSizePolicy::Preferred, QSizePolicy::MinimumExpanding); setSizePolicy(QSizePolicy::Preferred, QSizePolicy::Maximum); @@ -69,15 +85,12 @@ VideoWidget::VideoWidget(QWidget *parent) : QWidget(parent) { QObject::connect(slider, &QSlider::sliderReleased, [this]() { can->seekTo(slider->value() / 1000.0); }); QObject::connect(slider, &QSlider::valueChanged, [=](int value) { time_label->setText(formatTime(value / 1000)); }); QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); }); - QObject::connect(play_btn, &QPushButton::clicked, []() { can->pause(!can->isPaused()); }); QObject::connect(can, &AbstractStream::updated, this, &VideoWidget::updateState); - QObject::connect(can, &AbstractStream::paused, this, &VideoWidget::updatePlayBtnState); - QObject::connect(can, &AbstractStream::resume, this, &VideoWidget::updatePlayBtnState); QObject::connect(can, &AbstractStream::streamStarted, [this]() { end_time_label->setText(formatTime(can->totalSeconds())); slider->setRange(0, can->totalSeconds() * 1000); }); - updatePlayBtnState(); + return w; } void VideoWidget::timeLabelClicked() { @@ -101,6 +114,8 @@ void VideoWidget::timeLabelClicked() { } void VideoWidget::rangeChanged(double min, double max, bool is_zoomed) { + if (can->liveStreaming()) return; + if (!is_zoomed) { min = 0; max = can->totalSeconds(); diff --git a/tools/cabana/videowidget.h b/tools/cabana/videowidget.h index 392db23512..51197eedd6 100644 --- a/tools/cabana/videowidget.h +++ b/tools/cabana/videowidget.h @@ -49,6 +49,7 @@ protected: void updateState(); void updatePlayBtnState(); void timeLabelClicked(); + QWidget *createCameraWidget(); CameraWidget *cam_widget; QLabel *end_time_label;