From 43e4b84fecd7589a89a80f6599caacef4026e478 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Fri, 3 Feb 2023 12:39:16 +0800 Subject: [PATCH] cabana: syc the last messages after seeking (#27149) * fix wrong last message time * update last messages after seekto * cleanup * remove src,address from CanData * merge master merge master old-commit-hash: 41304db1e661fa6593d777f0f17dfd60e6f909c8 --- tools/cabana/historylog.cc | 6 +-- tools/cabana/messageswidget.cc | 7 +-- tools/cabana/streams/abstractstream.cc | 64 +++++++++++++++++++------- tools/cabana/streams/abstractstream.h | 10 ++-- tools/cabana/streams/replaystream.cc | 6 --- tools/cabana/streams/replaystream.h | 2 +- tools/replay/replay.cc | 2 +- 7 files changed, 62 insertions(+), 35 deletions(-) diff --git a/tools/cabana/historylog.cc b/tools/cabana/historylog.cc index ab5c6cbbe0..95bd94e76b 100644 --- a/tools/cabana/historylog.cc +++ b/tools/cabana/historylog.cc @@ -79,7 +79,7 @@ void HistoryLogModel::setFilter(int sig_idx, const QString &value, std::function void HistoryLogModel::updateState() { if (!msg_id.isEmpty()) { - uint64_t current_time = (can->currentSec() + can->routeStartTime()) * 1e9; + uint64_t current_time = (can->lastMessage(msg_id).ts + can->routeStartTime()) * 1e9 + 1; auto new_msgs = dynamic_mode ? fetchData(current_time, last_fetch_time) : fetchData(0); if ((has_more_data = !new_msgs.empty())) { beginInsertRows({}, 0, new_msgs.size() - 1); @@ -109,7 +109,7 @@ std::deque HistoryLogModel::fetchData(InputIt first, I for (auto it = first; it != last && (*it)->mono_time > min_time; ++it) { if ((*it)->which == cereal::Event::Which::CAN) { for (const auto &c : (*it)->event.getCan()) { - if (src == c.getSrc() && address == c.getAddress()) { + if (address == c.getAddress() && src == c.getSrc()) { const auto dat = c.getDat(); for (int i = 0; i < sigs.size(); ++i) { values[i] = get_raw_value((uint8_t *)dat.begin(), dat.size(), *(sigs[i])); @@ -140,7 +140,7 @@ std::deque HistoryLogModel::fetchData(uint64_t from_ti if (dynamic_mode) { auto first = std::upper_bound(events->rbegin(), events->rend(), from_time, [=](uint64_t ts, auto &e) { return e->mono_time < ts; }); auto msgs = fetchData(first, events->rend(), min_time); - if (update_colors && min_time > 0) { + if (update_colors && (min_time > 0 || messages.empty())) { for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) { hex_colors.compute(it->data, it->mono_time / (double)1e9, freq); it->colors = hex_colors.colors; diff --git a/tools/cabana/messageswidget.cc b/tools/cabana/messageswidget.cc index 64f56ae73f..9d0fc23e4d 100644 --- a/tools/cabana/messageswidget.cc +++ b/tools/cabana/messageswidget.cc @@ -106,11 +106,12 @@ QVariant MessageListModel::data(const QModelIndex &index, int role) const { } } else if (role == Qt::UserRole && index.column() == 4) { QList colors; + colors.reserve(can_data.dat.size()); for (int i = 0; i < can_data.dat.size(); i++){ if (suppressed_bytes.contains({id, i})) { colors.append(QColor(255, 255, 255, 0)); } else { - colors.append(can_data.colors[i]); + colors.append(i < can_data.colors.size() ? can_data.colors[i] : QColor(255, 255, 255, 0)); } } return colors; @@ -152,8 +153,8 @@ void MessageListModel::sortMessages() { }); } else if (sort_column == 1) { std::sort(msgs.begin(), msgs.end(), [this](auto &l, auto &r) { - auto ll = std::tuple{can->lastMessage(l).src, can->lastMessage(l).address, l}; - auto rr = std::tuple{can->lastMessage(r).src, can->lastMessage(r).address, r}; + auto ll = DBCManager::parseId(l); + auto rr = DBCManager::parseId(r); return sort_order == Qt::AscendingOrder ? ll < rr : ll > rr; }); } else if (sort_column == 2) { diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index 308e5556c6..8a77e0eb39 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -4,7 +4,9 @@ AbstractStream *can = nullptr; AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) { can = this; + new_msgs = std::make_unique>(); QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection); + QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo); } void AbstractStream::process(QHash *messages) { @@ -18,30 +20,17 @@ void AbstractStream::process(QHash *messages) { } bool AbstractStream::updateEvent(const Event *event) { - static std::unique_ptr new_msgs = std::make_unique>(); - static QHash change_trackers; static double prev_update_ts = 0; if (event->which == cereal::Event::Which::CAN) { - double current_sec = currentSec(); - if (counters_begin_sec == 0 || counters_begin_sec >= current_sec) { - new_msgs->clear(); - counters.clear(); - counters_begin_sec = current_sec; - } - - auto can_events = event->event.getCan(); - for (const auto &c : can_events) { + double current_sec = event->mono_time / 1e9 - routeStartTime(); + for (const auto &c : event->event.getCan()) { QString id = QString("%1:%2").arg(c.getSrc()).arg(c.getAddress(), 1, 16); CanData &data = (*new_msgs)[id]; data.ts = current_sec; - data.src = c.getSrc(); - data.address = c.getAddress(); data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size()); data.count = ++counters[id]; - if (double delta = (current_sec - counters_begin_sec); delta > 0) { - data.freq = data.count / delta; - } + data.freq = data.count / std::max(1.0, current_sec); change_trackers[id].compute(data.dat, data.ts, data.freq); data.colors = change_trackers[id].colors; data.last_change_t = change_trackers[id].last_change_t; @@ -60,3 +49,46 @@ bool AbstractStream::updateEvent(const Event *event) { } return true; } + +const CanData &AbstractStream::lastMessage(const QString &id) { + static CanData empty_data; + auto it = can_msgs.find(id); + return it != can_msgs.end() ? it.value() : empty_data; +} + +void AbstractStream::updateLastMsgsTo(double sec) { + QHash, CanData> last_msgs; // Much faster than QHash + last_msgs.reserve(can_msgs.size()); + double route_start_time = routeStartTime(); + uint64_t last_ts = (sec + route_start_time) * 1e9; + auto last = std::upper_bound(events()->rbegin(), events()->rend(), last_ts, [](uint64_t ts, auto &e) { return e->mono_time < ts; }); + for (auto it = last; it != events()->rend(); ++it) { + if ((*it)->which == cereal::Event::Which::CAN) { + for (const auto &c : (*it)->event.getCan()) { + auto &m = last_msgs[{c.getSrc(), c.getAddress()}]; + if (++m.count == 1) { + m.ts = ((*it)->mono_time / 1e9) - route_start_time; + m.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size()); + m.colors = QVector(m.dat.size(), QColor(0, 0, 0, 0)); + m.last_change_t = QVector(m.dat.size(), m.ts); + } else { + m.freq = m.count / std::max(1.0, m.ts); + } + } + } + } + + // it is thread safe to update data here. + // updateEvent will not be called before replayStream::seekedTo return. + new_msgs->clear(); + change_trackers.clear(); + counters.clear(); + can_msgs.clear(); + for (auto it = last_msgs.cbegin(); it != last_msgs.cend(); ++it) { + QString msg_id = QString("%1:%2").arg(it.key().first).arg(it.key().second, 1, 16); + can_msgs[msg_id] = it.value(); + counters[msg_id] = it.value().count; + } + emit updated(); + emit msgsReceived(&can_msgs); +} diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index e16e11a14b..0dbb3d96a6 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -11,8 +11,6 @@ struct CanData { double ts = 0.; - uint8_t src = 0; - uint32_t address = 0; uint32_t count = 0; uint32_t freq = 0; QByteArray dat; @@ -34,7 +32,7 @@ public: virtual double routeStartTime() const { return 0; } virtual double currentSec() const = 0; virtual QDateTime currentDateTime() const { return {}; } - virtual const CanData &lastMessage(const QString &id) { return can_msgs[id]; } + virtual const CanData &lastMessage(const QString &id); virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; } virtual const Route *route() const { return nullptr; } virtual const std::vector *events() const = 0; @@ -54,16 +52,18 @@ signals: void received(QHash *); public: - QMap can_msgs; + QHash can_msgs; protected: void process(QHash *); bool updateEvent(const Event *event); + void updateLastMsgsTo(double sec); bool is_live_streaming = false; - std::atomic counters_begin_sec = 0; std::atomic processing = false; QHash counters; + std::unique_ptr> new_msgs; + QHash change_trackers; }; // A global pointer referring to the unique AbstractStream object diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index fd58f7a409..72c4a13048 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -42,12 +42,6 @@ bool ReplayStream::eventFilter(const Event *event) { return true; } -void ReplayStream::seekTo(double ts) { - replay->seekTo(std::max(double(0), ts), false); - counters_begin_sec = 0; - emit updated(); -} - void ReplayStream::pause(bool pause) { replay->pause(pause); emit(pause ? paused() : resume()); diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h index 1688915212..a9a74e33b5 100644 --- a/tools/cabana/streams/replaystream.h +++ b/tools/cabana/streams/replaystream.h @@ -12,7 +12,7 @@ public: ~ReplayStream(); bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE); bool eventFilter(const Event *event); - void seekTo(double ts) override; + void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); }; inline QString routeName() const override { return replay->route()->name(); } inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 5c90777bbc..178b116a87 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -114,9 +114,9 @@ void Replay::seekTo(double seconds, bool relative) { rInfo("seeking to %d s, segment %d", (int)seconds, seg); current_segment_ = seg; cur_mono_time_ = route_start_ts_ + seconds * 1e9; + emit seekedTo(seconds); return isSegmentMerged(seg); }); - emit seekedTo(seconds); queueSegment(); }