cabana: support all features except video in live stream mode. (#27994)

old-commit-hash: 91dc064ac7
beeps
Dean Lee 2 years ago committed by GitHub
parent 195ba2cd64
commit 2b9c35ddf7
  1. 18
      tools/cabana/chart/chart.cc
  2. 2
      tools/cabana/chart/chartswidget.cc
  3. 2
      tools/cabana/chart/sparkline.cc
  4. 2
      tools/cabana/historylog.cc
  5. 6
      tools/cabana/mainwin.cc
  6. 72
      tools/cabana/streams/abstractstream.cc
  7. 34
      tools/cabana/streams/abstractstream.h
  8. 4
      tools/cabana/streams/devicestream.cc
  9. 119
      tools/cabana/streams/livestream.cc
  10. 48
      tools/cabana/streams/livestream.h
  11. 7
      tools/cabana/streams/pandastream.cc
  12. 17
      tools/cabana/streams/replaystream.cc
  13. 4
      tools/cabana/streams/replaystream.h
  14. 45
      tools/cabana/tools/findsimilarbits.cc
  15. 6
      tools/cabana/videowidget.cc

@ -36,7 +36,7 @@ ChartView::ChartView(const std::pair<double, double> &x_range, ChartsWidget *par
createToolButtons(); createToolButtons();
// TODO: enable zoomIn/seekTo in live streaming mode. // TODO: enable zoomIn/seekTo in live streaming mode.
setRubberBand(can->liveStreaming() ? QChartView::NoRubberBand : QChartView::HorizontalRubberBand); setRubberBand(QChartView::HorizontalRubberBand);
setMouseTracking(true); setMouseTracking(true);
setTheme(settings.theme == DARK_THEME ? QChart::QChart::ChartThemeDark : QChart::ChartThemeLight); setTheme(settings.theme == DARK_THEME ? QChart::QChart::ChartThemeDark : QChart::ChartThemeLight);
@ -259,7 +259,7 @@ void ChartView::updateSeries(const cabana::Signal *sig) {
} }
s.series->setColor(getColor(s.sig)); s.series->setColor(getColor(s.sig));
const auto &msgs = can->events().at(s.msg_id); const auto &msgs = can->events(s.msg_id);
auto first = std::upper_bound(msgs.cbegin(), msgs.cend(), s.last_value_mono_time, [](uint64_t ts, auto e) { auto first = std::upper_bound(msgs.cbegin(), msgs.cend(), s.last_value_mono_time, [](uint64_t ts, auto e) {
return ts < e->mono_time; return ts < e->mono_time;
}); });
@ -439,14 +439,12 @@ void ChartView::mousePressEvent(QMouseEvent *event) {
drag->setHotSpot(-QPoint(5, 5)); drag->setHotSpot(-QPoint(5, 5));
drag->exec(Qt::CopyAction | Qt::MoveAction, Qt::MoveAction); drag->exec(Qt::CopyAction | Qt::MoveAction, Qt::MoveAction);
} else if (event->button() == Qt::LeftButton && QApplication::keyboardModifiers().testFlag(Qt::ShiftModifier)) { } else if (event->button() == Qt::LeftButton && QApplication::keyboardModifiers().testFlag(Qt::ShiftModifier)) {
if (!can->liveStreaming()) { // Save current playback state when scrubbing
// Save current playback state when scrubbing resume_after_scrub = !can->isPaused();
resume_after_scrub = !can->isPaused(); if (resume_after_scrub) {
if (resume_after_scrub) { can->pause(true);
can->pause(true);
}
is_scrubbing = true;
} }
is_scrubbing = true;
} else { } else {
QChartView::mousePressEvent(event); QChartView::mousePressEvent(event);
} }
@ -473,7 +471,7 @@ void ChartView::mouseReleaseEvent(QMouseEvent *event) {
viewport()->update(); viewport()->update();
} }
event->accept(); event->accept();
} else if (!can->liveStreaming() && event->button() == Qt::RightButton) { } else if (event->button() == Qt::RightButton) {
charts_widget->zoom_undo_stack->undo(); charts_widget->zoom_undo_stack->undo();
event->accept(); event->accept();
} else { } else {

@ -191,7 +191,7 @@ void ChartsWidget::updateState() {
double max_sec = std::min(std::floor(display_range.first + max_chart_range), can->lastEventSecond()); double max_sec = std::min(std::floor(display_range.first + max_chart_range), can->lastEventSecond());
display_range.first = std::max(0.0, max_sec - max_chart_range); display_range.first = std::max(0.0, max_sec - max_chart_range);
display_range.second = display_range.first + max_chart_range; display_range.second = display_range.first + max_chart_range;
} else if (cur_sec < zoomed_range.first || cur_sec >= zoomed_range.second) { } else if (cur_sec < (zoomed_range.first - 0.1) || cur_sec >= zoomed_range.second) {
// loop in zoomed range // loop in zoomed range
can->seekTo(zoomed_range.first); can->seekTo(zoomed_range.first);
} }

