cabana: group CAN events by message id to reduce the time complex from O(N) to O(1) (#27601)

* refactor streams

* helper function xLessThan

* fix fill gaps
old-commit-hash: 74db5a4b42
beeps
Dean Lee 2 years ago committed by GitHub
parent 472d08fc2c
commit 35e5b4d927
  1. 2
      tools/cabana/binaryview.cc
  2. 84
      tools/cabana/chartswidget.cc
  3. 2
      tools/cabana/dbc.cc
  4. 6
      tools/cabana/dbc.h
  5. 1
      tools/cabana/dbcmanager.cc
  6. 43
      tools/cabana/historylog.cc
  7. 10
      tools/cabana/mainwin.cc
  8. 6
      tools/cabana/messageswidget.cc
  9. 103
      tools/cabana/streams/abstractstream.cc
  10. 20
      tools/cabana/streams/abstractstream.h
  11. 9
      tools/cabana/streams/livestream.cc
  12. 8
      tools/cabana/streams/livestream.h
  13. 13
      tools/cabana/streams/replaystream.cc
  14. 3
      tools/cabana/streams/replaystream.h
  15. 39
      tools/cabana/tools/findsimilarbits.cc
  16. 1
      tools/replay/replay.h

@ -422,7 +422,7 @@ void BinaryItemDelegate::drawBorder(QPainter* painter, const QStyleOptionViewIte
painter->setClipRegion(QRegion(rc).subtracted(subtract)); painter->setClipRegion(QRegion(rc).subtracted(subtract));
if (!subtract.isEmpty()) { if (!subtract.isEmpty()) {
// fill gaps inside corners. // fill gaps inside corners.
painter->setPen(QPen(border_color, 2)); painter->setPen(QPen(border_color, 2, Qt::SolidLine, Qt::SquareCap, Qt::MiterJoin));
for (auto &r : subtract) { for (auto &r : subtract) {
painter->drawRect(r); painter->drawRect(r);
} }

@ -16,6 +16,8 @@
#include <QtConcurrent> #include <QtConcurrent>
const int MAX_COLUMN_COUNT = 4; const int MAX_COLUMN_COUNT = 4;
static inline bool xLessThan(const QPointF &p, float x) { return p.x() < x; }
// ChartsWidget // ChartsWidget
ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) { ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) {
@ -135,13 +137,11 @@ void ChartsWidget::updateState() {
const double cur_sec = can->currentSec(); const double cur_sec = can->currentSec();
if (!is_zoomed) { if (!is_zoomed) {
double pos = (cur_sec - display_range.first) / std::max(1.0, (display_range.second - display_range.first)); double pos = (cur_sec - display_range.first) / std::max<float>(1.0, max_chart_range);
if (pos < 0 || pos > 0.8) { if (pos < 0 || pos > 0.8) {
display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1); display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1);
} }
auto events = can->events(); double max_sec = std::min(std::floor(display_range.first + max_chart_range), can->lastEventSecond());
double max_event_sec = events->empty() ? 0 : (events->back()->mono_time / 1e9 - can->routeStartTime());
double max_sec = std::min(std::floor(display_range.first + max_chart_range), max_event_sec);
display_range.first = std::max(0.0, max_sec - max_chart_range); display_range.first = std::max(0.0, max_sec - max_chart_range);
display_range.second = display_range.first + max_chart_range; display_range.second = display_range.first + max_chart_range;
} else if (cur_sec < zoomed_range.first || cur_sec >= zoomed_range.second) { } else if (cur_sec < zoomed_range.first || cur_sec >= zoomed_range.second) {
@ -326,6 +326,7 @@ ChartView::ChartView(QWidget *parent) : QChartView(nullptr, parent) {
setRenderHint(QPainter::Antialiasing); setRenderHint(QPainter::Antialiasing);
// TODO: enable zoomIn/seekTo in live streaming mode. // TODO: enable zoomIn/seekTo in live streaming mode.
setRubberBand(can->liveStreaming() ? QChartView::NoRubberBand : QChartView::HorizontalRubberBand); setRubberBand(can->liveStreaming() ? QChartView::NoRubberBand : QChartView::HorizontalRubberBand);
setMouseTracking(true);
QObject::connect(dbc(), &DBCManager::signalRemoved, this, &ChartView::signalRemoved); QObject::connect(dbc(), &DBCManager::signalRemoved, this, &ChartView::signalRemoved);
QObject::connect(dbc(), &DBCManager::signalUpdated, this, &ChartView::signalUpdated); QObject::connect(dbc(), &DBCManager::signalUpdated, this, &ChartView::signalUpdated);
@ -481,8 +482,8 @@ void ChartView::updatePlot(double cur, double min, double max) {
void ChartView::updateSeriesPoints() { void ChartView::updateSeriesPoints() {
// Show points when zoomed in enough // Show points when zoomed in enough
for (auto &s : sigs) { for (auto &s : sigs) {
auto begin = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), [](auto &p, double x) { return p.x() < x; }); auto begin = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), xLessThan);
auto end = std::lower_bound(begin, s.vals.end(), axis_x->max(), [](auto &p, double x) { return p.x() < x; }); auto end = std::lower_bound(begin, s.vals.end(), axis_x->max(), xLessThan);
int num_points = std::max<int>(end - begin, 1); int num_points = std::max<int>(end - begin, 1);
int pixels_per_point = width() / num_points; int pixels_per_point = width() / num_points;
@ -496,64 +497,33 @@ void ChartView::updateSeriesPoints() {
} }
void ChartView::updateSeries(const cabana::Signal *sig) { void ChartView::updateSeries(const cabana::Signal *sig) {
const auto events = can->events();
for (auto &s : sigs) { for (auto &s : sigs) {
if (!sig || s.sig == sig) { if (!sig || s.sig == sig) {
if (!can->liveStreaming()) { if (!can->liveStreaming()) {
s.vals.clear(); s.vals.clear();
s.step_vals.clear(); s.step_vals.clear();
s.vals.reserve(settings.max_cached_minutes * 60 * 100); // [n]seconds * 100hz
s.step_vals.reserve(settings.max_cached_minutes * 60 * 100 * 2);
s.last_value_mono_time = 0; s.last_value_mono_time = 0;
} }
s.series->setColor(getColor(s.sig)); s.series->setColor(getColor(s.sig));
struct Chunk { auto msgs = can->events().at(s.msg_id);
std::vector<Event *>::const_iterator first, second; auto first = std::upper_bound(msgs.cbegin(), msgs.cend(), CanEvent{.mono_time=s.last_value_mono_time});
QVector<QPointF> vals; int new_size = std::max<int>(s.vals.size() + std::distance(first, msgs.cend()), settings.max_cached_minutes * 60 * 100);
QVector<QPointF> step_vals; if (s.vals.capacity() <= new_size) {
}; s.vals.reserve(new_size * 2);
// split into one minitue chunks s.step_vals.reserve(new_size * 4);
QVector<Chunk> chunks;
Event begin_event(cereal::Event::Which::INIT_DATA, s.last_value_mono_time);
auto begin = std::upper_bound(events->begin(), events->end(), &begin_event, Event::lessThan());
for (auto it = begin, second = begin; it != events->end(); it = second) {
second = std::lower_bound(it, events->end(), (*it)->mono_time + 1e9 * 60, [](auto &e, uint64_t ts) { return e->mono_time < ts; });
chunks.push_back({it, second});
} }
QtConcurrent::blockingMap(chunks, [&](Chunk &chunk) { const double route_start_time = can->routeStartTime();
chunk.vals.reserve(60 * 100); // 100 hz for (auto end = msgs.cend(); first != end; ++first) {
chunk.step_vals.reserve(60 * 100 * 2); // 100 hz double value = get_raw_value(first->dat, first->size, *s.sig);
double route_start_time = can->routeStartTime(); double ts = first->mono_time / 1e9 - route_start_time; // seconds
for (auto it = chunk.first; it != chunk.second; ++it) { s.vals.append({ts, value});
if ((*it)->which == cereal::Event::Which::CAN) { if (!s.step_vals.empty()) {
for (const auto &c : (*it)->event.getCan()) { s.step_vals.append({ts, s.step_vals.back().y()});
if (s.msg_id.address == c.getAddress() && s.msg_id.source == c.getSrc()) {
auto dat = c.getDat();
double value = get_raw_value((uint8_t *)dat.begin(), dat.size(), *s.sig);
double ts = ((*it)->mono_time / (double)1e9) - route_start_time; // seconds
chunk.vals.push_back({ts, value});
if (!chunk.step_vals.empty()) {
chunk.step_vals.push_back({ts, chunk.step_vals.back().y()});
}
chunk.step_vals.push_back({ts,value});
}
}
}
}
});
for (auto &c : chunks) {
s.vals.append(c.vals);
if (!c.step_vals.empty()) {
if (!s.step_vals.empty()) {
s.step_vals.append({c.step_vals.first().x(), s.step_vals.back().y()});
}
s.step_vals.append(c.step_vals);
} }
} s.step_vals.append({ts, value});
if (events->size()) { s.last_value_mono_time = first->mono_time;
s.last_value_mono_time = events->back()->mono_time;
} }
if (!can->liveStreaming()) { if (!can->liveStreaming()) {
s.segment_tree.build(s.vals); s.segment_tree.build(s.vals);
@ -580,8 +550,8 @@ void ChartView::updateAxisY() {
unit.clear(); unit.clear();
} }
auto first = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), [](auto &p, double x) { return p.x() < x; }); auto first = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), xLessThan);
auto last = std::lower_bound(first, s.vals.end(), axis_x->max(), [](auto &p, double x) { return p.x() < x; }); auto last = std::lower_bound(first, s.vals.end(), axis_x->max(), xLessThan);
if (can->liveStreaming()) { if (can->liveStreaming()) {
for (auto it = first; it != last; ++it) { for (auto it = first; it != last; ++it) {
if (it->y() < min) min = it->y(); if (it->y() < min) min = it->y();
@ -826,8 +796,8 @@ void ChartView::drawForeground(QPainter *painter, const QRectF &rect) {
painter->setPen(Qt::NoPen); painter->setPen(Qt::NoPen);
for (auto &s : sigs) { for (auto &s : sigs) {
if (s.series->useOpenGL() && s.series->isVisible() && s.series->pointsVisible()) { if (s.series->useOpenGL() && s.series->isVisible() && s.series->pointsVisible()) {
auto first = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), [](auto &p, double x) { return p.x() < x; }); auto first = std::lower_bound(s.vals.begin(), s.vals.end(), axis_x->min(), xLessThan);
auto last = std::lower_bound(first, s.vals.end(), axis_x->max(), [](auto &p, double x) { return p.x() < x; }); auto last = std::lower_bound(first, s.vals.end(), axis_x->max(), xLessThan);
for (auto it = first; it != last; ++it) { for (auto it = first; it != last; ++it) {
painter->setBrush(s.series->color()); painter->setBrush(s.series->color());
painter->drawEllipse(chart()->mapToPosition(*it), 4, 4); painter->drawEllipse(chart()->mapToPosition(*it), 4, 4);
@ -929,7 +899,7 @@ SeriesSelector::SeriesSelector(QString title, QWidget *parent) : QDialog(parent)
auto buttonBox = new QDialogButtonBox(QDialogButtonBox::Ok | QDialogButtonBox::Cancel); auto buttonBox = new QDialogButtonBox(QDialogButtonBox::Ok | QDialogButtonBox::Cancel);
main_layout->addWidget(buttonBox, 3, 2); main_layout->addWidget(buttonBox, 3, 2);
for (auto it = can->can_msgs.cbegin(); it != can->can_msgs.cend(); ++it) { for (auto it = can->last_msgs.cbegin(); it != can->last_msgs.cend(); ++it) {
if (auto m = dbc()->msg(it.key())) { if (auto m = dbc()->msg(it.key())) {
msgs_combo->addItem(QString("%1 (%2)").arg(m->name).arg(it.key().toString()), QVariant::fromValue(it.key())); msgs_combo->addItem(QString("%1 (%2)").arg(m->name).arg(it.key().toString()), QVariant::fromValue(it.key()));
} }

@ -27,7 +27,7 @@ static QVector<int> BIG_ENDIAN_START_BITS = []() {
return ret; return ret;
}(); }();
double get_raw_value(uint8_t *data, size_t data_size, const cabana::Signal &sig) { double get_raw_value(const uint8_t *data, size_t data_size, const cabana::Signal &sig) {
int64_t val = 0; int64_t val = 0;
int i = sig.msb / 8; int i = sig.msb / 8;

@ -38,6 +38,10 @@ struct MessageId {
uint qHash(const MessageId &item); uint qHash(const MessageId &item);
Q_DECLARE_METATYPE(MessageId); Q_DECLARE_METATYPE(MessageId);
template <>
struct std::hash<MessageId> {
std::size_t operator()(const MessageId &k) const noexcept { return qHash(k); }
};
typedef QList<std::pair<QString, QString>> ValueDescription; typedef QList<std::pair<QString, QString>> ValueDescription;
@ -72,7 +76,7 @@ namespace cabana {
} }
// Helper functions // Helper functions
double get_raw_value(uint8_t *data, size_t data_size, const cabana::Signal &sig); double get_raw_value(const uint8_t *data, size_t data_size, const cabana::Signal &sig);
int bigEndianStartBitsIndex(int start_bit); int bigEndianStartBitsIndex(int start_bit);
int bigEndianBitIndex(int index); int bigEndianBitIndex(int index);
void updateSigSizeParamsFromRange(cabana::Signal &s, int start_bit, int size); void updateSigSizeParamsFromRange(cabana::Signal &s, int start_bit, int size);

@ -212,4 +212,3 @@ DBCManager *dbc() {
static DBCManager dbc_manager(nullptr); static DBCManager dbc_manager(nullptr);
return &dbc_manager; return &dbc_manager;
} }

@ -119,40 +119,31 @@ template <class InputIt>
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, InputIt last, uint64_t min_time) { std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, InputIt last, uint64_t min_time) {
std::deque<HistoryLogModel::Message> msgs; std::deque<HistoryLogModel::Message> msgs;
QVector<double> values(sigs.size()); QVector<double> values(sigs.size());
for (auto it = first; it != last && (*it)->mono_time > min_time; ++it) { for (; first != last && first->mono_time > min_time; ++first) {
if ((*it)->which == cereal::Event::Which::CAN) { for (int i = 0; i < sigs.size(); ++i) {
for (const auto &c : (*it)->event.getCan()) { values[i] = get_raw_value(first->dat, first->size, *sigs[i]);
if (msg_id.address == c.getAddress() && msg_id.source == c.getSrc()) { }
const auto dat = c.getDat(); if (!filter_cmp || filter_cmp(values[filter_sig_idx], filter_value)) {
for (int i = 0; i < sigs.size(); ++i) { auto &m = msgs.emplace_back();
values[i] = get_raw_value((uint8_t *)dat.begin(), dat.size(), *sigs[i]); m.mono_time = first->mono_time;
} m.data = QByteArray((const char *)first->dat, first->size);
if (!filter_cmp || filter_cmp(values[filter_sig_idx], filter_value)) { m.sig_values = values;
auto &m = msgs.emplace_back(); if (msgs.size() >= batch_size && min_time == 0) {
m.mono_time = (*it)->mono_time; return msgs;
m.data = QByteArray((char *)dat.begin(), dat.size());
m.sig_values = values;
if (msgs.size() >= batch_size && min_time == 0)
return msgs;
}
}
} }
} }
} }
return msgs; return msgs;
} }
template std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData<>(std::vector<const Event*>::iterator first, std::vector<const Event*>::iterator last, uint64_t min_time);
template std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData<>(std::vector<const Event*>::reverse_iterator first, std::vector<const Event*>::reverse_iterator last, uint64_t min_time);
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) { std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) {
auto events = can->events(); const auto &events = can->events().at(msg_id);
const auto freq = can->lastMessage(msg_id).freq; const auto freq = can->lastMessage(msg_id).freq;
const bool update_colors = !display_signals_mode || sigs.empty(); const bool update_colors = !display_signals_mode || sigs.empty();
if (dynamic_mode) { if (dynamic_mode) {
auto first = std::upper_bound(events->rbegin(), events->rend(), from_time, [=](uint64_t ts, auto &e) { return e->mono_time < ts; }); auto first = std::upper_bound(events.rbegin(), events.rend(), CanEvent{.mono_time=from_time}, std::greater<CanEvent>());
auto msgs = fetchData(first, events->rend(), min_time); auto msgs = fetchData(first, events.rend(), min_time);
if (update_colors && (min_time > 0 || messages.empty())) { if (update_colors && (min_time > 0 || messages.empty())) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) { for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
hex_colors.compute(it->data, it->mono_time / (double)1e9, freq); hex_colors.compute(it->data, it->mono_time / (double)1e9, freq);
@ -162,10 +153,10 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
return msgs; return msgs;
} else { } else {
assert(min_time == 0); assert(min_time == 0);
auto first = std::upper_bound(events->begin(), events->end(), from_time, [=](uint64_t ts, auto &e) { return ts < e->mono_time; }); auto first = std::upper_bound(events.begin(), events.end(), CanEvent{.mono_time=from_time});
auto msgs = fetchData(first, events->end(), 0); auto msgs = fetchData(first, events.end(), 0);
if (update_colors) { if (update_colors) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) { for (auto it = msgs.begin(); it != msgs.end(); ++it) {
hex_colors.compute(it->data, it->mono_time / (double)1e9, freq); hex_colors.compute(it->data, it->mono_time / (double)1e9, freq);
it->colors = hex_colors.colors; it->colors = hex_colors.colors;
} }

@ -446,15 +446,7 @@ void MainWindow::updateDownloadProgress(uint64_t cur, uint64_t total, bool succe
} }
void MainWindow::updateStatus() { void MainWindow::updateStatus() {
float cached_minutes = 0; status_label->setText(tr("Cached Minutes:%1 FPS:%2").arg(settings.max_cached_minutes).arg(settings.fps));
if (!can->liveStreaming()) {
if (auto events = can->events(); !events->empty()) {
cached_minutes = (events->back()->mono_time - events->front()->mono_time) / (1e9 * 60);
}
} else {
settings.max_cached_minutes = settings.max_cached_minutes;
}
status_label->setText(tr("Cached Minutes:%1 FPS:%2").arg(cached_minutes, 0, 'f', 1).arg(settings.fps));
} }
void MainWindow::dockCharts(bool dock) { void MainWindow::dockCharts(bool dock) {

@ -160,7 +160,7 @@ void MessageListModel::setFilterString(const QString &string) {
filter_str = string; filter_str = string;
msgs.clear(); msgs.clear();
for (auto it = can->can_msgs.begin(); it != can->can_msgs.end(); ++it) { for (auto it = can->last_msgs.begin(); it != can->last_msgs.end(); ++it) {
if (filter_str.isEmpty() || contains(it.key(), filter_str)) { if (filter_str.isEmpty() || contains(it.key(), filter_str)) {
msgs.push_back(it.key()); msgs.push_back(it.key());
} }
@ -206,8 +206,8 @@ void MessageListModel::sortMessages() {
void MessageListModel::msgsReceived(const QHash<MessageId, CanData> *new_msgs) { void MessageListModel::msgsReceived(const QHash<MessageId, CanData> *new_msgs) {
int prev_row_count = msgs.size(); int prev_row_count = msgs.size();
if (filter_str.isEmpty() && msgs.size() != can->can_msgs.size()) { if (filter_str.isEmpty() && msgs.size() != can->last_msgs.size()) {
msgs = can->can_msgs.keys(); msgs = can->last_msgs.keys();
} }
if (msgs.size() != prev_row_count) { if (msgs.size() != prev_row_count) {
sortMessages(); sortMessages();

@ -1,6 +1,5 @@
#include "tools/cabana/streams/abstractstream.h" #include "tools/cabana/streams/abstractstream.h"
#include <QTimer> #include <QTimer>
#include <QtConcurrent>
AbstractStream *can = nullptr; AbstractStream *can = nullptr;
@ -13,7 +12,7 @@ AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_liv
void AbstractStream::process(QHash<MessageId, CanData> *messages) { void AbstractStream::process(QHash<MessageId, CanData> *messages) {
for (auto it = messages->begin(); it != messages->end(); ++it) { for (auto it = messages->begin(); it != messages->end(); ++it) {
can_msgs[it.key()] = it.value(); last_msgs[it.key()] = it.value();
} }
emit updated(); emit updated();
emit msgsReceived(messages); emit msgsReceived(messages);
@ -62,66 +61,68 @@ bool AbstractStream::updateEvent(const Event *event) {
const CanData &AbstractStream::lastMessage(const MessageId &id) { const CanData &AbstractStream::lastMessage(const MessageId &id) {
static CanData empty_data; static CanData empty_data;
auto it = can_msgs.find(id); auto it = last_msgs.find(id);
return it != can_msgs.end() ? it.value() : empty_data; return it != last_msgs.end() ? it.value() : empty_data;
} }
static QHash<MessageId, CanData> parseEvents(std::vector<Event *>::const_reverse_iterator first,
std::vector<Event *>::const_reverse_iterator last, double route_start_time) {
QHash<MessageId, CanData> msgs;
msgs.reserve(500);
for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
for (const auto &c : (*it)->event.getCan()) {
auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}];
if (++m.count == 1) {
m.ts = ((*it)->mono_time / 1e9) - route_start_time;
m.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
m.last_change_t = QVector<double>(m.dat.size(), m.ts);
m.bit_change_counts.resize(m.dat.size());
}
}
}
}
return msgs;
};
// it is thread safe to update data in updateLastMsgsTo. // it is thread safe to update data in updateLastMsgsTo.
// updateEvent will not be called before replayStream::seekedTo return. // updateEvent will not be called before replayStream::seekedTo return.
void AbstractStream::updateLastMsgsTo(double sec) { void AbstractStream::updateLastMsgsTo(double sec) {
uint64_t ts = (sec + routeStartTime()) * 1e9;
const uint64_t delta = std::max(std::ceil(sec / std::thread::hardware_concurrency()), 30.0) * 1e9;
const auto evs = events();
auto first = std::upper_bound(evs->crbegin(), evs->crend(), ts, [](uint64_t ts, auto &e) { return ts > e->mono_time; });
QFutureSynchronizer<QHash<MessageId, CanData>> synchronizer;
while(first != evs->crend()) {
ts = (*first)->mono_time > delta ? (*first)->mono_time - delta : 0;
auto last = std::lower_bound(first, evs->crend(), ts, [](auto &e, uint64_t ts) { return e->mono_time > ts; });
synchronizer.addFuture(QtConcurrent::run(parseEvents, first, last, routeStartTime()));
first = last;
}
synchronizer.waitForFinished();
new_msgs->clear(); new_msgs->clear();
change_trackers.clear(); change_trackers.clear();
can_msgs.clear(); last_msgs.clear();
counters.clear(); counters.clear();
for (const auto &f : synchronizer.futures()) {
auto msgs = f.result(); CanEvent last_event = {.mono_time = uint64_t((sec + routeStartTime()) * 1e9)};
for (auto it = msgs.cbegin(); it != msgs.cend(); ++it) { for (auto &[id, e] : events_) {
counters[it.key()] += it.value().count; auto it = std::lower_bound(e.crbegin(), e.crend(), last_event, std::greater<CanEvent>());
auto m = can_msgs.find(it.key()); if (it != e.crend()) {
if (m == can_msgs.end()) { auto &m = last_msgs[id];
m = can_msgs.insert(it.key(), it.value()); m.dat = QByteArray((const char *)it->dat, it->size);
} else { m.ts = it->mono_time / 1e9 - routeStartTime();
m.value().count += it.value().count; m.count = std::distance(it, e.crend());
} m.freq = m.count / std::max(1.0, m.ts);
m.value().freq = m.value().count / std::max(1.0, m.value().ts); m.last_change_t = QVector<double>(m.dat.size(), m.ts);
m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
m.bit_change_counts = QVector<std::array<uint32_t, 8>>(m.dat.size());
counters[id] = m.count;
} }
} }
QTimer::singleShot(0, [this]() { QTimer::singleShot(0, [this]() {
emit updated(); emit updated();
emit msgsReceived(&can_msgs); emit msgsReceived(&last_msgs);
}); });
} }
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs,
std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
for (; first != last; ++first) {
if ((*first)->which == cereal::Event::Which::CAN) {
for (const auto &c : (*first)->event.getCan()) {
auto dat = c.getDat();
auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}].emplace_back();
m.size = std::min(dat.size(), std::size(m.dat));
memcpy(m.dat, (uint8_t *)dat.begin(), m.size);
m.mono_time = (*first)->mono_time;
}
last_event_ts = std::max(last_event_ts, (*first)->mono_time);
}
}
}
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) {
if (first == last) return;
if (append) {
parseEvents(events_, first, last);
} else {
std::unordered_map<MessageId, std::deque<CanEvent>> new_events;
parseEvents(new_events, first, last);
for (auto &[id, new_e] : new_events) {
auto &e = events_[id];
auto it = std::upper_bound(e.cbegin(), e.cend(), new_e.front());
e.insert(it, new_e.cbegin(), new_e.cend());
}
}
emit eventsMerged();
}

@ -1,7 +1,8 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <deque>
#include <unordered_map>
#include <QColor> #include <QColor>
#include <QHash> #include <QHash>
@ -20,6 +21,14 @@ struct CanData {
QVector<std::array<uint32_t, 8>> bit_change_counts; QVector<std::array<uint32_t, 8>> bit_change_counts;
}; };
struct CanEvent {
uint64_t mono_time;
uint8_t size;
uint8_t dat[64];
inline bool operator<(const CanEvent &r) const { return mono_time < r.mono_time; }
inline bool operator>(const CanEvent &r) const { return mono_time > r.mono_time; }
};
class AbstractStream : public QObject { class AbstractStream : public QObject {
Q_OBJECT Q_OBJECT
@ -27,6 +36,7 @@ public:
AbstractStream(QObject *parent, bool is_live_streaming); AbstractStream(QObject *parent, bool is_live_streaming);
virtual ~AbstractStream() {}; virtual ~AbstractStream() {};
inline bool liveStreaming() const { return is_live_streaming; } inline bool liveStreaming() const { return is_live_streaming; }
inline double lastEventSecond() const { return last_event_ts / 1e9 - routeStartTime(); }
virtual void seekTo(double ts) {} virtual void seekTo(double ts) {}
virtual QString routeName() const = 0; virtual QString routeName() const = 0;
virtual QString carFingerprint() const { return ""; } virtual QString carFingerprint() const { return ""; }
@ -37,11 +47,12 @@ public:
virtual const CanData &lastMessage(const MessageId &id); virtual const CanData &lastMessage(const MessageId &id);
virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; } virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; }
virtual const Route *route() const { return nullptr; } virtual const Route *route() const { return nullptr; }
virtual const std::vector<Event *> *events() const = 0;
virtual void setSpeed(float speed) {} virtual void setSpeed(float speed) {}
virtual bool isPaused() const { return false; } virtual bool isPaused() const { return false; }
virtual void pause(bool pause) {} virtual void pause(bool pause) {}
const std::unordered_map<MessageId, std::deque<CanEvent>> &events() const { return events_; }
virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; } virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; }
void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append);
signals: signals:
void paused(); void paused();
@ -55,19 +66,22 @@ signals:
void sourcesUpdated(const QSet<uint8_t> &s); void sourcesUpdated(const QSet<uint8_t> &s);
public: public:
QHash<MessageId, CanData> can_msgs; QHash<MessageId, CanData> last_msgs;
QSet<uint8_t> sources; QSet<uint8_t> sources;
protected: protected:
virtual void process(QHash<MessageId, CanData> *); virtual void process(QHash<MessageId, CanData> *);
bool updateEvent(const Event *event); bool updateEvent(const Event *event);
void updateLastMsgsTo(double sec); void updateLastMsgsTo(double sec);
void parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
bool is_live_streaming = false; bool is_live_streaming = false;
std::atomic<bool> processing = false; std::atomic<bool> processing = false;
QHash<MessageId, uint32_t> counters; QHash<MessageId, uint32_t> counters;
std::unique_ptr<QHash<MessageId, CanData>> new_msgs; std::unique_ptr<QHash<MessageId, CanData>> new_msgs;
QHash<MessageId, ChangeTracker> change_trackers; QHash<MessageId, ChangeTracker> change_trackers;
std::unordered_map<MessageId, std::deque<CanEvent>> events_;
uint64_t last_event_ts = 0;
}; };
// A global pointer referring to the unique AbstractStream object // A global pointer referring to the unique AbstractStream object

@ -52,7 +52,7 @@ void LiveStream::handleEvent(Event *evt) {
auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) { auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) {
return ts < e->mono_time; return ts < e->mono_time;
}); });
if (it != can_events.cend()) { if (it != received.cend()) {
bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_; bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_;
if (skip) return; if (skip) return;
@ -68,16 +68,15 @@ void LiveStream::handleEvent(Event *evt) {
void LiveStream::process(QHash<MessageId, CanData> *last_messages) { void LiveStream::process(QHash<MessageId, CanData> *last_messages) {
{ {
std::lock_guard lk(lock); std::lock_guard lk(lock);
uint64_t last_ts = can_events.empty() ? 0 : can_events.back()->mono_time; auto first = std::upper_bound(received.cbegin(), received.cend(), last_event_ts, [](uint64_t ts, auto &e) {
auto first = std::upper_bound(received.cbegin(), received.cend(), last_ts, [](uint64_t ts, auto &e) {
return ts < e->mono_time; return ts < e->mono_time;
}); });
can_events.insert(can_events.end(), first, received.cend()); mergeEvents(first, received.cend(), true);
if (speed_ == 1) { if (speed_ == 1) {
received.clear(); received.clear();
messages.clear();
} }
} }
emit eventsMerged();
AbstractStream::process(last_messages); AbstractStream::process(last_messages);
} }

@ -16,27 +16,23 @@ public:
void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); } void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); }
bool isPaused() const override { return pause_; } bool isPaused() const override { return pause_; }
void pause(bool pause) override; void pause(bool pause) override;
const std::vector<Event *> *events() const override { return &can_events; }
protected: protected:
void process(QHash<MessageId, CanData> *) override;
virtual void handleEvent(Event *evt); virtual void handleEvent(Event *evt);
virtual void streamThread(); virtual void streamThread();
void process(QHash<MessageId, CanData> *) override;
struct Msg { struct Msg {
Msg(Message *m) { Msg(Message *m) {
event = ::new Event(aligned_buf.align(m)); event = ::new Event(aligned_buf.align(m));
delete m; delete m;
} }
~Msg() { ~Msg() { ::delete event; }
::delete event;
}
Event *event; Event *event;
AlignedBuffer aligned_buf; AlignedBuffer aligned_buf;
}; };
mutable std::mutex lock; mutable std::mutex lock;
std::vector<Event *> can_events;
std::vector<Event *> received; std::vector<Event *> received;
std::deque<Msg> messages; std::deque<Msg> messages;
std::atomic<uint64_t> start_ts = 0; std::atomic<uint64_t> start_ts = 0;

@ -14,13 +14,24 @@ static bool event_filter(const Event *e, void *opaque) {
return ((ReplayStream *)opaque)->eventFilter(e); return ((ReplayStream *)opaque)->eventFilter(e);
} }
void ReplayStream::mergeSegments() {
for (auto &[n, seg] : replay->segments()) {
if (seg && seg->isLoaded() && !processed_segments.count(n)) {
const auto &events = seg->log->events;
bool append = processed_segments.empty() || *processed_segments.rbegin() < n;
processed_segments.insert(n);
mergeEvents(events.cbegin(), events.cend(), append);
}
}
}
bool ReplayStream::loadRoute(const QString &route, const QString &data_dir) { bool ReplayStream::loadRoute(const QString &route, const QString &data_dir) {
replay.reset(new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this)); replay.reset(new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this));
replay->setSegmentCacheLimit(settings.max_cached_minutes); replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this); replay->installEventFilter(event_filter, this);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &AbstractStream::eventsMerged);
QObject::connect(replay.get(), &Replay::streamStarted, this, &AbstractStream::streamStarted); QObject::connect(replay.get(), &Replay::streamStarted, this, &AbstractStream::streamStarted);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);
if (replay->load()) { if (replay->load()) {
const auto &segments = replay->route()->segments(); const auto &segments = replay->route()->segments();
if (std::none_of(segments.begin(), segments.end(), [](auto &s) { return s.second.rlog.length() > 0; })) { if (std::none_of(segments.begin(), segments.end(), [](auto &s) { return s.second.rlog.length() > 0; })) {

@ -20,13 +20,14 @@ public:
inline double currentSec() const override { return replay->currentSeconds(); } inline double currentSec() const override { return replay->currentSeconds(); }
inline QDateTime currentDateTime() const override { return replay->currentDateTime(); } inline QDateTime currentDateTime() const override { return replay->currentDateTime(); }
inline const Route *route() const override { return replay->route(); } inline const Route *route() const override { return replay->route(); }
inline const std::vector<Event *> *events() const override { return replay->events(); }
inline void setSpeed(float speed) override { replay->setSpeed(speed); } inline void setSpeed(float speed) override { replay->setSpeed(speed); }
inline bool isPaused() const override { return replay->isPaused(); } inline bool isPaused() const override { return replay->isPaused(); }
void pause(bool pause) override; void pause(bool pause) override;
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); } inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); }
private: private:
void mergeSegments();
std::unique_ptr<Replay> replay = nullptr; std::unique_ptr<Replay> replay = nullptr;
uint32_t replay_flags = REPLAY_FLAG_NONE; uint32_t replay_flags = REPLAY_FLAG_NONE;
std::set<int> processed_segments;
}; };

@ -19,7 +19,7 @@ FindSimilarBitsDlg::FindSimilarBitsDlg(QWidget *parent) : QDialog(parent, Qt::Wi
QHBoxLayout *form_layout = new QHBoxLayout(); QHBoxLayout *form_layout = new QHBoxLayout();
bus_combo = new QComboBox(this); bus_combo = new QComboBox(this);
QSet<uint8_t> bus_set; QSet<uint8_t> bus_set;
for (auto it = can->can_msgs.begin(); it != can->can_msgs.end(); ++it) { for (auto it = can->last_msgs.begin(); it != can->last_msgs.end(); ++it) {
bus_set << it.key().source; bus_set << it.key().source;
} }
for (uint8_t bus : bus_set) { for (uint8_t bus : bus_set) {
@ -102,29 +102,24 @@ void FindSimilarBitsDlg::find() {
QList<FindSimilarBitsDlg::mismatched_struct> FindSimilarBitsDlg::calcBits(uint8_t bus, uint32_t selected_address, int byte_idx, int bit_idx, int min_msgs_cnt) { QList<FindSimilarBitsDlg::mismatched_struct> FindSimilarBitsDlg::calcBits(uint8_t bus, uint32_t selected_address, int byte_idx, int bit_idx, int min_msgs_cnt) {
QHash<uint32_t, QVector<uint32_t>> mismatches; QHash<uint32_t, QVector<uint32_t>> mismatches;
QHash<uint32_t, uint32_t> msg_count; QHash<uint32_t, uint32_t> msg_count;
auto events = can->events();
int bit_to_find = -1; int bit_to_find = -1;
for (auto e : *events) { for (const auto &[id, msg] : can->events()) {
if (e->which == cereal::Event::Which::CAN) { if (id.source == bus) {
for (const auto &c : e->event.getCan()) { for (const auto &c : msg) {
if (c.getSrc() == bus) { if (id.address == selected_address && c.size > byte_idx) {
const auto dat = c.getDat(); bit_to_find = ((c.dat[byte_idx] >> (7 - bit_idx)) & 1) != 0;
uint32_t address = c.getAddress(); }
if (address == selected_address && dat.size() > byte_idx) { ++msg_count[id.address];
bit_to_find = ((dat[byte_idx] >> (7 - bit_idx)) & 1) != 0; if (bit_to_find == -1) continue;
}
++msg_count[address];
if (bit_to_find == -1) continue;
auto &mismatched = mismatches[address]; auto &mismatched = mismatches[id.address];
if (mismatched.size() < dat.size() * 8) { if (mismatched.size() < c.size * 8) {
mismatched.resize(dat.size() * 8); mismatched.resize(c.size * 8);
} }
for (int i = 0; i < dat.size(); ++i) { for (int i = 0; i < c.size; ++i) {
for (int j = 0; j < 8; ++j) { for (int j = 0; j < 8; ++j) {
int bit = ((dat[i] >> (7 - j)) & 1) != 0; int bit = ((c.dat[i] >> (7 - j)) & 1) != 0;
mismatched[i * 8 + j] += (bit != bit_to_find); mismatched[i * 8 + j] += (bit != bit_to_find);
}
} }
} }
} }

@ -71,6 +71,7 @@ public:
inline void setSpeed(float speed) { speed_ = speed; } inline void setSpeed(float speed) { speed_ = speed; }
inline float getSpeed() const { return speed_; } inline float getSpeed() const { return speed_; }
inline const std::vector<Event *> *events() const { return events_.get(); } inline const std::vector<Event *> *events() const { return events_.get(); }
inline const std::map<int, std::unique_ptr<Segment>> &segments() const { return segments_; };
inline const std::string &carFingerprint() const { return car_fingerprint_; } inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() {
std::lock_guard lk(timeline_lock); std::lock_guard lk(timeline_lock);

Loading…
Cancel
Save