cabana: fixed the multi-threading issues of AbstractStream (#28590)

* fix multi-threading issues

* protect masks with mutex
pull/28601/head
Dean Lee 2 years ago committed by GitHub
parent 2b49a4754b
commit e80440dc5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      tools/cabana/dbc/dbc.cc
  2. 2
      tools/cabana/dbc/dbc.h
  3. 8
      tools/cabana/dbc/dbcmanager.cc
  4. 3
      tools/cabana/dbc/dbcmanager.h
  5. 5
      tools/cabana/historylog.cc
  6. 1
      tools/cabana/messageswidget.cc
  7. 35
      tools/cabana/streams/abstractstream.cc
  8. 5
      tools/cabana/streams/abstractstream.h

@ -76,7 +76,7 @@ QString cabana::Msg::newSignalName() {
} }
void cabana::Msg::update() { void cabana::Msg::update() {
mask = QVector<uint8_t>(size, 0x00).toList(); mask.assign(size, 0x00);
multiplexor = nullptr; multiplexor = nullptr;
// sort signals // sort signals

@ -102,7 +102,7 @@ public:
QString comment; QString comment;
std::vector<cabana::Signal *> sigs; std::vector<cabana::Signal *> sigs;
QList<uint8_t> mask; std::vector<uint8_t> mask;
cabana::Signal *multiplexor = nullptr; cabana::Signal *multiplexor = nullptr;
}; };

@ -58,6 +58,7 @@ void DBCManager::addSignal(const MessageId &id, const cabana::Signal &sig) {
if (auto m = msg(id)) { if (auto m = msg(id)) {
if (auto s = m->addSignal(sig)) { if (auto s = m->addSignal(sig)) {
emit signalAdded(id, s); emit signalAdded(id, s);
emit maskUpdated();
} }
} }
} }
@ -66,6 +67,7 @@ void DBCManager::updateSignal(const MessageId &id, const QString &sig_name, cons
if (auto m = msg(id)) { if (auto m = msg(id)) {
if (auto s = m->updateSignal(sig_name, sig)) { if (auto s = m->updateSignal(sig_name, sig)) {
emit signalUpdated(s); emit signalUpdated(s);
emit maskUpdated();
} }
} }
} }
@ -75,6 +77,7 @@ void DBCManager::removeSignal(const MessageId &id, const QString &sig_name) {
if (auto s = m->sig(sig_name)) { if (auto s = m->sig(sig_name)) {
emit signalRemoved(s); emit signalRemoved(s);
m->removeSignal(sig_name); m->removeSignal(sig_name);
emit maskUpdated();
} }
} }
} }
@ -91,6 +94,7 @@ void DBCManager::removeMsg(const MessageId &id) {
assert(dbc_file); // This should be impossible assert(dbc_file); // This should be impossible
dbc_file->removeMsg(id); dbc_file->removeMsg(id);
emit msgRemoved(id); emit msgRemoved(id);
emit maskUpdated();
} }
QString DBCManager::newMsgName(const MessageId &id) { QString DBCManager::newMsgName(const MessageId &id) {
@ -102,8 +106,8 @@ QString DBCManager::newSignalName(const MessageId &id) {
return m ? m->newSignalName() : ""; return m ? m->newSignalName() : "";
} }
const QList<uint8_t> &DBCManager::mask(const MessageId &id) { const std::vector<uint8_t> &DBCManager::mask(const MessageId &id) {
static QList<uint8_t> empty_mask; static std::vector<uint8_t> empty_mask;
auto m = msg(id); auto m = msg(id);
return m ? m->mask : empty_mask; return m ? m->mask : empty_mask;
} }

@ -32,7 +32,7 @@ public:
QString newMsgName(const MessageId &id); QString newMsgName(const MessageId &id);
QString newSignalName(const MessageId &id); QString newSignalName(const MessageId &id);
const QList<uint8_t>& mask(const MessageId &id); const std::vector<uint8_t>& mask(const MessageId &id);
const std::map<uint32_t, cabana::Msg> &getMessages(uint8_t source); const std::map<uint32_t, cabana::Msg> &getMessages(uint8_t source);
cabana::Msg *msg(const MessageId &id); cabana::Msg *msg(const MessageId &id);
@ -57,6 +57,7 @@ signals:
void msgUpdated(MessageId id); void msgUpdated(MessageId id);
void msgRemoved(MessageId id); void msgRemoved(MessageId id);
void DBCFileChanged(); void DBCFileChanged();
void maskUpdated();
private: private:
std::map<int, std::shared_ptr<DBCFile>> dbc_files; std::map<int, std::shared_ptr<DBCFile>> dbc_files;

@ -141,7 +141,6 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, I
} }
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) { std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) {
const QList<uint8_t> mask;
const auto &events = can->events(msg_id); const auto &events = can->events(msg_id);
const auto freq = can->lastMessage(msg_id).freq; const auto freq = can->lastMessage(msg_id).freq;
const bool update_colors = !display_signals_mode || sigs.empty(); const bool update_colors = !display_signals_mode || sigs.empty();
@ -154,7 +153,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.rend(), min_time); auto msgs = fetchData(first, events.rend(), min_time);
if (update_colors && (min_time > 0 || messages.empty())) { if (update_colors && (min_time > 0 || messages.empty())) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) { for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, mask, freq); hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, nullptr, freq);
it->colors = hex_colors.colors; it->colors = hex_colors.colors;
} }
} }
@ -167,7 +166,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
auto msgs = fetchData(first, events.cend(), 0); auto msgs = fetchData(first, events.cend(), 0);
if (update_colors) { if (update_colors) {
for (auto it = msgs.begin(); it != msgs.end(); ++it) { for (auto it = msgs.begin(); it != msgs.end(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, mask, freq); hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, nullptr, freq);
it->colors = hex_colors.colors; it->colors = hex_colors.colors;
} }
} }