@ -5,7 +5,7 @@
#include "tools/cabana/streams/abstractstream.h" #include "tools/cabana/streams/abstractstream.h"
void Sparkline::update(const MessageId &msg_id, const cabana::Signal *sig, double last_msg_ts, int range, QSize size) { void Sparkline::update(const MessageId &msg_id, const cabana::Signal *sig, double last_msg_ts, int range, QSize size) {
const auto &msgs = can->events().at(msg_id); const auto &msgs = can->events(msg_id);
uint64_t ts = (last_msg_ts + can->routeStartTime()) * 1e9; uint64_t ts = (last_msg_ts + can->routeStartTime()) * 1e9;
uint64_t first_ts = (ts > range * 1e9) ? ts - range * 1e9 : 0; uint64_t first_ts = (ts > range * 1e9) ? ts - range * 1e9 : 0;
auto first = std::lower_bound(msgs.cbegin(), msgs.cend(), first_ts, [](auto e, uint64_t ts) { auto first = std::lower_bound(msgs.cbegin(), msgs.cend(), first_ts, [](auto e, uint64_t ts) {

@ -138,7 +138,7 @@ std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(InputIt first, I
} }
std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) { std::deque<HistoryLogModel::Message> HistoryLogModel::fetchData(uint64_t from_time, uint64_t min_time) {
const auto &events = can->events().at(msg_id); const auto &events = can->events(msg_id);
const auto freq = can->lastMessage(msg_id).freq; const auto freq = can->lastMessage(msg_id).freq;
const bool update_colors = !display_signals_mode || sigs.empty(); const bool update_colors = !display_signals_mode || sigs.empty();

@ -145,10 +145,8 @@ void MainWindow::createActions() {
commands_act->setDefaultWidget(undo_view); commands_act->setDefaultWidget(undo_view);
commands_menu->addAction(commands_act); commands_menu->addAction(commands_act);
if (!can->liveStreaming()) { QMenu *tools_menu = menuBar()->addMenu(tr("&Tools"));
QMenu *tools_menu = menuBar()->addMenu(tr("&Tools")); tools_menu->addAction(tr("Find &Similar Bits"), this, &MainWindow::findSimilarBits);
tools_menu->addAction(tr("Find &Similar Bits"), this, &MainWindow::findSimilarBits);
}
QMenu *help_menu = menuBar()->addMenu(tr("&Help")); QMenu *help_menu = menuBar()->addMenu(tr("&Help"));
help_menu->addAction(tr("Help"), this, &MainWindow::onlineHelp)->setShortcuts(QKeySequence::HelpContents); help_menu->addAction(tr("Help"), this, &MainWindow::onlineHelp)->setShortcuts(QKeySequence::HelpContents);

@ -3,14 +3,13 @@
AbstractStream *can = nullptr; AbstractStream *can = nullptr;
AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) { AbstractStream::AbstractStream(QObject *parent) : QObject(parent) {
can = this; can = this;
new_msgs = std::make_unique<QHash<MessageId, CanData>>(); new_msgs = std::make_unique<QHash<MessageId, CanData>>();
QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection);
QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo); QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo);
} }
void AbstractStream::process(QHash<MessageId, CanData> *messages) { void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
auto prev_src_size = sources.size(); auto prev_src_size = sources.size();
for (auto it = messages->begin(); it != messages->end(); ++it) { for (auto it = messages->begin(); it != messages->end(); ++it) {
const auto &id = it.key(); const auto &id = it.key();
@ -26,45 +25,39 @@ void AbstractStream::process(QHash<MessageId, CanData> *messages) {
processing = false; processing = false;
} }
bool AbstractStream::updateEvent(const Event *event) { void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) {
static double prev_update_ts = 0; all_msgs[id].compute((const char*)data, size, sec, getSpeed());
if (event->which == cereal::Event::Which::CAN) { if (!new_msgs->contains(id)) {
double current_sec = event->mono_time / 1e9 - routeStartTime(); new_msgs->insert(id, {});
for (const auto &c : event->event.getCan()) { }
MessageId id = {.source = c.getSrc(), .address = c.getAddress()}; }
const auto dat = c.getDat();
all_msgs[id].compute((const char *)dat.begin(), dat.size(), current_sec, getSpeed()); bool AbstractStream::postEvents() {
if (!new_msgs->contains(id)) { // delay posting CAN message if UI thread is busy
new_msgs->insert(id, {}); if (!processing) {
} processing = true;
} for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) {
double ts = millis_since_boot(); it.value() = all_msgs[it.key()];
// delay posting CAN message if UI thread is busy
if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) {
processing = true;
prev_update_ts = ts;
for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) {
it.value() = all_msgs[it.key()];
}
// use pointer to avoid data copy in queued connection.
emit received(new_msgs.release());
new_msgs.reset(new QHash<MessageId, CanData>);
new_msgs->reserve(100);
} }
// use pointer to avoid data copy in queued connection.
QMetaObject::invokeMethod(this, std::bind(&AbstractStream::updateMessages, this, new_msgs.release()), Qt::QueuedConnection);
new_msgs.reset(new QHash<MessageId, CanData>);
new_msgs->reserve(100);
return true;
} }
return true; return false;
} }
const CanData &AbstractStream::lastMessage(const MessageId &id) { const CanData &AbstractStream::lastMessage(const MessageId &id) {
static CanData empty_data; static CanData empty_data = {};
auto it = last_msgs.find(id); auto it = last_msgs.find(id);
return it != last_msgs.end() ? it.value() : empty_data; return it != last_msgs.end() ? it.value() : empty_data;
} }
// it is thread safe to update data in updateLastMsgsTo. // it is thread safe to update data in updateLastMsgsTo.
// updateEvent will not be called before replayStream::seekedTo return. // updateLastMsgsTo is always called in UI thread.
void AbstractStream::updateLastMsgsTo(double sec) { void AbstractStream::updateLastMsgsTo(double sec) {
new_msgs->clear(); new_msgs.reset(new QHash<MessageId, CanData>);
all_msgs.clear(); all_msgs.clear();
last_msgs.clear(); last_msgs.clear();
@ -89,7 +82,7 @@ void AbstractStream::updateLastMsgsTo(double sec) {
}); });
} }
void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent *>> &msgs, void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<const CanEvent *>> &msgs,
std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) { std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) {
size_t memory_size = 0; size_t memory_size = 0;
for (auto it = first; it != last; ++it) { for (auto it = first; it != last; ++it) {
@ -101,22 +94,24 @@ void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEve
} }
char *ptr = memory_blocks.emplace_back(new char[memory_size]).get(); char *ptr = memory_blocks.emplace_back(new char[memory_size]).get();
uint64_t ts = 0;
for (auto it = first; it != last; ++it) { for (auto it = first; it != last; ++it) {
if ((*it)->which == cereal::Event::Which::CAN) { if ((*it)->which == cereal::Event::Which::CAN) {
ts = (*it)->mono_time; uint64_t ts = (*it)->mono_time;
for (const auto &c : (*it)->event.getCan()) { for (const auto &c : (*it)->event.getCan()) {
auto dat = c.getDat();
CanEvent *e = (CanEvent *)ptr; CanEvent *e = (CanEvent *)ptr;
e->src = c.getSrc();
e->address = c.getAddress();
e->mono_time = ts; e->mono_time = ts;
auto dat = c.getDat();
e->size = dat.size(); e->size = dat.size();
memcpy(e->dat, (uint8_t *)dat.begin(), e->size); memcpy(e->dat, (uint8_t *)dat.begin(), e->size);
msgs[{.source = c.getSrc(), .address = c.getAddress()}].push_back(e);
msgs[{.source = e->src, .address = e->address}].push_back(e);
all_events_.push_back(e);
ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size; ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size;
} }
} }
} }
last_event_ts = std::max(last_event_ts, ts);
} }
void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) { void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) {
@ -125,7 +120,7 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
if (append) { if (append) {
parseEvents(events_, first, last); parseEvents(events_, first, last);
} else { } else {
std::unordered_map<MessageId, std::deque<CanEvent *>> new_events; std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events;
parseEvents(new_events, first, last); parseEvents(new_events, first, last);
for (auto &[id, new_e] : new_events) { for (auto &[id, new_e] : new_events) {
auto &e = events_[id]; auto &e = events_[id];
@ -133,6 +128,7 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std
e.insert(it, new_e.cbegin(), new_e.cend()); e.insert(it, new_e.cbegin(), new_e.cend());
} }
} }
total_sec = (all_events_.back()->mono_time - all_events_.front()->mono_time) / 1e9;
emit eventsMerged(); emit eventsMerged();
} }

