diff --git a/tools/cabana/canmessages.cc b/tools/cabana/canmessages.cc index 9959ba7313..ded8be5fa3 100644 --- a/tools/cabana/canmessages.cc +++ b/tools/cabana/canmessages.cc @@ -1,7 +1,4 @@ #include "tools/cabana/canmessages.h" - -#include - #include "tools/cabana/dbcmanager.h" CANMessages *can = nullptr; @@ -25,6 +22,7 @@ bool CANMessages::loadRoute(const QString &route, const QString &data_dir, uint3 replay = new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this); replay->setSegmentCacheLimit(settings.cached_segment_limit); replay->installEventFilter(event_filter, this); + QObject::connect(replay, &Replay::seekedTo, this, &CANMessages::seekedTo); QObject::connect(replay, &Replay::segmentsMerged, this, &CANMessages::eventsMerged); QObject::connect(replay, &Replay::streamStarted, this, &CANMessages::streamStarted); if (replay->load()) { @@ -69,17 +67,13 @@ void CANMessages::process(QHash *messages) { } bool CANMessages::eventFilter(const Event *event) { - static std::unique_ptr> new_msgs; + static std::unique_ptr new_msgs = std::make_unique>(); static double prev_update_ts = 0; if (event->which == cereal::Event::Which::CAN) { - if (!new_msgs) { - new_msgs.reset(new QHash); - new_msgs->reserve(1000); - } - double current_sec = replay->currentSeconds(); if (counters_begin_sec == 0 || counters_begin_sec >= current_sec) { + new_msgs->clear(); counters.clear(); counters_begin_sec = current_sec; } @@ -87,40 +81,29 @@ bool CANMessages::eventFilter(const Event *event) { auto can_events = event->event.getCan(); for (const auto &c : can_events) { QString id = QString("%1:%2").arg(c.getSrc()).arg(c.getAddress(), 1, 16); - - std::lock_guard lk(lock); - auto &list = received_msgs[id]; - while (list.size() > settings.can_msg_log_size) { - list.pop_back(); - } - CanData &data = list.emplace_front(); + CanData &data = (*new_msgs)[id]; data.ts = current_sec; - data.dat.append((char *)c.getDat().begin(), c.getDat().size()); - + data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size()); data.count = ++counters[id]; if (double delta = (current_sec - counters_begin_sec); delta > 0) { data.freq = data.count / delta; } - (*new_msgs)[id] = data; } double ts = millis_since_boot(); - if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing) { + if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) { // delay posting CAN message if UI thread is busy processing = true; prev_update_ts = ts; // use pointer to avoid data copy in queued connection. emit received(new_msgs.release()); + new_msgs.reset(new QHash); + new_msgs->reserve(100); } } return true; } -const std::deque CANMessages::messages(const QString &id) { - std::lock_guard lk(lock); - return received_msgs[id]; -} - void CANMessages::seekTo(double ts) { replay->seekTo(std::max(double(0), ts), false); counters_begin_sec = 0; diff --git a/tools/cabana/canmessages.h b/tools/cabana/canmessages.h index f9103aa96b..47f6008686 100644 --- a/tools/cabana/canmessages.h +++ b/tools/cabana/canmessages.h @@ -1,8 +1,6 @@ #pragma once #include -#include -#include #include #include @@ -37,7 +35,6 @@ public: inline double totalSeconds() const { return replay->totalSeconds(); } inline double routeStartTime() const { return replay->routeStartTime() / (double)1e9; } inline double currentSec() const { return replay->currentSeconds(); } - const std::deque messages(const QString &id); inline const CanData &lastMessage(const QString &id) { return can_msgs[id]; } inline const Route* route() const { return replay->route(); } @@ -48,6 +45,7 @@ public: inline const std::vector> getTimeline() { return replay->getTimeline(); } signals: + void seekedTo(double sec); void streamStarted(); void eventsMerged(); void updated(); @@ -62,11 +60,9 @@ protected: void settingChanged(); Replay *replay = nullptr; - std::mutex lock; std::atomic counters_begin_sec = 0; std::atomic processing = false; QHash counters; - QHash> received_msgs; }; inline QString toHex(const QByteArray &dat) { diff --git a/tools/cabana/dbcmanager.cc b/tools/cabana/dbcmanager.cc index 1b33c4cc42..6d59cba7b6 100644 --- a/tools/cabana/dbcmanager.cc +++ b/tools/cabana/dbcmanager.cc @@ -99,6 +99,7 @@ void DBCManager::removeSignal(const QString &id, const QString &sig_name) { std::pair DBCManager::parseId(const QString &id) { const auto list = id.split(':'); + if (list.size() != 2) return {0, 0}; return {list[0].toInt(), list[1].toUInt(nullptr, 16)}; } diff --git a/tools/cabana/historylog.cc b/tools/cabana/historylog.cc index ec9a0b011c..37001dd582 100644 --- a/tools/cabana/historylog.cc +++ b/tools/cabana/historylog.cc @@ -5,14 +5,19 @@ // HistoryLogModel +HistoryLogModel::HistoryLogModel(QObject *parent) : QAbstractTableModel(parent) { + QObject::connect(can, &CANMessages::seekedTo, [this]() { + if (!msg_id.isEmpty()) setMessage(msg_id); + }); +} + QVariant HistoryLogModel::data(const QModelIndex &index, int role) const { if (role == Qt::DisplayRole) { const auto &m = messages[index.row()]; if (index.column() == 0) { - return QString::number(m.ts, 'f', 2); + return QString::number((m.mono_time / (double)1e9) - can->routeStartTime(), 'f', 2); } - return !sigs.empty() ? QString::number(get_raw_value((uint8_t *)m.dat.data(), m.dat.size(), *sigs[index.column() - 1])) - : toHex(m.dat); + return !sigs.empty() ? QString::number(m.sig_values[index.column() - 1]) : toHex(m.data); } else if (role == Qt::FontRole && index.column() == 1 && sigs.empty()) { return QFontDatabase::systemFont(QFontDatabase::FixedFont); } @@ -24,6 +29,7 @@ void HistoryLogModel::setMessage(const QString &message_id) { msg_id = message_id; sigs.clear(); messages.clear(); + has_more_data = true; if (auto dbc_msg = dbc()->msg(message_id)) { sigs = dbc_msg->getSignals(); } @@ -48,23 +54,60 @@ QVariant HistoryLogModel::headerData(int section, Qt::Orientation orientation, i } void HistoryLogModel::updateState() { - int prev_row_count = messages.size(); if (!msg_id.isEmpty()) { - messages = can->messages(msg_id); - } - int delta = messages.size() - prev_row_count; - if (delta > 0) { - beginInsertRows({}, prev_row_count, messages.size() - 1); - endInsertRows(); - } else if (delta < 0) { - beginRemoveRows({}, messages.size(), prev_row_count - 1); - endRemoveRows(); + uint64_t last_mono_time = messages.empty() ? 0 : messages.front().mono_time; + auto new_msgs = fetchData(last_mono_time, (can->currentSec() + can->routeStartTime()) * 1e9); + if ((has_more_data = !new_msgs.empty())) { + beginInsertRows({}, 0, new_msgs.size() - 1); + messages.insert(messages.begin(), std::move_iterator(new_msgs.begin()), std::move_iterator(new_msgs.end())); + endInsertRows(); + } } +} + +void HistoryLogModel::fetchMore(const QModelIndex &parent) { if (!messages.empty()) { - emit dataChanged(index(0, 0), index(rowCount() - 1, columnCount() - 1), {Qt::DisplayRole}); + auto new_msgs = fetchData(0, messages.back().mono_time); + if ((has_more_data = !new_msgs.empty())) { + beginInsertRows({}, messages.size(), messages.size() + new_msgs.size() - 1); + messages.insert(messages.end(), std::move_iterator(new_msgs.begin()), std::move_iterator(new_msgs.end())); + endInsertRows(); + } } } +std::deque HistoryLogModel::fetchData(uint64_t min_mono_time, uint64_t max_mono_time) { + auto events = can->events(); + auto it = std::lower_bound(events->begin(), events->end(), max_mono_time, [=](auto &e, uint64_t ts) { + return e->mono_time < ts; + }); + if (it == events->end() || it == events->begin()) + return {}; + + std::deque msgs; + const auto [src, address] = DBCManager::parseId(msg_id); + uint32_t cnt = 0; + for (--it; it != events->begin() && (*it)->mono_time > min_mono_time; --it) { + if ((*it)->which == cereal::Event::Which::CAN) { + for (const auto &c : (*it)->event.getCan()) { + if (src == c.getSrc() && address == c.getAddress()) { + const auto dat = c.getDat(); + auto &m = msgs.emplace_back(); + m.mono_time = (*it)->mono_time; + m.data.append((char *)dat.begin(), dat.size()); + m.sig_values.reserve(sigs.size()); + for (const Signal *sig : sigs) { + m.sig_values.push_back(get_raw_value((uint8_t *)dat.begin(), dat.size(), *sig)); + } + if (++cnt >= batch_size && min_mono_time == 0) + return msgs; + } + } + } + } + return msgs; +} + // HeaderView QSize HeaderView::sectionSizeFromContents(int logicalIndex) const { @@ -98,7 +141,3 @@ HistoryLog::HistoryLog(QWidget *parent) : QTableView(parent) { setFrameShape(QFrame::NoFrame); setSizePolicy(QSizePolicy::Preferred, QSizePolicy::Expanding); } - -int HistoryLog::sizeHintForColumn(int column) const { - return -1; -} diff --git a/tools/cabana/historylog.h b/tools/cabana/historylog.h index 9ca6f427c7..5a9903823f 100644 --- a/tools/cabana/historylog.h +++ b/tools/cabana/historylog.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -15,17 +16,27 @@ public: class HistoryLogModel : public QAbstractTableModel { public: - HistoryLogModel(QObject *parent) : QAbstractTableModel(parent) {} + HistoryLogModel(QObject *parent); void setMessage(const QString &message_id); void updateState(); QVariant headerData(int section, Qt::Orientation orientation, int role = Qt::DisplayRole) const override; QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const override; + void fetchMore(const QModelIndex &parent) override; + inline bool canFetchMore(const QModelIndex &parent) const override { return has_more_data; } int rowCount(const QModelIndex &parent = QModelIndex()) const override { return messages.size(); } int columnCount(const QModelIndex &parent = QModelIndex()) const override { return std::max(1ul, sigs.size()) + 1; } -private: + struct Message { + uint64_t mono_time = 0; + QVector sig_values; + QByteArray data; + }; + + std::deque fetchData(uint64_t min_mono_time, uint64_t max_mono_time); QString msg_id; - std::deque messages; + bool has_more_data = true; + const int batch_size = 50; + std::deque messages; std::vector sigs; }; @@ -36,6 +47,7 @@ public: void updateState() { model->updateState(); } private: - int sizeHintForColumn(int column) const override; + int sizeHintForColumn(int column) const override { return -1; }; + void showEvent(QShowEvent *event) override { model->setMessage(model->msg_id); }; HistoryLogModel *model; }; diff --git a/tools/cabana/settings.cc b/tools/cabana/settings.cc index c90830973b..63e26f3808 100644 --- a/tools/cabana/settings.cc +++ b/tools/cabana/settings.cc @@ -15,7 +15,6 @@ Settings::Settings() { void Settings::save() { QSettings s("settings", QSettings::IniFormat); s.setValue("fps", fps); - s.setValue("log_size", can_msg_log_size); s.setValue("cached_segment", cached_segment_limit); s.setValue("chart_height", chart_height); s.setValue("max_chart_x_range", max_chart_x_range); @@ -26,7 +25,6 @@ void Settings::save() { void Settings::load() { QSettings s("settings", QSettings::IniFormat); fps = s.value("fps", 10).toInt(); - can_msg_log_size = s.value("log_size", 50).toInt(); cached_segment_limit = s.value("cached_segment", 3).toInt(); chart_height = s.value("chart_height", 200).toInt(); max_chart_x_range = s.value("max_chart_x_range", 3 * 60).toInt(); @@ -46,12 +44,6 @@ SettingsDlg::SettingsDlg(QWidget *parent) : QDialog(parent) { fps->setValue(settings.fps); form_layout->addRow("FPS", fps); - log_size = new QSpinBox(this); - log_size->setRange(50, 500); - log_size->setSingleStep(10); - log_size->setValue(settings.can_msg_log_size); - form_layout->addRow(tr("Signal history log size"), log_size); - cached_segment = new QSpinBox(this); cached_segment->setRange(3, 60); cached_segment->setSingleStep(1); @@ -80,7 +72,6 @@ SettingsDlg::SettingsDlg(QWidget *parent) : QDialog(parent) { void SettingsDlg::save() { settings.fps = fps->value(); - settings.can_msg_log_size = log_size->value(); settings.cached_segment_limit = cached_segment->value(); settings.chart_height = chart_height->value(); settings.max_chart_x_range = max_chart_x_range->value() * 60; diff --git a/tools/cabana/settings.h b/tools/cabana/settings.h index ee6541798d..1db92fe231 100644 --- a/tools/cabana/settings.h +++ b/tools/cabana/settings.h @@ -14,7 +14,6 @@ public: void load(); int fps = 10; - int can_msg_log_size = 50; int cached_segment_limit = 3; int chart_height = 200; int max_chart_x_range = 3 * 60; // 3 minutes @@ -32,7 +31,6 @@ public: SettingsDlg(QWidget *parent); void save(); QSpinBox *fps; - QSpinBox *log_size ; QSpinBox *cached_segment; QSpinBox *chart_height; QSpinBox *max_chart_x_range; diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 33541aeb74..a01371abe1 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -109,6 +109,7 @@ void Replay::seekTo(double seconds, bool relative) { cur_mono_time_ = route_start_ts_ + seconds * 1e9; return isSegmentMerged(seg); }); + emit seekedTo(seconds); queueSegment(); } @@ -209,11 +210,17 @@ void Replay::segmentLoadFinished(bool success) { void Replay::queueSegment() { if (segments_.empty()) return; - SegmentMap::iterator cur, end; - cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); - for (int i = 0; end != segments_.end() && i <= segment_cache_limit + FORWARD_FETCH_SEGS; ++i) { + SegmentMap::iterator begin, cur; + begin = cur = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); + int distance = std::max(std::ceil(segment_cache_limit / 2.0) - 1, segment_cache_limit - std::distance(cur, segments_.end())); + for (int i = 0; begin != segments_.begin() && i < distance; ++i) { + --begin; + } + auto end = begin; + for (int i = 0; end != segments_.end() && i < segment_cache_limit; ++i) { ++end; } + // load one segment at a time for (auto it = cur; it != end; ++it) { auto &[n, seg] = *it; @@ -227,12 +234,6 @@ void Replay::queueSegment() { } } - const auto &cur_segment = cur->second; - // merge the previous adjacent segment if it's loaded - auto begin = segments_.find(cur_segment->seg_num - 1); - if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) { - begin = cur; - } mergeSegments(begin, end); // free segments out of current semgnt window. @@ -240,6 +241,7 @@ void Replay::queueSegment() { std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); }); // start stream thread + const auto &cur_segment = cur->second; if (stream_thread_ == nullptr && cur_segment->isLoaded()) { startStream(cur_segment.get()); emit streamStarted(); @@ -247,12 +249,13 @@ void Replay::queueSegment() { } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { - // merge 3 segments in sequence. std::vector segments_need_merge; size_t new_events_size = 0; - for (auto it = begin; it != end && it->second && it->second->isLoaded() && segments_need_merge.size() < segment_cache_limit; ++it) { - segments_need_merge.push_back(it->first); - new_events_size += it->second->log->events.size(); + for (auto it = begin; it != end; ++it) { + if (it->second && it->second->isLoaded()) { + segments_need_merge.push_back(it->first); + new_events_size += it->second->log->events.size(); + } } if (segments_need_merge != segments_merged_) { diff --git a/tools/replay/replay.h b/tools/replay/replay.h index 88c285125a..2c68443df0 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -10,7 +10,7 @@ const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; // one segment uses about 100M of memory -constexpr int FORWARD_FETCH_SEGS = 3; +constexpr int MIN_SEGMENTS_CACHE = 5; enum REPLAY_FLAGS { REPLAY_FLAG_NONE = 0x0000, @@ -58,7 +58,7 @@ public: event_filter = filter; } inline int segmentCacheLimit() const { return segment_cache_limit; } - inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(3, n); } + inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(MIN_SEGMENTS_CACHE, n); } inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; } inline void addFlag(REPLAY_FLAGS flag) { flags_ |= flag; } inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; } @@ -79,6 +79,7 @@ public: signals: void streamStarted(); void segmentsMerged(); + void seekedTo(double sec); protected slots: void segmentLoadFinished(bool success); @@ -133,5 +134,5 @@ protected: float speed_ = 1.0; replayEventFilter event_filter = nullptr; void *filter_opaque = nullptr; - int segment_cache_limit = 3; + int segment_cache_limit = MIN_SEGMENTS_CACHE; };