cabana: refactor the cache for CAN events (#27969)

old-commit-hash: 8ad2d84aeb
beeps
Dean Lee 2 years ago committed by GitHub
parent 7a51caa0af
commit 70800c6397
  1. 11
      tools/cabana/chart/chart.cc
  2. 14
      tools/cabana/chart/sparkline.cc
  3. 24
      tools/cabana/historylog.cc
  4. 42
      tools/cabana/streams/abstractstream.cc
  5. 15
      tools/cabana/streams/abstractstream.h

@ -260,7 +260,9 @@ void ChartView::updateSeries(const cabana::Signal *sig) {
s.series->setColor(getColor(s.sig));
const auto &msgs = can->events().at(s.msg_id);
auto first = std::upper_bound(msgs.cbegin(), msgs.cend(), CanEvent{.mono_time = s.last_value_mono_time});
auto first = std::upper_bound(msgs.cbegin(), msgs.cend(), s.last_value_mono_time, [](uint64_t ts, auto e) {
return ts < e->mono_time;
});
int new_size = std::max<int>(s.vals.size() + std::distance(first, msgs.cend()), settings.max_cached_minutes * 60 * 100);
if (s.vals.capacity() <= new_size) {
s.vals.reserve(new_size * 2);
@ -269,14 +271,15 @@ void ChartView::updateSeries(const cabana::Signal *sig) {
const double route_start_time = can->routeStartTime();
for (auto end = msgs.cend(); first != end; ++first) {
double value = get_raw_value(first->dat, first->size, *s.sig);
double ts = first->mono_time / 1e9 - route_start_time; // seconds
const CanEvent *e = *first;
double value = get_raw_value(e->dat, e->size, *s.sig);
double ts = e->mono_time / 1e9 - route_start_time; // seconds
s.vals.append({ts, value});
if (!s.step_vals.empty()) {
s.step_vals.append({ts, s.step_vals.back().y()});
}
s.step_vals.append({ts, value});
s.last_value_mono_time = first->mono_time;
s.last_value_mono_time = e->mono_time;
}
if (!can->liveStreaming()) {
s.segment_tree.build(s.vals);

@ -7,8 +7,13 @@
void Sparkline::update(const MessageId &msg_id, const cabana::Signal *sig, double last_msg_ts, int range, QSize size) {
const auto &msgs = can->events().at(msg_id);
uint64_t ts = (last_msg_ts + can->routeStartTime()) * 1e9;
auto first = std::lower_bound(msgs.cbegin(), msgs.cend(), CanEvent{.mono_time = (uint64_t)std::max<int64_t>(ts - range * 1e9, 0)});
auto last = std::upper_bound(first, msgs.cend(), CanEvent{.mono_time = ts});
uint64_t first_ts = (ts > range * 1e9) ? ts - range * 1e9 : 0;
auto first = std::lower_bound(msgs.cbegin(), msgs.cend(), first_ts, [](auto e, uint64_t ts) {
return e->mono_time < ts;
});
auto last = std::upper_bound(first, msgs.cend(), ts, [](uint64_t ts, auto e) {
return ts < e->mono_time;
});
bool update_values = last_ts != last_msg_ts || time_range != range;
last_ts = last_msg_ts;
@ -21,8 +26,9 @@ void Sparkline::update(const MessageId &msg_id, const cabana::Signal *sig, doubl
min_val = std::numeric_limits<double>::max();
max_val = std::numeric_limits<double>::lowest();
for (auto it = first; it != last; ++it) {
double value = get_raw_value(it->dat, it->size, *sig);
values.emplace_back((it->mono_time - first->mono_time) / 1e9, value);
const CanEvent *e = *it;
double value = get_raw_value(e->dat, e->size, *sig);
values.emplace_back((e->mono_time - (*first)->mono_time) / 1e9, value);
if (min_val > value) min_val = value;
if (max_val < value) max_val = value;
}

@ -119,14 +119,15 @@ template <class InputIt>
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, InputIt last, uint64_t min_time) {
std::deque<HistoryLogModel::Message> msgs;
QVector<double> values(sigs.size());
for (; first != last && first->mono_time > min_time; ++first) {
for (; first != last && (*first)->mono_time > min_time; ++first) {
const CanEvent *e = *first;
for (int i = 0; i < sigs.size(); ++i) {
values[i] = get_raw_value(first->dat, first->size, *sigs[i]);
values[i] = get_raw_value(e->dat, e->size, *sigs[i]);
}
if (!filter_cmp || filter_cmp(values[filter_sig_idx], filter_value)) {
auto &m = msgs.emplace_back();
m.mono_time = first->mono_time;
m.data = QByteArray((const char *)first->dat, first->size);
m.mono_time = e->mono_time;
m.data = QByteArray((const char *)e->dat, e->size);
m.sig_values = values;
if (msgs.size() >= batch_size && min_time == 0) {
return msgs;
@ -141,23 +142,28 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_ti
const auto freq = can->lastMessage(msg_id).freq;
const bool update_colors = !display_signals_mode || sigs.empty();
const auto speed = can->getSpeed();
if (dynamic_mode) {
auto first = std::upper_bound(events.rbegin(), events.rend(), CanEvent{.mono_time=from_time}, std::greater<CanEvent>());
auto first = std::upper_bound(events.rbegin(), events.rend(), from_time, [](uint64_t ts, auto e) {
return ts > e->mono_time;
});
auto msgs = fetchData(first, events.rend(), min_time);
if (update_colors && (min_time > 0 || messages.empty())) {
for (auto it = msgs.rbegin(); it != msgs.rend(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, freq);
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, freq);
it->colors = hex_colors.colors;
}
}
return msgs;
} else {
assert(min_time == 0);
auto first = std::upper_bound(events.begin(), events.end(), CanEvent{.mono_time=from_time});
auto msgs = fetchData(first, events.end(), 0);
auto first = std::upper_bound(events.cbegin(), events.cend(), from_time, [](uint64_t ts, auto e) {
return ts < e->mono_time;
});
auto msgs = fetchData(first, events.cend(), 0);
if (update_colors) {
for (auto it = msgs.begin(); it != msgs.end(); ++it) {
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, freq);
hex_colors.compute(it->data.data(), it->data.size(), it->mono_time / (double)1e9, speed, freq);
it->colors = hex_colors.colors;
}
}

@ -68,13 +68,15 @@ void AbstractStream::updateLastMsgsTo(double sec) {
all_msgs.clear();
last_msgs.clear();
CanEvent last_event = {.mono_time = uint64_t((sec + routeStartTime()) * 1e9)};
uint64_t last_ts = (sec + routeStartTime()) * 1e9;
for (auto &[id, e] : events_) {
auto it = std::lower_bound(e.crbegin(), e.crend(), last_event, std::greater<CanEvent>());
auto it = std::lower_bound(e.crbegin(), e.crend(), last_ts, [](auto e, uint64_t ts) {
return e->mono_time > ts;
});
if (it != e.crend()) {
double ts = it->mono_time / 1e9 - routeStartTime();
double ts = (*it)->mono_time / 1e9 - routeStartTime();
auto &m = all_msgs[id];
m.compute((const char *)it->dat, it->size, ts, getSpeed());
m.compute((const char *)(*it)->dat, (*it)->size, ts, getSpeed());
m.count = std::distance(it, e.crend());
m.freq = m.count / std::max(1.0, ts);
}
@ -87,18 +89,30 @@ void AbstractStream::updateLastMsgsTo(double sec) {
});
}
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs,
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent *>> &msgs,
std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
size_t memory_size = 0;
for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
for (const auto &c : (*it)->event.getCan()) {
memory_size += sizeof(CanEvent) + sizeof(uint8_t) * c.getDat().size();
}
}
}
char *ptr = memory_blocks.emplace_back(new char[memory_size]).get();
uint64_t ts = 0;
for (; first != last; ++first) {
if ((*first)->which == cereal::Event::Which::CAN) {
ts = (*first)->mono_time;
for (const auto &c : (*first)->event.getCan()) {
for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
ts = (*it)->mono_time;
for (const auto &c : (*it)->event.getCan()) {
auto dat = c.getDat();
auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}].emplace_back();
m.size = std::min(dat.size(), std::size(m.dat));
memcpy(m.dat, (uint8_t *)dat.begin(), m.size);
m.mono_time = ts;
CanEvent *e = (CanEvent *)ptr;
e->mono_time = ts;
e->size = dat.size();
memcpy(e->dat, (uint8_t *)dat.begin(), e->size);
msgs[{.source = c.getSrc(), .address = c.getAddress()}].push_back(e);
ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size;
}
}
}
@ -111,7 +125,7 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
if (append) {
parseEvents(events_, first, last);
} else {
std::unordered_map<MessageId, std::deque<CanEvent>> new_events;
std::unordered_map<MessageId, std::deque<CanEvent *>> new_events;
parseEvents(new_events, first, last);
for (auto &[id, new_e] : new_events) {
auto &e = events_[id];

@ -25,11 +25,9 @@ struct CanData {
};
struct CanEvent {
uint64_t mono_time = 0;
uint8_t size = 0;
uint8_t dat[64] = {};
inline bool operator<(const CanEvent &r) const { return mono_time < r.mono_time; }
inline bool operator>(const CanEvent &r) const { return mono_time > r.mono_time; }
uint64_t mono_time;
uint8_t size;
uint8_t dat[];
};
class AbstractStream : public QObject {
@ -55,7 +53,7 @@ public:
virtual bool isPaused() const { return false; }
virtual void pause(bool pause) {}
virtual const std::vector<Event*> *rawEvents() const { return nullptr; }
const std::unordered_map<MessageId, std::deque<CanEvent>> &events() const { return events_; }
const std::unordered_map<MessageId, std::deque<CanEvent *>> &events() const { return events_; }
virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; }
void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append);
@ -78,14 +76,15 @@ protected:
virtual void process(QHash<MessageId, CanData> *);
bool updateEvent(const Event *event);
void updateLastMsgsTo(double sec);
void parseEvents(std::unordered_map<MessageId, std::deque<CanEvent>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
void parseEvents(std::unordered_map<MessageId, std::deque<CanEvent *>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
bool is_live_streaming = false;
std::atomic<bool> processing = false;
std::unique_ptr<QHash<MessageId, CanData>> new_msgs;
QHash<MessageId, CanData> all_msgs;
std::unordered_map<MessageId, std::deque<CanEvent>> events_;
std::unordered_map<MessageId, std::deque<CanEvent *>> events_;
uint64_t last_event_ts = 0;
std::deque<std::unique_ptr<char[]>> memory_blocks;
};
// A global pointer referring to the unique AbstractStream object

Loading…
Cancel
Save