cabana: speed up parsing can events (#27874)

speed up parsing can events

cleanup include
mqb-freewheeling
Dean Lee 3 years ago committed by GitHub
parent cca654cfa9
commit f15cb01137
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      tools/cabana/historylog.cc
  2. 2
      tools/cabana/historylog.h
  3. 121
      tools/cabana/streams/abstractstream.cc
  4. 12
      tools/cabana/streams/abstractstream.h
  5. 60
      tools/cabana/util.cc
  6. 17
      tools/cabana/util.h

@ -39,7 +39,7 @@ void HistoryLogModel::refresh(bool fetch_message) {
last_fetch_time = 0; last_fetch_time = 0;
has_more_data = true; has_more_data = true;
messages.clear(); messages.clear();
hex_colors.clear(); hex_colors = {};
if (fetch_message) { if (fetch_message) {
updateState(); updateState();
} }
@ -146,7 +146,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.rend(), min_time); auto msgs = fetchData(first, events.rend(), min_time);
if (update_colors && (min_time > 0 || messages.empty())) { if (update_colors && (min_time > 0 || messages.empty())) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) { for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
hex_colors.compute(it->data, it->mono_time / (double)1e9, freq); hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, freq);
it->colors = hex_colors.colors; it->colors = hex_colors.colors;
} }
} }
@ -157,7 +157,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.end(), 0); auto msgs = fetchData(first, events.end(), 0);
if (update_colors) { if (update_colors) {
for (auto it = msgs.begin(); it != msgs.end(); ++it) { for (auto it = msgs.begin(); it != msgs.end(); ++it) {
hex_colors.compute(it->data, it->mono_time / (double)1e9, freq); hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, freq);
it->colors = hex_colors.colors; it->colors = hex_colors.colors;
} }
} }

@ -54,7 +54,7 @@ public:
std::deque<Message> fetchData(uint64_t from_time, uint64_t min_time = 0); std::deque<Message> fetchData(uint64_t from_time, uint64_t min_time = 0);
MessageId msg_id; MessageId msg_id;
ChangeTracker hex_colors; CanData hex_colors;
bool has_more_data = true; bool has_more_data = true;
const int batch_size = 50; const int batch_size = 50;
int filter_sig_idx = -1; int filter_sig_idx = -1;