@ -27,6 +27,8 @@ struct CanData {
}; };
struct CanEvent { struct CanEvent {
uint8_t src;
uint32_t address;
uint64_t mono_time; uint64_t mono_time;
uint8_t size; uint8_t size;
uint8_t dat[]; uint8_t dat[];
@ -36,28 +38,26 @@ class AbstractStream : public QObject {
Q_OBJECT Q_OBJECT
public: public:
AbstractStream(QObject *parent, bool is_live_streaming); AbstractStream(QObject *parent);
virtual ~AbstractStream() {}; virtual ~AbstractStream() {};
inline bool liveStreaming() const { return is_live_streaming; } inline bool liveStreaming() const { return route() == nullptr; }
inline double lastEventSecond() const { return last_event_ts / 1e9 - routeStartTime(); } inline double lastEventSecond() const { return lastEventMonoTime() / 1e9 - routeStartTime(); }
virtual void seekTo(double ts) {} virtual void seekTo(double ts) {}
virtual QString routeName() const = 0; virtual QString routeName() const = 0;
virtual QString carFingerprint() const { return ""; } virtual QString carFingerprint() const { return ""; }
virtual double totalSeconds() const { return 0; }
virtual double routeStartTime() const { return 0; } virtual double routeStartTime() const { return 0; }
virtual double currentSec() const = 0; virtual double currentSec() const = 0;
virtual QDateTime currentDateTime() const { return {}; } double totalSeconds() const { return total_sec; }
virtual const CanData &lastMessage(const MessageId &id); const CanData &lastMessage(const MessageId &id);
virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; } virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; }
virtual const Route *route() const { return nullptr; } virtual const Route *route() const { return nullptr; }
virtual void setSpeed(float speed) {} virtual void setSpeed(float speed) {}
virtual double getSpeed() { return 1; } virtual double getSpeed() { return 1; }
virtual bool isPaused() const { return false; } virtual bool isPaused() const { return false; }
virtual void pause(bool pause) {} virtual void pause(bool pause) {}
virtual const std::vector<Event*> *rawEvents() const { return nullptr; } const std::deque<const CanEvent *> &allEvents() const { return all_events_; }
const std::unordered_map<MessageId, std::deque<CanEvent *>> &events() const { return events_; } const std::deque<const CanEvent *> &events(const MessageId &id) const { return events_.at(id); }
virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; } virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; }
void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append);
signals: signals:
void paused(); void paused();
@ -67,7 +67,6 @@ signals:
void eventsMerged(); void eventsMerged();
void updated(); void updated();
void msgsReceived(const QHash<MessageId, CanData> *); void msgsReceived(const QHash<MessageId, CanData> *);
void received(QHash<MessageId, CanData> *);
void sourcesUpdated(const SourceSet &s); void sourcesUpdated(const SourceSet &s);
public: public:
@ -75,17 +74,20 @@ public:
SourceSet sources; SourceSet sources;
protected: protected:
virtual void process(QHash<MessageId, CanData> *); void mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append);
bool updateEvent(const Event *event); bool postEvents();
uint64_t lastEventMonoTime() const { return all_events_.empty() ? 0 : all_events_.back()->mono_time; }
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
void updateMessages(QHash<MessageId, CanData> *);
void parseEvents(std::unordered_map<MessageId, std::deque<const CanEvent *>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
void updateLastMsgsTo(double sec); void updateLastMsgsTo(double sec);
void parseEvents(std::unordered_map<MessageId, std::deque<CanEvent *>> &msgs, std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last);
bool is_live_streaming = false; double total_sec = 0;
std::atomic<bool> processing = false; std::atomic<bool> processing = false;
std::unique_ptr<QHash<MessageId, CanData>> new_msgs; std::unique_ptr<QHash<MessageId, CanData>> new_msgs;
QHash<MessageId, CanData> all_msgs; QHash<MessageId, CanData> all_msgs;
std::unordered_map<MessageId, std::deque<CanEvent *>> events_; std::unordered_map<MessageId, std::deque<const CanEvent *>> events_;
uint64_t last_event_ts = 0; std::deque<const CanEvent *> all_events_;
std::deque<std::unique_ptr<char[]>> memory_blocks; std::deque<std::unique_ptr<char[]>> memory_blocks;
}; };

