diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index 3e631d3709..6af831a1d0 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -1,5 +1,7 @@ #include "tools/cabana/streams/abstractstream.h" +#include + AbstractStream *can = nullptr; AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) { @@ -59,39 +61,59 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) { return it != can_msgs.end() ? it.value() : empty_data; } -void AbstractStream::updateLastMsgsTo(double sec) { - QHash last_msgs; - last_msgs.reserve(can_msgs.size()); - double route_start_time = routeStartTime(); - uint64_t last_ts = (sec + route_start_time) * 1e9; - auto evs = events(); - auto last = std::upper_bound(evs->rbegin(), evs->rend(), last_ts, [](uint64_t ts, auto &e) { return e->mono_time < ts; }); - for (auto it = last; it != evs->rend(); ++it) { +static QHash parseEvents(std::vector::const_reverse_iterator first, + std::vector::const_reverse_iterator last, double route_start_time) { + QHash msgs; + msgs.reserve(500); + for (auto it = first; it != last; ++it) { if ((*it)->which == cereal::Event::Which::CAN) { for (const auto &c : (*it)->event.getCan()) { - auto &m = last_msgs[{.source = c.getSrc(), .address = c.getAddress()}]; + auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}]; if (++m.count == 1) { m.ts = ((*it)->mono_time / 1e9) - route_start_time; m.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size()); m.colors = QVector(m.dat.size(), QColor(0, 0, 0, 0)); m.last_change_t = QVector(m.dat.size(), m.ts); m.bit_change_counts.resize(m.dat.size()); - } else { - m.freq = m.count / std::max(1.0, m.ts); } } } } + return msgs; +}; + +// it is thread safe to update data in updateLastMsgsTo. +// updateEvent will not be called before replayStream::seekedTo return. +void AbstractStream::updateLastMsgsTo(double sec) { + uint64_t ts = (sec + routeStartTime()) * 1e9; + const uint64_t delta = std::max(std::ceil(sec / std::thread::hardware_concurrency()), 30.0) * 1e9; + const auto evs = events(); + auto first = std::upper_bound(evs->crbegin(), evs->crend(), ts, [](uint64_t ts, auto &e) { return ts > e->mono_time; }); + QFutureSynchronizer> synchronizer; + while(first != evs->crend()) { + ts = (*first)->mono_time > delta ? (*first)->mono_time - delta : 0; + auto last = std::lower_bound(first, evs->crend(), ts, [](auto &e, uint64_t ts) { return e->mono_time > ts; }); + synchronizer.addFuture(QtConcurrent::run(parseEvents, first, last, routeStartTime())); + first = last; + } + synchronizer.waitForFinished(); - // it is thread safe to update data here. - // updateEvent will not be called before replayStream::seekedTo return. new_msgs->clear(); change_trackers.clear(); - counters.clear(); can_msgs.clear(); - for (auto it = last_msgs.cbegin(); it != last_msgs.cend(); ++it) { - can_msgs[it.key()] = it.value(); - counters[it.key()] = it.value().count; + counters.clear(); + for (const auto &f : synchronizer.futures()) { + auto msgs = f.result(); + for (auto it = msgs.cbegin(); it != msgs.cend(); ++it) { + counters[it.key()] += it.value().count; + auto m = can_msgs.find(it.key()); + if (m == can_msgs.end()) { + m = can_msgs.insert(it.key(), it.value()); + } else { + m.value().count += it.value().count; + } + m.value().freq = m.value().count / std::max(1.0, m.value().ts); + } } emit updated(); emit msgsReceived(&can_msgs);