@ -11,8 +11,14 @@ AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_liv
} }
void AbstractStream::process(QHash<MessageId, CanData> *messages) { void AbstractStream::process(QHash<MessageId, CanData> *messages) {
auto prev_src_size = sources.size();
for (auto it = messages->begin(); it != messages->end(); ++it) { for (auto it = messages->begin(); it != messages->end(); ++it) {
last_msgs[it.key()] = it.value(); const auto &id = it.key();
last_msgs[id] = it.value();
sources.insert(id.source);
}
if (sources.size() != prev_src_size) {
emit sourcesUpdated(sources);
} }
emit updated(); emit updated();
emit msgsReceived(messages); emit msgsReceived(messages);
@ -22,34 +28,24 @@ void AbstractStream::process(QHash<MessageId, CanData> *messages) {
bool AbstractStream::updateEvent(const Event *event) { bool AbstractStream::updateEvent(const Event *event) {
static double prev_update_ts = 0; static double prev_update_ts = 0;
if (event->which == cereal::Event::Which::CAN) { if (event->which == cereal::Event::Which::CAN) {
double current_sec = event->mono_time / 1e9 - routeStartTime(); double current_sec = event->mono_time / 1e9 - routeStartTime();
for (const auto &c : event->event.getCan()) { for (const auto &c : event->event.getCan()) {
MessageId id = {.source = c.getSrc(), .address = c.getAddress()}; MessageId id = {.source = c.getSrc(), .address = c.getAddress()};
CanData &data = (*new_msgs)[id]; const auto dat = c.getDat();
data.ts = current_sec; all_msgs[id].compute((const char *)dat.begin(), dat.size(), current_sec);
data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size()); if (!new_msgs->contains(id)) {
data.count = ++counters[id]; new_msgs->insert(id, {});
data.freq = data.count / std::max(1.0, current_sec);
auto &tracker = change_trackers[id];
tracker.compute(data.dat, data.ts, data.freq);
data.colors = tracker.colors;
data.last_change_t = tracker.last_change_t;
data.bit_change_counts = tracker.bit_change_counts;
if (!sources.contains(id.source)) {
sources.insert(id.source);
emit sourcesUpdated(sources);
} }
} }
double ts = millis_since_boot(); double ts = millis_since_boot();
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
// delay posting CAN message if UI thread is busy // delay posting CAN message if UI thread is busy
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
processing = true; processing = true;
prev_update_ts = ts; prev_update_ts = ts;
for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) {
it.value() = all_msgs[it.key()];
}
// use pointer to avoid data copy in queued connection. // use pointer to avoid data copy in queued connection.
emit received(new_msgs.release()); emit received(new_msgs.release());
new_msgs.reset(new QHash<MessageId, CanData>); new_msgs.reset(new QHash<MessageId, CanData>);
@ -69,25 +65,22 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) {
// updateEvent will not be called before replayStream::seekedTo return. // updateEvent will not be called before replayStream::seekedTo return.
void AbstractStream::updateLastMsgsTo(double sec) { void AbstractStream::updateLastMsgsTo(double sec) {
new_msgs->clear(); new_msgs->clear();
change_trackers.clear(); all_msgs.clear();
last_msgs.clear(); last_msgs.clear();
counters.clear();
CanEvent last_event = {.mono_time = uint64_t((sec + routeStartTime()) * 1e9)}; CanEvent last_event = {.mono_time = uint64_t((sec + routeStartTime()) * 1e9)};
for (auto &[id, e] : events_) { for (auto &[id, e] : events_) {
auto it = std::lower_bound(e.crbegin(), e.crend(), last_event, std::greater<CanEvent>()); auto it = std::lower_bound(e.crbegin(), e.crend(), last_event, std::greater<CanEvent>());
if (it != e.crend()) { if (it != e.crend()) {
auto &m = last_msgs[id]; double ts = it->mono_time / 1e9 - routeStartTime();
m.dat = QByteArray((const char *)it->dat, it->size); auto &m = all_msgs[id];
m.ts = it->mono_time / 1e9 - routeStartTime(); m.compute((const char *)it->dat, it->size, ts);
m.count = std::distance(it, e.crend()); m.count = std::distance(it, e.crend());
m.freq = m.count / std::max(1.0, m.ts); m.freq = m.count / std::max(1.0, ts);
m.last_change_t = QVector<double>(m.dat.size(), m.ts);
m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
m.bit_change_counts = QVector<std::array<uint32_t, 8>>(m.dat.size());
counters[id] = m.count;
} }
} }
last_msgs = all_msgs;
// use a timer to prevent recursive calls
QTimer::singleShot(0, [this]() { QTimer::singleShot(0, [this]() {
emit updated(); emit updated();
emit msgsReceived(&last_msgs); emit msgsReceived(&last_msgs);
@ -96,18 +89,20 @@ void AbstractStream::updateLastMsgsTo(double sec) {
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs, void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs,
std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) { std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
uint64_t ts = 0;
for (; first != last; ++first) { for (; first != last; ++first) {
if ((*first)->which == cereal::Event::Which::CAN) { if ((*first)->which == cereal::Event::Which::CAN) {
ts = (*first)->mono_time;
for (const auto &c : (*first)->event.getCan()) { for (const auto &c : (*first)->event.getCan()) {
auto dat = c.getDat(); auto dat = c.getDat();
auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}].emplace_back(); auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}].emplace_back();
m.size = std::min(dat.size(), std::size(m.dat)); m.size = std::min(dat.size(), std::size(m.dat));
memcpy(m.dat, (uint8_t *)dat.begin(), m.size); memcpy(m.dat, (uint8_t *)dat.begin(), m.size);
m.mono_time = (*first)->mono_time; m.mono_time = ts;
} }
last_event_ts = std::max(last_event_ts, (*first)->mono_time);
} }
} }
last_event_ts = std::max(last_event_ts, ts);
} }
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) { void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) {
@ -126,3 +121,67 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
} }
emit eventsMerged(); emit eventsMerged();
} }
// CanData
constexpr int periodic_threshold = 10;
constexpr int start_alpha = 128;
constexpr float fade_time = 2.0;
const QColor CYAN = QColor(0, 187, 255, start_alpha);
const QColor RED = QColor(255, 0, 0, start_alpha);
const QColor GREYISH_BLUE = QColor(102, 86, 169, start_alpha / 2);
const QColor CYAN_LIGHTER = QColor(0, 187, 255, start_alpha).lighter(135);
const QColor RED_LIGHTER = QColor(255, 0, 0, start_alpha).lighter(135);
const QColor GREYISH_BLUE_LIGHTER = QColor(102, 86, 169, start_alpha / 2).lighter(135);
static inline QColor blend(const QColor &a, const QColor &b) {
return QColor((a.red() + b.red()) / 2, (a.green() + b.green()) / 2, (a.blue() + b.blue()) / 2, (a.alpha() + b.alpha()) / 2);
}
void CanData::compute(const char *can_data, const int size, double current_sec, uint32_t in_freq) {
ts = current_sec;
++count;
freq = in_freq == 0 ? count / std::max(1.0, current_sec) : in_freq;
if (dat.size() != size) {
dat.resize(size);
bit_change_counts.resize(size);
colors = QVector(size, QColor(0, 0, 0, 0));
last_change_t = QVector(size, ts);
} else {
bool lighter = settings.theme == DARK_THEME;
const QColor &cyan = !lighter ? CYAN : CYAN_LIGHTER;
const QColor &red = !lighter ? RED : RED_LIGHTER;
const QColor &greyish_blue = !lighter ? GREYISH_BLUE : GREYISH_BLUE_LIGHTER;
for (int i = 0; i < size; ++i) {
const uint8_t last = dat[i];
const uint8_t cur = can_data[i];
if (last != cur) {
double delta_t = ts - last_change_t[i];
if (delta_t * freq > periodic_threshold) {
// Last change was while ago, choose color based on delta up or down
colors[i] = (cur > last) ? cyan : red;
} else {
// Periodic changes
colors[i] = blend(colors[i], greyish_blue);
}
// Track bit level changes
const uint8_t tmp = (cur ^ last);
for (int bit = 0; bit < 8; bit++) {
if (tmp & (1 << bit)) {
bit_change_counts[i][bit] += 1;
}
}
last_change_t[i] = ts;
} else {
// Fade out
float alpha_delta = 1.0 / (freq + 1) / fade_time;
colors[i].setAlphaF(std::max(0.0, colors[i].alphaF() - alpha_delta));
}
}
}
memcpy(dat.data(), can_data, size);
}

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <array>
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <unordered_map> #include <unordered_map>
@ -12,6 +13,8 @@
#include "tools/replay/replay.h" #include "tools/replay/replay.h"
struct CanData { struct CanData {
void compute(const char *dat, const int size, double current_sec, uint32_t in_freq = 0);
double ts = 0.; double ts = 0.;
uint32_t count = 0; uint32_t count = 0;
uint32_t freq = 0; uint32_t freq = 0;
@ -22,9 +25,9 @@ struct CanData {
}; };
struct CanEvent { struct CanEvent {
uint64_t mono_time; uint64_t mono_time = 0;
uint8_t size; uint8_t size = 0;
uint8_t dat[64]; uint8_t dat[64] = {};
inline bool operator<(const CanEvent &r) const { return mono_time < r.mono_time; } inline bool operator<(const CanEvent &r) const { return mono_time < r.mono_time; }
inline bool operator>(const CanEvent &r) const { return mono_time > r.mono_time; } inline bool operator>(const CanEvent &r) const { return mono_time > r.mono_time; }
}; };
@ -78,9 +81,8 @@ protected:
bool is_live_streaming = false; bool is_live_streaming = false;
std::atomic<bool> processing = false; std::atomic<bool> processing = false;
QHash<MessageId, uint32_t> counters;
std::unique_ptr<QHash<MessageId, CanData>> new_msgs; std::unique_ptr<QHash<MessageId, CanData>> new_msgs;
QHash<MessageId, ChangeTracker> change_trackers; QHash<MessageId, CanData> all_msgs;
std::unordered_map<MessageId, std::deque<CanEvent>> events_; std::unordered_map<MessageId, std::deque<CanEvent>> events_;
uint64_t last_event_ts = 0; uint64_t last_event_ts = 0;
}; };

@ -1,71 +1,11 @@
#include "tools/cabana/util.h" #include "tools/cabana/util.h"
#include <QApplication>
#include <QFontDatabase> #include <QFontDatabase>
#include <QPainter> #include <QPainter>
#include <QPixmapCache> #include <QPixmapCache>
#include <cmath>
#include <limits>
#include "selfdrive/ui/qt/util.h" #include "selfdrive/ui/qt/util.h"
static inline QColor blend(const QColor &a, const QColor &b) {
return QColor((a.red() + b.red()) / 2, (a.green() + b.green()) / 2, (a.blue() + b.blue()) / 2, (a.alpha() + b.alpha()) / 2);
}
void ChangeTracker::compute(const QByteArray &dat, double ts, uint32_t freq) {
if (prev_dat.size() != dat.size()) {
colors.resize(dat.size());
last_change_t.resize(dat.size());
bit_change_counts.resize(dat.size());
std::fill(colors.begin(), colors.end(), QColor(0, 0, 0, 0));
std::fill(last_change_t.begin(), last_change_t.end(), ts);
} else {
int factor = settings.theme == DARK_THEME ? 135 : 0;
QColor cyan = QColor(0, 187, 255, start_alpha).lighter(factor);
QColor red = QColor(255, 0, 0, start_alpha).lighter(factor);
QColor greyish_blue = QColor(102, 86, 169, start_alpha / 2).lighter(factor);
for (int i = 0; i < dat.size(); ++i) {
const uint8_t last = prev_dat[i];
const uint8_t cur = dat[i];
if (last != cur) {
double delta_t = ts - last_change_t[i];
if (delta_t * freq > periodic_threshold) {
// Last change was while ago, choose color based on delta up or down
colors[i] = (cur > last) ? cyan : red;
} else {
// Periodic changes
colors[i] = blend(colors[i], greyish_blue);
}
// Track bit level changes
for (int bit = 0; bit < 8; bit++) {
if ((cur ^ last) & (1 << bit)) {
bit_change_counts[i][bit] += 1;
}
}
last_change_t[i] = ts;
} else {
// Fade out
float alpha_delta = 1.0 / (freq + 1) / fade_time;
colors[i].setAlphaF(std::max(0.0, colors[i].alphaF() - alpha_delta));
}
}
}
prev_dat = dat;
}
void ChangeTracker::clear() {
prev_dat.clear();
last_change_t.clear();
bit_change_counts.clear();
colors.clear();
}
// SegmentTree // SegmentTree
void SegmentTree::build(const QVector<QPointF> &arr) { void SegmentTree::build(const QVector<QPointF> &arr) {

@ -1,6 +1,5 @@
#pragma once #pragma once
#include <array>
#include <cmath> #include <cmath>
#include <QApplication> #include <QApplication>
@ -17,22 +16,6 @@
#include "tools/cabana/dbc/dbc.h" #include "tools/cabana/dbc/dbc.h"
#include "tools/cabana/settings.h" #include "tools/cabana/settings.h"
class ChangeTracker {
public:
void compute(const QByteArray &dat, double ts, uint32_t freq);
void clear();
QVector<double> last_change_t;
QVector<QColor> colors;
QVector<std::array<uint32_t, 8>> bit_change_counts;
private:
const int periodic_threshold = 10;
const int start_alpha = 128;
const float fade_time = 2.0;
QByteArray prev_dat;
};
class LogSlider : public QSlider { class LogSlider : public QSlider {
Q_OBJECT Q_OBJECT

Loading…
Cancel
Save