@ -30,8 +30,8 @@ void DeviceStream::streamThread() {
continue; continue;
} }
std::lock_guard lk(lock); handleEvent(msg->getData(), msg->getSize());
handleEvent(messages.emplace_back(msg).event); delete msg;
} }
} }

@ -2,79 +2,118 @@
#include <QTimer> #include <QTimer>
LiveStream::LiveStream(QObject *parent) : AbstractStream(parent, true) { LiveStream::LiveStream(QObject *parent) : AbstractStream(parent) {
if (settings.log_livestream) { if (settings.log_livestream) {
std::string path = (settings.log_path + "/" + QDateTime::currentDateTime().toString("yyyy-MM-dd--hh-mm-ss") + "--0").toStdString(); std::string path = (settings.log_path + "/" + QDateTime::currentDateTime().toString("yyyy-MM-dd--hh-mm-ss") + "--0").toStdString();
util::create_directories(path, 0755); util::create_directories(path, 0755);
fs.reset(new std::ofstream(path + "/rlog" , std::ios::binary | std::ios::out)); fs.reset(new std::ofstream(path + "/rlog", std::ios::binary | std::ios::out));
} }
stream_thread = new QThread(this); stream_thread = new QThread(this);
QObject::connect(&settings, &Settings::changed, this, &LiveStream::startUpdateTimer);
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); });
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
} }
void LiveStream::startUpdateTimer() {
update_timer.stop();
update_timer.start(1000.0 / settings.fps, this);
timer_id = update_timer.timerId();
}
void LiveStream::startStreamThread() { void LiveStream::startStreamThread() {
// delay the start of the thread to avoid calling startStreamThread // delay the start of the thread to avoid calling startStreamThread
// in the constructor when other classes' slots have not been connected to // in the constructor when other classes' slots have not been connected to
// the signals of the livestream. // the signals of the livestream.
QTimer::singleShot(0, [this]() { stream_thread->start(); }); QTimer::singleShot(0, [this]() { stream_thread->start(); });
startUpdateTimer();
} }
LiveStream::~LiveStream() { LiveStream::~LiveStream() {
update_timer.stop();
stream_thread->requestInterruption(); stream_thread->requestInterruption();
stream_thread->quit(); stream_thread->quit();
stream_thread->wait(); stream_thread->wait();
} }
void LiveStream::handleEvent(Event *evt) { // called in streamThread
void LiveStream::handleEvent(const char *data, const size_t size) {
if (fs) { if (fs) {
auto bytes = evt->words.asChars(); fs->write(data, size);
fs->write(bytes.begin(), bytes.size());
} }
if (start_ts == 0 || evt->mono_time < start_ts) { std::lock_guard lk(lock);
if (evt->mono_time < start_ts) { auto &msg = receivedMessages.emplace_back(data, size);
qDebug() << "stream is looping back to old time stamp"; receivedEvents.push_back(msg.event);
}
void LiveStream::timerEvent(QTimerEvent *event) {
if (event->timerId() == timer_id) {
{
// merge events received from live stream thread.
std::lock_guard lk(lock);
mergeEvents(receivedEvents.cbegin(), receivedEvents.cend(), true);
receivedEvents.clear();
receivedMessages.clear();
} }
start_ts = current_ts = evt->mono_time; if (!all_events_.empty()) {
begin_event_ts = all_events_.front()->mono_time;
updateEvents();
return;
}
}
QObject::timerEvent(event);
}
void LiveStream::updateEvents() {
static double prev_speed = 1.0;
static uint64_t prev_newest_event_ts = all_events_.back()->mono_time;
if (first_update_ts == 0) {
first_update_ts = nanos_since_boot();
first_event_ts = current_event_ts = all_events_.back()->mono_time;
emit streamStarted(); emit streamStarted();
} }
received.push_back(evt); if (paused_ || prev_speed != speed_) {
if (!pause_) { prev_speed = speed_;
if (speed_ < 1 && last_update_ts > 0) { first_update_ts = nanos_since_boot();
auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) { first_event_ts = current_event_ts;
return ts < e->mono_time; return;
});
if (it != received.cend()) {
bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_;
if (skip) return;
evt = *it;
}
}
current_ts = evt->mono_time;
last_update_ts = nanos_since_boot();
updateEvent(evt);
} }
}
void LiveStream::process(QHash<MessageId, CanData> *last_messages) { uint64_t last_event_ts = all_events_.back()->mono_time;
{ bool at_the_end = current_event_ts == prev_newest_event_ts;
std::lock_guard lk(lock); if (!at_the_end) {
auto first = std::upper_bound(received.cbegin(), received.cend(), last_event_ts, [](uint64_t ts, auto &e) { last_event_ts = first_event_ts + (nanos_since_boot() - first_update_ts) * speed_;
return ts < e->mono_time;
});
mergeEvents(first, received.cend(), true);
if (speed_ == 1) {
received.clear();
messages.clear();
}
} }
AbstractStream::process(last_messages);
auto first = std::upper_bound(all_events_.cbegin(), all_events_.cend(), current_event_ts, [](uint64_t ts, auto e) {
return ts < e->mono_time;
});
auto last = std::upper_bound(first, all_events_.cend(), last_event_ts, [](uint64_t ts, auto e) {
return ts < e->mono_time;
});
for (auto it = first; it != last; ++it) {
const CanEvent *e = *it;
MessageId id = {.source = e->src, .address = e->address};
updateEvent(id, (e->mono_time - begin_event_ts) / 1e9, e->dat, e->size);
current_event_ts = e->mono_time;
}
postEvents();
prev_newest_event_ts = all_events_.back()->mono_time;
}
void LiveStream::seekTo(double sec) {
sec = std::max(0.0, sec);
first_update_ts = nanos_since_boot();
first_event_ts = std::min<uint64_t>(sec * 1e9 + begin_event_ts, lastEventMonoTime());
current_event_ts = first_event_ts;
emit seekedTo((current_event_ts - begin_event_ts) / 1e9);
} }
void LiveStream::pause(bool pause) { void LiveStream::pause(bool pause) {
pause_ = pause; paused_ = pause;
emit paused(); emit(pause ? paused() : resume());
} }

