Cabana: display all logs in log view (#26659)

* fetch more

* clear log in showevent

* fix wrong time value

* check list size

* fix canmessages::process

* cache all events

* improve segment cache

* cleanup
old-commit-hash: f49520db0f
taco
Dean Lee 2 years ago committed by GitHub
parent 725e0665df
commit 79170305dc
  1. 33
      tools/cabana/canmessages.cc
  2. 6
      tools/cabana/canmessages.h
  3. 1
      tools/cabana/dbcmanager.cc
  4. 75
      tools/cabana/historylog.cc
  5. 20
      tools/cabana/historylog.h
  6. 9
      tools/cabana/settings.cc
  7. 2
      tools/cabana/settings.h
  8. 29
      tools/replay/replay.cc
  9. 7
      tools/replay/replay.h

@ -1,7 +1,4 @@
#include "tools/cabana/canmessages.h"
#include <QSettings>
#include "tools/cabana/dbcmanager.h"
CANMessages *can = nullptr;
@ -25,6 +22,7 @@ bool CANMessages::loadRoute(const QString &route, const QString &data_dir, uint3
replay = new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this);
replay->setSegmentCacheLimit(settings.cached_segment_limit);
replay->installEventFilter(event_filter, this);
QObject::connect(replay, &Replay::seekedTo, this, &CANMessages::seekedTo);
QObject::connect(replay, &Replay::segmentsMerged, this, &CANMessages::eventsMerged);
QObject::connect(replay, &Replay::streamStarted, this, &CANMessages::streamStarted);
if (replay->load()) {
@ -69,17 +67,13 @@ void CANMessages::process(QHash<QString, CanData> *messages) {
}
bool CANMessages::eventFilter(const Event *event) {
static std::unique_ptr<QHash<QString, CanData>> new_msgs;
static std::unique_ptr new_msgs = std::make_unique<QHash<QString, CanData>>();
static double prev_update_ts = 0;
if (event->which == cereal::Event::Which::CAN) {
if (!new_msgs) {
new_msgs.reset(new QHash<QString, CanData>);
new_msgs->reserve(1000);
}
double current_sec = replay->currentSeconds();
if (counters_begin_sec == 0 || counters_begin_sec >= current_sec) {
new_msgs->clear();
counters.clear();
counters_begin_sec = current_sec;
}
@ -87,40 +81,29 @@ bool CANMessages::eventFilter(const Event *event) {
auto can_events = event->event.getCan();
for (const auto &c : can_events) {
QString id = QString("%1:%2").arg(c.getSrc()).arg(c.getAddress(), 1, 16);
std::lock_guard lk(lock);
auto &list = received_msgs[id];
while (list.size() > settings.can_msg_log_size) {
list.pop_back();
}
CanData &data = list.emplace_front();
CanData &data = (*new_msgs)[id];
data.ts = current_sec;
data.dat.append((char *)c.getDat().begin(), c.getDat().size());
data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
data.count = ++counters[id];
if (double delta = (current_sec - counters_begin_sec); delta > 0) {
data.freq = data.count / delta;
}
(*new_msgs)[id] = data;
}
double ts = millis_since_boot();
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing) {
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
// delay posting CAN message if UI thread is busy
processing = true;
prev_update_ts = ts;
// use pointer to avoid data copy in queued connection.
emit received(new_msgs.release());
new_msgs.reset(new QHash<QString, CanData>);
new_msgs->reserve(100);
}
}
return true;
}
const std::deque<CanData> CANMessages::messages(const QString &id) {
std::lock_guard lk(lock);
return received_msgs[id];
}
void CANMessages::seekTo(double ts) {
replay->seekTo(std::max(double(0), ts), false);
counters_begin_sec = 0;

@ -1,8 +1,6 @@
#pragma once
#include <atomic>
#include <deque>
#include <mutex>
#include <QColor>
#include <QHash>
@ -37,7 +35,6 @@ public:
inline double totalSeconds() const { return replay->totalSeconds(); }
inline double routeStartTime() const { return replay->routeStartTime() / (double)1e9; }
inline double currentSec() const { return replay->currentSeconds(); }
const std::deque<CanData> messages(const QString &id);
inline const CanData &lastMessage(const QString &id) { return can_msgs[id]; }
inline const Route* route() const { return replay->route(); }
@ -48,6 +45,7 @@ public:
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return replay->getTimeline(); }
signals:
void seekedTo(double sec);
void streamStarted();
void eventsMerged();
void updated();
@ -62,11 +60,9 @@ protected:
void settingChanged();
Replay *replay = nullptr;
std::mutex lock;
std::atomic<double> counters_begin_sec = 0;
std::atomic<bool> processing = false;
QHash<QString, uint32_t> counters;
QHash<QString, std::deque<CanData>> received_msgs;
};
inline QString toHex(const QByteArray &dat) {

@ -99,6 +99,7 @@ void DBCManager::removeSignal(const QString &id, const QString &sig_name) {
std::pair<uint8_t, uint32_t> DBCManager::parseId(const QString &id) {
const auto list = id.split(':');
if (list.size() != 2) return {0, 0};
return {list[0].toInt(), list[1].toUInt(nullptr, 16)};
}

@ -5,14 +5,19 @@
// HistoryLogModel
HistoryLogModel::HistoryLogModel(QObject *parent) : QAbstractTableModel(parent) {
QObject::connect(can, &CANMessages::seekedTo, [this]() {
if (!msg_id.isEmpty()) setMessage(msg_id);
});
}
QVariant HistoryLogModel::data(const QModelIndex &index, int role) const {
if (role == Qt::DisplayRole) {
const auto &m = messages[index.row()];
if (index.column() == 0) {
return QString::number(m.ts, 'f', 2);
return QString::number((m.mono_time / (double)1e9) - can->routeStartTime(), 'f', 2);
}
return !sigs.empty() ? QString::number(get_raw_value((uint8_t *)m.dat.data(), m.dat.size(), *sigs[index.column() - 1]))
: toHex(m.dat);
return !sigs.empty() ? QString::number(m.sig_values[index.column() - 1]) : toHex(m.data);
} else if (role == Qt::FontRole && index.column() == 1 && sigs.empty()) {
return QFontDatabase::systemFont(QFontDatabase::FixedFont);
}
@ -24,6 +29,7 @@ void HistoryLogModel::setMessage(const QString &message_id) {
msg_id = message_id;
sigs.clear();
messages.clear();
has_more_data = true;
if (auto dbc_msg = dbc()->msg(message_id)) {
sigs = dbc_msg->getSignals();
}
@ -48,23 +54,60 @@ QVariant HistoryLogModel::headerData(int section, Qt::Orientation orientation, i
}
void HistoryLogModel::updateState() {
int prev_row_count = messages.size();
if (!msg_id.isEmpty()) {
messages = can->messages(msg_id);
}
int delta = messages.size() - prev_row_count;
if (delta > 0) {
beginInsertRows({}, prev_row_count, messages.size() - 1);
endInsertRows();
} else if (delta < 0) {
beginRemoveRows({}, messages.size(), prev_row_count - 1);
endRemoveRows();
uint64_t last_mono_time = messages.empty() ? 0 : messages.front().mono_time;
auto new_msgs = fetchData(last_mono_time, (can->currentSec() + can->routeStartTime()) * 1e9);
if ((has_more_data = !new_msgs.empty())) {
beginInsertRows({}, 0, new_msgs.size() - 1);
messages.insert(messages.begin(), std::move_iterator(new_msgs.begin()), std::move_iterator(new_msgs.end()));
endInsertRows();
}
}
}
void HistoryLogModel::fetchMore(const QModelIndex &parent) {
if (!messages.empty()) {
emit dataChanged(index(0, 0), index(rowCount() - 1, columnCount() - 1), {Qt::DisplayRole});
auto new_msgs = fetchData(0, messages.back().mono_time);
if ((has_more_data = !new_msgs.empty())) {
beginInsertRows({}, messages.size(), messages.size() + new_msgs.size() - 1);
messages.insert(messages.end(), std::move_iterator(new_msgs.begin()), std::move_iterator(new_msgs.end()));
endInsertRows();
}
}
}
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t min_mono_time, uint64_t max_mono_time) {
auto events = can->events();
auto it = std::lower_bound(events->begin(), events->end(), max_mono_time, [=](auto &e, uint64_t ts) {
return e->mono_time < ts;
});
if (it == events->end() || it == events->begin())
return {};
std::deque<HistoryLogModel::Message> msgs;
const auto [src, address] = DBCManager::parseId(msg_id);
uint32_t cnt = 0;
for (--it; it != events->begin() && (*it)->mono_time > min_mono_time; --it) {
if ((*it)->which == cereal::Event::Which::CAN) {
for (const auto &c : (*it)->event.getCan()) {
if (src == c.getSrc() && address == c.getAddress()) {
const auto dat = c.getDat();
auto &m = msgs.emplace_back();
m.mono_time = (*it)->mono_time;
m.data.append((char *)dat.begin(), dat.size());
m.sig_values.reserve(sigs.size());
for (const Signal *sig : sigs) {
m.sig_values.push_back(get_raw_value((uint8_t *)dat.begin(), dat.size(), *sig));
}
if (++cnt >= batch_size && min_mono_time == 0)
return msgs;
}
}
}
}
return msgs;
}
// HeaderView
QSize HeaderView::sectionSizeFromContents(int logicalIndex) const {
@ -98,7 +141,3 @@ HistoryLog::HistoryLog(QWidget *parent) : QTableView(parent) {
setFrameShape(QFrame::NoFrame);
setSizePolicy(QSizePolicy::Preferred, QSizePolicy::Expanding);
}
int HistoryLog::sizeHintForColumn(int column) const {
return -1;
}

@ -1,5 +1,6 @@
#pragma once
#include <deque>
#include <QHeaderView>
#include <QTableView>
@ -15,17 +16,27 @@ public:
class HistoryLogModel : public QAbstractTableModel {
public:
HistoryLogModel(QObject *parent) : QAbstractTableModel(parent) {}
HistoryLogModel(QObject *parent);
void setMessage(const QString &message_id);
void updateState();
QVariant headerData(int section, Qt::Orientation orientation, int role = Qt::DisplayRole) const override;
QVariant data(const QModelIndex &index, int role = Qt::DisplayRole) const override;
void fetchMore(const QModelIndex &parent) override;
inline bool canFetchMore(const QModelIndex &parent) const override { return has_more_data; }
int rowCount(const QModelIndex &parent = QModelIndex()) const override { return messages.size(); }
int columnCount(const QModelIndex &parent = QModelIndex()) const override { return std::max(1ul, sigs.size()) + 1; }
private:
struct Message {
uint64_t mono_time = 0;
QVector<double> sig_values;
QByteArray data;
};
std::deque<Message> fetchData(uint64_t min_mono_time, uint64_t max_mono_time);
QString msg_id;
std::deque<CanData> messages;
bool has_more_data = true;
const int batch_size = 50;
std::deque<Message> messages;
std::vector<const Signal*> sigs;
};
@ -36,6 +47,7 @@ public:
void updateState() { model->updateState(); }
private:
int sizeHintForColumn(int column) const override;
int sizeHintForColumn(int column) const override { return -1; };
void showEvent(QShowEvent *event) override { model->setMessage(model->msg_id); };
HistoryLogModel *model;
};

@ -15,7 +15,6 @@ Settings::Settings() {
void Settings::save() {
QSettings s("settings", QSettings::IniFormat);
s.setValue("fps", fps);
s.setValue("log_size", can_msg_log_size);
s.setValue("cached_segment", cached_segment_limit);
s.setValue("chart_height", chart_height);
s.setValue("max_chart_x_range", max_chart_x_range);
@ -26,7 +25,6 @@ void Settings::save() {
void Settings::load() {
QSettings s("settings", QSettings::IniFormat);
fps = s.value("fps", 10).toInt();
can_msg_log_size = s.value("log_size", 50).toInt();
cached_segment_limit = s.value("cached_segment", 3).toInt();
chart_height = s.value("chart_height", 200).toInt();
max_chart_x_range = s.value("max_chart_x_range", 3 * 60).toInt();
@ -46,12 +44,6 @@ SettingsDlg::SettingsDlg(QWidget *parent) : QDialog(parent) {
fps->setValue(settings.fps);
form_layout->addRow("FPS", fps);
log_size = new QSpinBox(this);
log_size->setRange(50, 500);
log_size->setSingleStep(10);
log_size->setValue(settings.can_msg_log_size);
form_layout->addRow(tr("Signal history log size"), log_size);
cached_segment = new QSpinBox(this);
cached_segment->setRange(3, 60);
cached_segment->setSingleStep(1);
@ -80,7 +72,6 @@ SettingsDlg::SettingsDlg(QWidget *parent) : QDialog(parent) {
void SettingsDlg::save() {
settings.fps = fps->value();
settings.can_msg_log_size = log_size->value();
settings.cached_segment_limit = cached_segment->value();
settings.chart_height = chart_height->value();
settings.max_chart_x_range = max_chart_x_range->value() * 60;

@ -14,7 +14,6 @@ public:
void load();
int fps = 10;
int can_msg_log_size = 50;
int cached_segment_limit = 3;
int chart_height = 200;
int max_chart_x_range = 3 * 60; // 3 minutes
@ -32,7 +31,6 @@ public:
SettingsDlg(QWidget *parent);
void save();
QSpinBox *fps;
QSpinBox *log_size ;
QSpinBox *cached_segment;
QSpinBox *chart_height;
QSpinBox *max_chart_x_range;

@ -109,6 +109,7 @@ void Replay::seekTo(double seconds, bool relative) {
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
return isSegmentMerged(seg);
});
emit seekedTo(seconds);
queueSegment();
}
@ -209,11 +210,17 @@ void Replay::segmentLoadFinished(bool success) {
void Replay::queueSegment() {
if (segments_.empty()) return;
SegmentMap::iterator cur, end;
cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
for (int i = 0; end != segments_.end() && i <= segment_cache_limit + FORWARD_FETCH_SEGS; ++i) {
SegmentMap::iterator begin, cur;
begin = cur = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
int distance = std::max<int>(std::ceil(segment_cache_limit / 2.0) - 1, segment_cache_limit - std::distance(cur, segments_.end()));
for (int i = 0; begin != segments_.begin() && i < distance; ++i) {
--begin;
}
auto end = begin;
for (int i = 0; end != segments_.end() && i < segment_cache_limit; ++i) {
++end;
}
// load one segment at a time
for (auto it = cur; it != end; ++it) {
auto &[n, seg] = *it;
@ -227,12 +234,6 @@ void Replay::queueSegment() {
}
}
const auto &cur_segment = cur->second;
// merge the previous adjacent segment if it's loaded
auto begin = segments_.find(cur_segment->seg_num - 1);
if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) {
begin = cur;
}
mergeSegments(begin, end);
// free segments outside the current segment window.
@ -240,6 +241,7 @@ void Replay::queueSegment() {
std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); });
// start stream thread
const auto &cur_segment = cur->second;
if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
startStream(cur_segment.get());
emit streamStarted();
@ -247,12 +249,13 @@ void Replay::queueSegment() {
}
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
// merge 3 segments in sequence.
std::vector<int> segments_need_merge;
size_t new_events_size = 0;
for (auto it = begin; it != end && it->second && it->second->isLoaded() && segments_need_merge.size() < segment_cache_limit; ++it) {
segments_need_merge.push_back(it->first);
new_events_size += it->second->log->events.size();
for (auto it = begin; it != end; ++it) {
if (it->second && it->second->isLoaded()) {
segments_need_merge.push_back(it->first);
new_events_size += it->second->log->events.size();
}
}
if (segments_need_merge != segments_merged_) {

@ -10,7 +10,7 @@
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
// one segment uses about 100M of memory
constexpr int FORWARD_FETCH_SEGS = 3;
constexpr int MIN_SEGMENTS_CACHE = 5;
enum REPLAY_FLAGS {
REPLAY_FLAG_NONE = 0x0000,
@ -58,7 +58,7 @@ public:
event_filter = filter;
}
inline int segmentCacheLimit() const { return segment_cache_limit; }
inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(3, n); }
inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(MIN_SEGMENTS_CACHE, n); }
inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; }
inline void addFlag(REPLAY_FLAGS flag) { flags_ |= flag; }
inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; }
@ -79,6 +79,7 @@ public:
signals:
void streamStarted();
void segmentsMerged();
void seekedTo(double sec);
protected slots:
void segmentLoadFinished(bool success);
@ -133,5 +134,5 @@ protected:
float speed_ = 1.0;
replayEventFilter event_filter = nullptr;
void *filter_opaque = nullptr;
int segment_cache_limit = 3;
int segment_cache_limit = MIN_SEGMENTS_CACHE;
};

Loading…
Cancel
Save