@ -83,6 +83,7 @@ MessagesWidget::MessagesWidget(QWidget *parent) : QWidget(parent) {
}); });
QObject::connect(suppress_defined_signals, &QCheckBox::stateChanged, [=](int state) { QObject::connect(suppress_defined_signals, &QCheckBox::stateChanged, [=](int state) {
settings.suppress_defined_signals = (state == Qt::Checked); settings.suppress_defined_signals = (state == Qt::Checked);
emit settings.changed();
}); });
QObject::connect(can, &AbstractStream::msgsReceived, model, &MessageListModel::msgsReceived); QObject::connect(can, &AbstractStream::msgsReceived, model, &MessageListModel::msgsReceived);
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &MessagesWidget::dbcModified); QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &MessagesWidget::dbcModified);

@ -12,6 +12,9 @@ StreamNotifier *StreamNotifier::instance() {
AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId, CanData>()), QObject(parent) { AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId, CanData>()), QObject(parent) {
assert(parent != nullptr); assert(parent != nullptr);
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo); QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
QObject::connect(&settings, &Settings::changed, this, &AbstractStream::updateMasks);
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &AbstractStream::updateMasks);
QObject::connect(dbc(), &DBCManager::maskUpdated, this, &AbstractStream::updateMasks);
QObject::connect(this, &AbstractStream::streamStarted, [this]() { QObject::connect(this, &AbstractStream::streamStarted, [this]() {
emit StreamNotifier::instance()->changingStream(); emit StreamNotifier::instance()->changingStream();
delete can; delete can;
@ -20,6 +23,20 @@ AbstractStream::AbstractStream(QObject *parent) : new_msgs(new QHash<MessageId,
}); });
} }
void AbstractStream::updateMasks() {
std::lock_guard lk(mutex);
masks.clear();
if (settings.suppress_defined_signals) {
for (auto s : sources) {
if (auto f = dbc()->findDBCFile(s)) {
for (const auto &[address, m] : f->getMessages()) {
masks[{.source = (uint8_t)s, .address = address}] = m.mask;
}
}
}
}
}
void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) { void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
auto prev_src_size = sources.size(); auto prev_src_size = sources.size();
auto prev_msg_size = last_msgs.size(); auto prev_msg_size = last_msgs.size();
@ -29,6 +46,7 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
sources.insert(id.source); sources.insert(id.source);
} }
if (sources.size() != prev_src_size) { if (sources.size() != prev_src_size) {
updateMasks();
emit sourcesUpdated(sources); emit sourcesUpdated(sources);
} }
emit updated(); emit updated();
@ -38,7 +56,9 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
} }
void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) { void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) {
QList<uint8_t> mask = settings.suppress_defined_signals ? dbc()->mask(id) : QList<uint8_t>(); std::lock_guard lk(mutex);
auto mask_it = masks.find(id);
std::vector<uint8_t> *mask = mask_it == masks.end() ? nullptr : &mask_it->second;
all_msgs[id].compute((const char *)data, size, sec, getSpeed(), mask); all_msgs[id].compute((const char *)data, size, sec, getSpeed(), mask);
if (!new_msgs->contains(id)) { if (!new_msgs->contains(id)) {
new_msgs->insert(id, {}); new_msgs->insert(id, {});
@ -47,7 +67,8 @@ void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t
bool AbstractStream::postEvents() { bool AbstractStream::postEvents() {
// delay posting CAN message if UI thread is busy // delay posting CAN message if UI thread is busy
if (processing.exchange(true) == false) { if (processing == false) {
processing = true;
for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) { for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) {
it.value() = all_msgs[it.key()]; it.value() = all_msgs[it.key()];
} }
@ -84,7 +105,8 @@ void AbstractStream::updateLastMsgsTo(double sec) {
auto it = std::lower_bound(ev.crbegin(), ev.crend(), last_ts, [](auto e, uint64_t ts) { auto it = std::lower_bound(ev.crbegin(), ev.crend(), last_ts, [](auto e, uint64_t ts) {
return e->mono_time > ts; return e->mono_time > ts;
}); });
QList<uint8_t> mask = settings.suppress_defined_signals ? dbc()->mask(id) : QList<uint8_t>(); auto mask_it = masks.find(id);
std::vector<uint8_t> *mask = mask_it == masks.end() ? nullptr : &mask_it->second;
if (it != ev.crend()) { if (it != ev.crend()) {
double ts = (*it)->mono_time / 1e9 - routeStartTime(); double ts = (*it)->mono_time / 1e9 - routeStartTime();
auto &m = all_msgs[id]; auto &m = all_msgs[id];
@ -93,7 +115,10 @@ void AbstractStream::updateLastMsgsTo(double sec) {
m.freq = m.count / std::max(1.0, ts); m.freq = m.count / std::max(1.0, ts);
} }
} }
// deep copy all_msgs to last_msgs to avoid multi-threading issue.
last_msgs = all_msgs; last_msgs = all_msgs;
last_msgs.detach();
// use a timer to prevent recursive calls // use a timer to prevent recursive calls
QTimer::singleShot(0, [this]() { QTimer::singleShot(0, [this]() {
emit updated(); emit updated();
@ -171,7 +196,7 @@ static inline QColor blend(const QColor &a, const QColor &b) {
return QColor((a.red() + b.red()) / 2, (a.green() + b.green()) / 2, (a.blue() + b.blue()) / 2, (a.alpha() + b.alpha()) / 2); return QColor((a.red() + b.red()) / 2, (a.green() + b.green()) / 2, (a.blue() + b.blue()) / 2, (a.alpha() + b.alpha()) / 2);
} }
void CanData::compute(const char *can_data, const int size, double current_sec, double playback_speed, const QList<uint8_t> &mask, uint32_t in_freq) { void CanData::compute(const char *can_data, const int size, double current_sec, double playback_speed, const std::vector<uint8_t> *mask, uint32_t in_freq) {
ts = current_sec; ts = current_sec;
++count; ++count;
const double sec_to_first_event = current_sec - (can->allEvents().front()->mono_time / 1e9 - can->routeStartTime()); const double sec_to_first_event = current_sec - (can->allEvents().front()->mono_time / 1e9 - can->routeStartTime());
@ -190,7 +215,7 @@ void CanData::compute(const char *can_data, const int size, double current_sec,
const QColor &greyish_blue = !lighter ? GREYISH_BLUE : GREYISH_BLUE_LIGHTER; const QColor &greyish_blue = !lighter ? GREYISH_BLUE : GREYISH_BLUE_LIGHTER;
for (int i = 0; i < size; ++i) { for (int i = 0; i < size; ++i) {
const uint8_t mask_byte = (i < mask.size()) ? (~mask[i]) : 0xff; const uint8_t mask_byte = (mask && i < mask->size()) ? (~((*mask)[i])) : 0xff;
const uint8_t last = dat[i] & mask_byte; const uint8_t last = dat[i] & mask_byte;
const uint8_t cur = can_data[i] & mask_byte; const uint8_t cur = can_data[i] & mask_byte;
const int delta = cur - last; const int delta = cur - last;

@ -13,7 +13,7 @@
#include "tools/replay/replay.h" #include "tools/replay/replay.h"
struct CanData { struct CanData {
void compute(const char *dat, const int size, double current_sec, double playback_speed, const QList<uint8_t> &mask, uint32_t in_freq = 0); void compute(const char *dat, const int size, double current_sec, double playback_speed, const std::vector<uint8_t> *mask, uint32_t in_freq = 0);
double ts = 0.; double ts = 0.;
uint32_t count = 0; uint32_t count = 0;
@ -79,6 +79,7 @@ protected:
uint64_t lastEventMonoTime() const { return lastest_event_ts; } uint64_t lastEventMonoTime() const { return lastest_event_ts; }
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size); void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
void updateMessages(QHash<MessageId, CanData> *); void updateMessages(QHash<MessageId, CanData> *);
void updateMasks();
void updateLastMsgsTo(double sec); void updateLastMsgsTo(double sec);
uint64_t lastest_event_ts = 0; uint64_t lastest_event_ts = 0;
@ -88,6 +89,8 @@ protected:
std::unordered_map<MessageId, std::vector<const CanEvent *>> events_; std::unordered_map<MessageId, std::vector<const CanEvent *>> events_;
std::vector<const CanEvent *> all_events_; std::vector<const CanEvent *> all_events_;
std::deque<std::unique_ptr<char[]>> memory_blocks; std::deque<std::unique_ptr<char[]>> memory_blocks;
std::mutex mutex;
std::unordered_map<MessageId, std::vector<uint8_t>> masks;
}; };
class AbstractOpenStreamWidget : public QWidget { class AbstractOpenStreamWidget : public QWidget {

Loading…
Cancel
Save