@ -1,5 +1,7 @@
#pragma once #pragma once
#include <QBasicTimer>
#include "tools/cabana/streams/abstractstream.h" #include "tools/cabana/streams/abstractstream.h"
class LiveStream : public AbstractStream { class LiveStream : public AbstractStream {
@ -8,24 +10,25 @@ class LiveStream : public AbstractStream {
public: public:
LiveStream(QObject *parent); LiveStream(QObject *parent);
virtual ~LiveStream(); virtual ~LiveStream();
inline double routeStartTime() const override { return start_ts / (double)1e9; } inline double routeStartTime() const override { return begin_event_ts / 1e9; }
inline double currentSec() const override { return (current_ts - start_ts) / (double)1e9; } inline double currentSec() const override { return (current_event_ts - begin_event_ts) / 1e9; }
void setSpeed(float speed) override { speed_ = std::min<float>(1.0, speed); } void setSpeed(float speed) override { speed_ = speed; }
double getSpeed() override { return speed_; } double getSpeed() override { return speed_; }
bool isPaused() const override { return pause_; } bool isPaused() const override { return paused_; }
void pause(bool pause) override; void pause(bool pause) override;
void startStreamThread(); void seekTo(double sec) override;
protected: protected:
virtual void handleEvent(Event *evt);
virtual void streamThread() = 0; virtual void streamThread() = 0;
void process(QHash<MessageId, CanData> *) override; void startStreamThread();
void handleEvent(const char *data, const size_t size);
private:
void startUpdateTimer();
void timerEvent(QTimerEvent *event) override;
void updateEvents();
struct Msg { struct Msg {
Msg(Message *m) {
event = ::new Event(aligned_buf.align(m));
delete m;
}
Msg(const char *data, const size_t size) { Msg(const char *data, const size_t size) {
event = ::new Event(aligned_buf.align(data, size)); event = ::new Event(aligned_buf.align(data, size));
} }
@ -34,16 +37,19 @@ protected:
AlignedBuffer aligned_buf; AlignedBuffer aligned_buf;
}; };
mutable std::mutex lock; std::mutex lock;
std::vector<Event *> received; QThread *stream_thread;
std::deque<Msg> messages; std::vector<Event *> receivedEvents;
std::atomic<uint64_t> start_ts = 0; std::deque<Msg> receivedMessages;
std::atomic<uint64_t> current_ts = 0;
std::atomic<float> speed_ = 1;
std::atomic<bool> pause_ = false;
uint64_t last_update_ts = 0;
std::unique_ptr<std::ofstream> fs; std::unique_ptr<std::ofstream> fs;
int timer_id;
QThread *stream_thread; QBasicTimer update_timer;
uint64_t begin_event_ts = 0;
uint64_t current_event_ts = 0;
uint64_t first_event_ts = 0;
uint64_t first_update_ts = 0;
double speed_ = 1;
bool paused_ = false;
}; };

@ -79,11 +79,8 @@ void PandaStream::streamThread() {
canData[i].setSrc(raw_can_data[i].src); canData[i].setSrc(raw_can_data[i].src);
} }
{ auto bytes = msg.toBytes();
std::lock_guard lk(lock); handleEvent((const char*)bytes.begin(), bytes.size());
auto bytes = msg.toBytes();
handleEvent(messages.emplace_back((const char*)bytes.begin(), bytes.size()).event);
}
panda->send_heartbeat(false); panda->send_heartbeat(false);
} }

