open source driving agent
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

122 lines
4.6 KiB

#include "tools/cabana/streams/abstractstream.h"
#include <QTimer>
#include <QtConcurrent>
// Global pointer to the current stream. It starts out null and is assigned
// `this` by the AbstractStream constructor, so it refers to the most recently
// constructed stream.
AbstractStream *can = nullptr;
/// Constructs the stream, publishes it through the global `can` pointer and
/// wires up the internal signal/slot connections.
///
/// @param parent             Qt parent object, forwarded to QObject.
/// @param is_live_streaming  Stored in the member of the same name.
AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming)
    : QObject(parent), is_live_streaming(is_live_streaming) {
  // The base class is always initialized before members, whatever order the
  // initializer list is written in; listing QObject first matches the real
  // order and avoids -Wreorder warnings.
  can = this;
  new_msgs = std::make_unique<QHash<MessageId, CanData>>();

  // Queued connection: process() is invoked from the receiver's event loop
  // rather than inline at the point where `received` is emitted.
  QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection);
  QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
}
/// Merges a batch of received messages into `can_msgs`, notifies listeners,
/// then frees the batch and clears the `processing` flag.
///
/// @param messages  Heap-allocated batch; this function deletes it after the
///                  signals have been emitted.
void AbstractStream::process(QHash<MessageId, CanData> *messages) {
  const auto batch_end = messages->cend();
  for (auto msg = messages->cbegin(); msg != batch_end; ++msg) {
    // insert() replaces the stored value when the key already exists.
    can_msgs.insert(msg.key(), msg.value());
  }

  emit updated();
  emit msgsReceived(messages);

  // The batch is no longer needed once listeners have been notified.
  delete messages;
  processing = false;
}
bool AbstractStream::updateEvent(const Event *event) {
static double prev_update_ts = 0;
if (event->which == cereal::Event::Which::CAN) {
double current_sec = event->mono_time / 1e9 - routeStartTime();
for (const auto &c : event->event.getCan()) {
MessageId id = {.source = c.getSrc(), .address = c.getAddress()};
CanData &data = (*new_msgs)[id];
data.ts = current_sec;
data.dat = QByteArray((char *)c.getDat().begin(), c.getDat().size());
data.count = ++counters[id];
data.freq = data.count / std::max(1.0, current_sec);
auto &tracker = change_trackers[id];
tracker.compute(data.dat, data.ts, data.freq);
data.colors = tracker.colors;
data.last_change_t = tracker.last_change_t;
data.bit_change_counts = tracker.bit_change_counts;
}
double ts = millis_since_boot();
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
// delay posting CAN message if UI thread is busy
processing = true;
prev_update_ts = ts;
// use pointer to avoid data copy in queued connection.
emit received(new_msgs.release());
new_msgs.reset(new QHash<MessageId, CanData>);
new_msgs->reserve(100);
}
}
return true;
}
/// Returns the stored data for `id`.
///
/// @param id  Message identifier to look up in `can_msgs`.
/// @return Reference to the stored entry, or to a shared default-constructed
///         CanData when `id` is not present.
const CanData &AbstractStream::lastMessage(const MessageId &id) {
  static CanData empty_data;
  const auto found = can_msgs.find(id);
  if (found == can_msgs.end()) {
    return empty_data;
  }
  return found.value();
}
/// Scans the CAN events in [first, last) and builds one entry per message id.
///
/// For each id the entry holds the payload and timestamp of the first
/// occurrence encountered, plus the number of occurrences in the range. The
/// range is given by reverse iterators, so, assuming the underlying event
/// list is ordered by ascending mono_time, the first occurrence encountered
/// is the most recent one.
///
/// @param first             Start of the range (reverse iterator).
/// @param last              End of the range (reverse iterator, exclusive).
/// @param route_start_time  Subtracted from the event time (in seconds) to
///                          produce the stored timestamp.
/// @return Map from message id to its collected data.
static QHash<MessageId, CanData> parseEvents(std::vector<Event *>::const_reverse_iterator first,
                                             std::vector<Event *>::const_reverse_iterator last,
                                             double route_start_time) {
  QHash<MessageId, CanData> msgs;
  msgs.reserve(500);
  for (auto it = first; it != last; ++it) {
    const Event *event = *it;
    if (event->which != cereal::Event::Which::CAN) {
      continue;
    }
    for (const auto &c : event->event.getCan()) {
      auto &m = msgs[{.source = c.getSrc(), .address = c.getAddress()}];
      // Only the first occurrence fills in the payload; later ones just bump the count.
      if (++m.count != 1) {
        continue;
      }
      const auto payload = c.getDat();
      m.ts = (event->mono_time / 1e9) - route_start_time;
      m.dat = QByteArray(reinterpret_cast<const char *>(payload.begin()), payload.size());
      m.colors = QVector<QColor>(m.dat.size(), QColor(0, 0, 0, 0));
      m.last_change_t = QVector<double>(m.dat.size(), m.ts);
      m.bit_change_counts.resize(m.dat.size());
    }
  }
  return msgs;
}
// It is thread safe to update data in updateLastMsgsTo:
// updateEvent will not be called before replayStream::seekedTo returns.

