dragonpilot - 基於 openpilot 的開源駕駛輔助系統
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
2.7 KiB

#include "tools/cabana/streams/livestream.h"
#include <QTimer>
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) {
stream_thread = new QThread(this);
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); });
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
QTimer::singleShot(0, [this]() { stream_thread->start(); });
}
LiveStream::~LiveStream() {
stream_thread->requestInterruption();
stream_thread->quit();
stream_thread->wait();
}
void LiveStream::streamThread() {
if (!zmq_address.isEmpty()) {
setenv("ZMQ", "1", 1);
}
std::unique_ptr<Context> context(Context::create());
std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString();
std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address));
assert(sock != NULL);
sock->setTimeout(50);
// run as fast as messages come in
while (!QThread::currentThread()->isInterruptionRequested()) {
Message *msg = sock->receive(true);
if (!msg) {
QThread::msleep(50);
continue;
}
std::lock_guard lk(lock);
handleEvent(messages.emplace_back(msg).event);
// TODO: write stream to log file to replay it with cabana --data_dir flag.
}
}
void LiveStream::handleEvent(Event *evt) {
if (start_ts == 0 || evt->mono_time < start_ts) {
if (evt->mono_time < start_ts) {
qDebug() << "stream is looping back to old time stamp";
}
start_ts = current_ts = evt->mono_time;
emit streamStarted();
}
received.push_back(evt);
if (!pause_) {
if (speed_ < 1 && last_update_ts > 0) {
auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) {
return ts < e->mono_time;
});
if (it != can_events.cend()) {
bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_;
if (skip) return;
evt = *it;
}
}
current_ts = evt->mono_time;
last_update_ts = nanos_since_boot();
updateEvent(evt);
}
}
void LiveStream::process(QHash<MessageId, CanData> *last_messages) {
{
std::lock_guard lk(lock);
uint64_t last_ts = can_events.empty() ? 0 : can_events.back()->mono_time;
auto first = std::upper_bound(received.cbegin(), received.cend(), last_ts, [](uint64_t ts, auto &e) {
return ts < e->mono_time;
});
can_events.insert(can_events.end(), first, received.cend());
if (speed_ == 1) {
received.clear();
}
}
emit eventsMerged();
AbstractStream::process(last_messages);
}
void LiveStream::pause(bool pause) {
pause_ = pause;
emit paused();
}