@ -8,7 +8,7 @@
#include "common/prefix.h" #include "common/prefix.h"
ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent, false) { ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent) {
QObject::connect(&settings, &Settings::changed, [this]() { QObject::connect(&settings, &Settings::changed, [this]() {
if (replay) replay->setSegmentCacheLimit(settings.max_cached_minutes); if (replay) replay->setSegmentCacheLimit(settings.max_cached_minutes);
}); });
@ -48,8 +48,21 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint
} }
bool ReplayStream::eventFilter(const Event *event) { bool ReplayStream::eventFilter(const Event *event) {
static double prev_update_ts = 0;
// delay posting CAN message if UI thread is busy
if (event->which == cereal::Event::Which::CAN) { if (event->which == cereal::Event::Which::CAN) {
updateEvent(event); double current_sec = event->mono_time / 1e9 - routeStartTime();
for (const auto &c : event->event.getCan()) {
MessageId id = {.source = c.getSrc(), .address = c.getAddress()};
const auto dat = c.getDat();
updateEvent(id, current_sec, (const uint8_t*)dat.begin(), dat.size());
}
double ts = millis_since_boot();
if ((ts - prev_update_ts) > (1000.0 / settings.fps)) {
if (postEvents()) {
prev_update_ts = ts;
}
}
} }
return true; return true;
} }