/// Rebuilds the last-message state (`can_msgs`, `counters`) for position `sec`.
///
/// The events up to `sec` are split into time chunks that are parsed
/// concurrently with parseEvents(); the per-chunk results are then merged and
/// listeners are notified from the event loop.
///
/// @param sec  Target position in seconds, relative to routeStartTime().
void AbstractStream::updateLastMsgsTo(double sec) {
  const uint64_t target_ts = (sec + routeStartTime()) * 1e9;

  // hardware_concurrency() may return 0 when the value cannot be determined.
  // Dividing by it would give inf/NaN, and converting that to uint64_t is
  // undefined behaviour, so clamp it to at least one thread.
  const unsigned int thread_count = std::max(1u, std::thread::hardware_concurrency());
  // Chunk length in nanoseconds, never shorter than 30 seconds.
  const uint64_t delta = std::max(std::ceil(sec / thread_count), 30.0) * 1e9;

  const auto evs = events();
  auto first = std::upper_bound(evs->crbegin(), evs->crend(), target_ts,
                                [](uint64_t ts, auto &e) { return ts > e->mono_time; });

  QFutureSynchronizer<QHash<MessageId, CanData>> synchronizer;
  while (first != evs->crend()) {
    const uint64_t chunk_newest = (*first)->mono_time;
    // If fewer than `delta` nanoseconds remain, the final chunk takes every
    // remaining event. Otherwise an event with mono_time == 0 would be
    // excluded by the `> 0` cutoff and the loop would stop advancing.
    auto last = evs->crend();
    if (chunk_newest > delta) {
      last = std::lower_bound(first, evs->crend(), chunk_newest - delta,
                              [](auto &e, uint64_t ts) { return e->mono_time > ts; });
    }
    synchronizer.addFuture(QtConcurrent::run(parseEvents, first, last, routeStartTime()));
    first = last;
  }
  synchronizer.waitForFinished();

  new_msgs->clear();
  change_trackers.clear();
  can_msgs.clear();
  counters.clear();

  // Futures are visited in the order they were added. The first chunk that
  // contains an id supplies its payload; later chunks only add to its count.
  for (const auto &f : synchronizer.futures()) {
    const auto msgs = f.result();
    for (auto it = msgs.cbegin(); it != msgs.cend(); ++it) {
      counters[it.key()] += it.value().count;
      auto m = can_msgs.find(it.key());
      if (m == can_msgs.end()) {
        m = can_msgs.insert(it.key(), it.value());
      } else {
        m.value().count += it.value().count;
      }
      m.value().freq = m.value().count / std::max(1.0, m.value().ts);
    }
  }

  // Passing `this` as the context object makes Qt drop the callback if this
  // stream is destroyed before the timer fires, so the lambda cannot run on a
  // dangling pointer.
  QTimer::singleShot(0, this, [this]() {
    emit updated();
    emit msgsReceived(&can_msgs);
  });
}