#include "tools/cabana/streams/abstractstream.h"

#include <algorithm>
#include <cstdint>
#include <memory>
#include <utility>

// Global pointer to the most recently constructed stream (set in the constructor).
AbstractStream *can = nullptr;

/// Creates the stream, registers it as the global `can` instance and wires up
/// the internal signal/slot connections.
///
/// @param parent            QObject parent.
/// @param is_live_streaming Stored in the member of the same name.
AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming)
    : QObject(parent), is_live_streaming(is_live_streaming) {
  can = this;
  // NOTE(review): the QHash<QString, CanData> element type is taken from how the
  // hash is used in this file (QString id -> CanData); confirm against the header.
  new_msgs = std::make_unique<QHash<QString, CanData>>();
  // Queued so that process() is invoked through the event loop of this object's
  // thread rather than synchronously from the emitter.
  QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection);
  QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
}

/// Merges a batch of freshly received messages into `can_msgs`, notifies
/// listeners, then frees the batch.
///
/// @param messages Heap-allocated batch released by updateEvent(); this function
///                 takes ownership and deletes it.
void AbstractStream::process(QHash<QString, CanData> *messages) {
  for (auto it = messages->cbegin(); it != messages->cend(); ++it) {
    can_msgs[it.key()] = it.value();
  }
  emit updated();
  emit msgsReceived(messages);
  delete messages;
  // Allow updateEvent() to post the next batch.
  processing = false;
}

/// Accumulates the CAN messages of one event into `new_msgs` and, at most once
/// per frame interval (1000 / settings.fps ms), hands the accumulated batch over
/// via the `received` signal.
///
/// Events that are not of type CAN are ignored.
///
/// @param event Event to inspect.
/// @return Always true.
bool AbstractStream::updateEvent(const Event *event) {
  // NOTE(review): function-local static, so the throttle timestamp is shared by
  // every AbstractStream instance in the process.
  static double prev_update_ts = 0;

  if (event->which == cereal::Event::Which::CAN) {
    const double current_sec = event->mono_time / 1e9 - routeStartTime();
    for (const auto &c : event->event.getCan()) {
      const QString id = QString("%1:%2").arg(c.getSrc()).arg(c.getAddress(), 1, 16);
      const auto dat = c.getDat();

      CanData &data = (*new_msgs)[id];
      data.ts = current_sec;
      data.dat = QByteArray(reinterpret_cast<const char *>(dat.begin()), dat.size());
      data.count = ++counters[id];
      data.freq = data.count / std::max(1.0, current_sec);

      // Look the tracker up once instead of hashing the id three times.
      auto &tracker = change_trackers[id];
      tracker.compute(data.dat, data.ts, data.freq);
      data.colors = tracker.colors;
      data.last_change_t = tracker.last_change_t;
    }

    const double ts = millis_since_boot();
    if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
      // delay posting CAN message if UI thread is busy
      processing = true;
      prev_update_ts = ts;
      // use pointer to avoid data copy in queued connection.
      // Ownership of the batch passes to process(), which deletes it.
      emit received(new_msgs.release());
      new_msgs = std::make_unique<QHash<QString, CanData>>();
      new_msgs->reserve(100);
    }
  }
  return true;
}

/// Returns the last processed message for `id`.
///
/// @param id Message id in the "<src>:<hex address>" form built by updateEvent().
/// @return Reference to the entry in `can_msgs`, or to a shared default-constructed
///         CanData when the id is unknown.
const CanData &AbstractStream::lastMessage(const QString &id) {
  static CanData empty_data;
  auto it = can_msgs.find(id);
  return it != can_msgs.end() ? it.value() : empty_data;
}

/// Rebuilds `can_msgs` and `counters` from the event log so that they reflect the
/// stream state at `sec`, then notifies listeners.
///
/// Pending `new_msgs` and all change trackers are discarded.
///
/// @param sec Target position in seconds, relative to routeStartTime().
void AbstractStream::updateLastMsgsTo(double sec) {
  // Much faster than QHash<QString, CanData>
  // NOTE(review): key type inferred from the {getSrc(), getAddress()} keys and
  // .first/.second accesses below; confirm the integer widths.
  QHash<std::pair<uint8_t, uint32_t>, CanData> last_msgs;
  last_msgs.reserve(can_msgs.size());

  const double route_start_time = routeStartTime();
  const uint64_t last_ts = (sec + route_start_time) * 1e9;
  // Searching the reversed range yields the newest event whose mono_time is
  // strictly below last_ts; the loop then walks back towards the oldest event.
  auto last = std::upper_bound(events()->rbegin(), events()->rend(), last_ts,
                               [](uint64_t ts, auto &e) { return e->mono_time < ts; });

  for (auto it = last; it != events()->rend(); ++it) {
    if ((*it)->which != cereal::Event::Which::CAN) continue;

    for (const auto &c : (*it)->event.getCan()) {
      auto &m = last_msgs[{c.getSrc(), c.getAddress()}];
      if (++m.count == 1) {
        // First hit while walking backwards == most recent occurrence: take its payload.
        const auto dat = c.getDat();
        m.ts = ((*it)->mono_time / 1e9) - route_start_time;
        m.dat = QByteArray(reinterpret_cast<const char *>(dat.begin()), dat.size());
        m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
        m.last_change_t = QVector<double>(m.dat.size(), m.ts);
      } else {
        // Older occurrences only contribute to the count and the frequency.
        // NOTE(review): a message seen exactly once never reaches this branch, so
        // its freq stays at CanData's default, unlike in updateEvent().
        m.freq = m.count / std::max(1.0, m.ts);
      }
    }
  }

  // it is thread safe to update data here.
  // updateEvent will not be called before replayStream::seekedTo returns.
  new_msgs->clear();
  change_trackers.clear();
  counters.clear();
  can_msgs.clear();
  for (auto it = last_msgs.cbegin(); it != last_msgs.cend(); ++it) {
    const QString msg_id = QString("%1:%2").arg(it.key().first).arg(it.key().second, 1, 16);
    can_msgs[msg_id] = it.value();
    counters[msg_id] = it.value().count;
  }
  emit updated();
  emit msgsReceived(&can_msgs);
}