@ -1,7 +1,6 @@
#pragma once #pragma once
#include "tools/cabana/streams/abstractstream.h" #include "tools/cabana/streams/abstractstream.h"
#include "tools/cabana/settings.h"
class ReplayStream : public AbstractStream { class ReplayStream : public AbstractStream {
Q_OBJECT Q_OBJECT
@ -15,16 +14,13 @@ public:
inline QString routeName() const override { return replay->route()->name(); } inline QString routeName() const override { return replay->route()->name(); }
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; }
inline double totalSeconds() const override { return replay->totalSeconds(); }
inline double routeStartTime() const override { return replay->routeStartTime() / (double)1e9; } inline double routeStartTime() const override { return replay->routeStartTime() / (double)1e9; }
inline double currentSec() const override { return replay->currentSeconds(); } inline double currentSec() const override { return replay->currentSeconds(); }
inline QDateTime currentDateTime() const override { return replay->currentDateTime(); }
inline const Route *route() const override { return replay->route(); } inline const Route *route() const override { return replay->route(); }
inline void setSpeed(float speed) override { replay->setSpeed(speed); } inline void setSpeed(float speed) override { replay->setSpeed(speed); }
inline float getSpeed() const { return replay->getSpeed(); } inline float getSpeed() const { return replay->getSpeed(); }
inline bool isPaused() const override { return replay->isPaused(); } inline bool isPaused() const override { return replay->isPaused(); }
void pause(bool pause) override; void pause(bool pause) override;
const std::vector<Event*> *rawEvents() const override { return replay->events(); }
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); } inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); }
static AbstractOpenStreamWidget *widget(AbstractStream **stream); static AbstractOpenStreamWidget *widget(AbstractStream **stream);

@ -122,33 +122,26 @@ QList<FindSimilarBitsDlg::mismatched_struct> FindSimilarBitsDlg::calcBits(uint8_
int bit_idx, uint8_t find_bus, bool equal, int min_msgs_cnt) { int bit_idx, uint8_t find_bus, bool equal, int min_msgs_cnt) {
QHash<uint32_t, QVector<uint32_t>> mismatches; QHash<uint32_t, QVector<uint32_t>> mismatches;
QHash<uint32_t, uint32_t> msg_count; QHash<uint32_t, uint32_t> msg_count;
auto events = can->rawEvents(); const auto &events = can->allEvents();
int bit_to_find = -1; int bit_to_find = -1;
for (auto e : *events) { for (const CanEvent *e : events) {
if (e->which == cereal::Event::Which::CAN) { if (e->src == bus) {
for (const auto &c : e->event.getCan()) { if (e->address == selected_address && e->size > byte_idx) {
uint8_t src = c.getSrc(); bit_to_find = ((e->dat[byte_idx] >> (7 - bit_idx)) & 1) != 0;
uint32_t address = c.getAddress(); }
const auto dat = c.getDat(); }
if (src == bus) { if (e->src == find_bus) {
if (address == selected_address && dat.size() > byte_idx) { ++msg_count[e->address];
bit_to_find = ((dat[byte_idx] >> (7 - bit_idx)) & 1) != 0; if (bit_to_find == -1) continue;
}
} auto &mismatched = mismatches[e->address];
if (src == find_bus) { if (mismatched.size() < e->size * 8) {
++msg_count[address]; mismatched.resize(e->size * 8);
if (bit_to_find == -1) continue; }
for (int i = 0; i < e->size; ++i) {
auto &mismatched = mismatches[address]; for (int j = 0; j < 8; ++j) {
if (mismatched.size() < dat.size() * 8) { int bit = ((e->dat[i] >> (7 - j)) & 1) != 0;
mismatched.resize(dat.size() * 8); mismatched[i * 8 + j] += equal ? (bit != bit_to_find) : (bit == bit_to_find);
}
for (int i = 0; i < dat.size(); ++i) {
for (int j = 0; j < 8; ++j) {
int bit = ((dat[i] >> (7 - j)) & 1) != 0;
mismatched[i * 8 + j] += equal ? (bit != bit_to_find) : (bit == bit_to_find);
}
}
} }
} }
} }

@ -48,8 +48,6 @@ VideoWidget::VideoWidget(QWidget *parent) : QFrame(parent) {
QButtonGroup *group = new QButtonGroup(this); QButtonGroup *group = new QButtonGroup(this);
group->setExclusive(true); group->setExclusive(true);
for (float speed : {0.1, 0.5, 1., 2.}) { for (float speed : {0.1, 0.5, 1., 2.}) {
if (can->liveStreaming() && speed > 1) continue;
QPushButton *btn = new QPushButton(QString("%1x").arg(speed), this); QPushButton *btn = new QPushButton(QString("%1x").arg(speed), this);
btn->setCheckable(true); btn->setCheckable(true);
QObject::connect(btn, &QPushButton::clicked, [=]() { can->setSpeed(speed); }); QObject::connect(btn, &QPushButton::clicked, [=]() { can->setSpeed(speed); });
@ -117,7 +115,7 @@ QWidget *VideoWidget::createCameraWidget() {
QObject::connect(slider, &QSlider::valueChanged, [=](int value) { time_label->setText(utils::formatSeconds(value / 1000)); }); QObject::connect(slider, &QSlider::valueChanged, [=](int value) { time_label->setText(utils::formatSeconds(value / 1000)); });
QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); }); QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); });
QObject::connect(can, &AbstractStream::updated, this, &VideoWidget::updateState); QObject::connect(can, &AbstractStream::updated, this, &VideoWidget::updateState);
QObject::connect(can, &AbstractStream::streamStarted, [this]() { QObject::connect(can, &AbstractStream::eventsMerged, [this]() {
end_time_label->setText(utils::formatSeconds(can->totalSeconds())); end_time_label->setText(utils::formatSeconds(can->totalSeconds()));
slider->setRange(0, can->totalSeconds() * 1000); slider->setRange(0, can->totalSeconds() * 1000);
}); });
@ -125,6 +123,8 @@ QWidget *VideoWidget::createCameraWidget() {
} }
void VideoWidget::rangeChanged(double min, double max, bool is_zoomed) { void VideoWidget::rangeChanged(double min, double max, bool is_zoomed) {
if (can->liveStreaming()) return;
if (!is_zoomed) { if (!is_zoomed) {
min = 0; min = 0;
max = can->totalSeconds(); max = can->totalSeconds();

Loading…
Cancel
Save