Cabana: support live streaming (#26946)
* support live streaming * update live stream's time * cleanup stream classes * disable video control in live streaming mode * emit streamStarted() in LiveStream::streamThread * disable some features in live streaming mode * refactor charts to support live streaming mode * disable dynamic mode checkbox in live streaming mode * updateDispalyRange * thread safe events * TODO: add support for ZMQ * atomic time stamp * only keep settings.cached_segment_limit*60 seconds data in liveStream * make charts work better in live mode * cleanup ChartView * fix toolbar * cleanup cleanup * disable openpilotPrefix and useOpenGL on macos * add comment * exit gracefully * support ZMQ * use ::operator new/delete * cleanup streams * cleanup * align stream buffers * check looping back * check if series is empty * cleanup * add TODO: write stream to log file to replay it * upper_bound * remove class member event_range * change default settings value * cleanup updateDisplayrange * fix merge errorpull/27008/head
parent
fd4dc109e1
commit
f9490739ab
27 changed files with 395 additions and 238 deletions
@ -1,86 +0,0 @@ |
||||
#pragma once |
||||
|
||||
#include <atomic> |
||||
|
||||
#include <QColor> |
||||
#include <QHash> |
||||
#include <QApplication> |
||||
|
||||
#include "opendbc/can/common_dbc.h" |
||||
#include "tools/cabana/settings.h" |
||||
#include "tools/replay/replay.h" |
||||
|
||||
struct CanData { |
||||
double ts = 0.; |
||||
uint8_t src = 0; |
||||
uint32_t address = 0; |
||||
uint32_t count = 0; |
||||
uint32_t freq = 0; |
||||
QByteArray dat; |
||||
QList<QColor> colors; |
||||
}; |
||||
|
||||
class CANMessages : public QObject { |
||||
Q_OBJECT |
||||
|
||||
public: |
||||
CANMessages(QObject *parent); |
||||
~CANMessages(); |
||||
bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE); |
||||
void seekTo(double ts); |
||||
bool eventFilter(const Event *event); |
||||
|
||||
inline QString routeName() const { return replay->route()->name(); } |
||||
inline QString carFingerprint() const { return replay->carFingerprint().c_str(); } |
||||
inline VisionStreamType visionStreamType() const { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } |
||||
inline double totalSeconds() const { return replay->totalSeconds(); } |
||||
inline double routeStartTime() const { return replay->routeStartTime() / (double)1e9; } |
||||
inline double currentSec() const { return replay->currentSeconds(); } |
||||
inline QDateTime currentDateTime() const { return replay->currentDateTime(); } |
||||
inline const CanData &lastMessage(const QString &id) { return can_msgs[id]; } |
||||
|
||||
inline const Route* route() const { return replay->route(); } |
||||
inline const std::vector<Event *> *events() const { return replay->events(); } |
||||
inline void setSpeed(float speed) { replay->setSpeed(speed); } |
||||
inline bool isPaused() const { return replay->isPaused(); } |
||||
void pause(bool pause); |
||||
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return replay->getTimeline(); } |
||||
|
||||
signals: |
||||
void paused(); |
||||
void resume(); |
||||
void seekedTo(double sec); |
||||
void streamStarted(); |
||||
void eventsMerged(); |
||||
void updated(); |
||||
void msgsReceived(const QHash<QString, CanData> *); |
||||
void received(QHash<QString, CanData> *); |
||||
|
||||
public: |
||||
QMap<QString, CanData> can_msgs; |
||||
|
||||
protected: |
||||
void process(QHash<QString, CanData> *); |
||||
void settingChanged(); |
||||
|
||||
Replay *replay = nullptr; |
||||
std::atomic<double> counters_begin_sec = 0; |
||||
std::atomic<bool> processing = false; |
||||
QHash<QString, uint32_t> counters; |
||||
}; |
||||
|
||||
inline QString toHex(const QByteArray &dat) { |
||||
return dat.toHex(' ').toUpper(); |
||||
} |
||||
inline char toHex(uint value) { |
||||
return "0123456789ABCDEF"[value & 0xF]; |
||||
} |
||||
|
||||
inline const QString &getColor(int i) { |
||||
// TODO: add more colors
|
||||
static const QString SIGNAL_COLORS[] = {"#9FE2BF", "#40E0D0", "#6495ED", "#CCCCFF", "#FF7F50", "#FFBF00"}; |
||||
return SIGNAL_COLORS[i % std::size(SIGNAL_COLORS)]; |
||||
} |
||||
|
||||
// A global pointer referring to the unique CANMessages object
|
||||
extern CANMessages *can; |
@ -0,0 +1,76 @@ |
||||
#pragma once |
||||
|
||||
#include <atomic> |
||||
|
||||
#include <QColor> |
||||
#include <QHash> |
||||
|
||||
#include "tools/cabana/settings.h" |
||||
#include "tools/replay/replay.h" |
||||
|
||||
struct CanData { |
||||
double ts = 0.; |
||||
uint8_t src = 0; |
||||
uint32_t address = 0; |
||||
uint32_t count = 0; |
||||
uint32_t freq = 0; |
||||
QByteArray dat; |
||||
QList<QColor> colors; |
||||
}; |
||||
|
||||
class AbstractStream : public QObject { |
||||
Q_OBJECT |
||||
|
||||
public: |
||||
AbstractStream(QObject *parent, bool is_live_streaming); |
||||
virtual ~AbstractStream() {}; |
||||
inline bool liveStreaming() const { return is_live_streaming; } |
||||
virtual void seekTo(double ts) {} |
||||
virtual QString routeName() const = 0; |
||||
virtual QString carFingerprint() const { return ""; } |
||||
virtual double totalSeconds() const { return 0; } |
||||
virtual double routeStartTime() const { return 0; } |
||||
virtual double currentSec() const = 0; |
||||
virtual QDateTime currentDateTime() const { return {}; } |
||||
virtual const CanData &lastMessage(const QString &id) { return can_msgs[id]; } |
||||
virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; } |
||||
virtual const Route *route() const { return nullptr; } |
||||
virtual const std::vector<Event *> *events() const = 0; |
||||
virtual void setSpeed(float speed) {} |
||||
virtual bool isPaused() const { return false; } |
||||
virtual void pause(bool pause) {} |
||||
virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; } |
||||
|
||||
signals: |
||||
void paused(); |
||||
void resume(); |
||||
void seekedTo(double sec); |
||||
void streamStarted(); |
||||
void eventsMerged(); |
||||
void updated(); |
||||
void msgsReceived(const QHash<QString, CanData> *); |
||||
void received(QHash<QString, CanData> *); |
||||
|
||||
public: |
||||
QMap<QString, CanData> can_msgs; |
||||
|
||||
protected: |
||||
void process(QHash<QString, CanData> *); |
||||
bool updateEvent(const Event *event); |
||||
|
||||
bool is_live_streaming = false; |
||||
std::atomic<double> counters_begin_sec = 0; |
||||
std::atomic<bool> processing = false; |
||||
QHash<QString, uint32_t> counters; |
||||
}; |
||||
|
||||
inline QString toHex(const QByteArray &dat) { return dat.toHex(' ').toUpper(); } |
||||
inline char toHex(uint value) { return "0123456789ABCDEF"[value & 0xF]; } |
||||
inline const QString &getColor(int i) { |
||||
// TODO: add more colors
|
||||
static const QString SIGNAL_COLORS[] = {"#9FE2BF", "#40E0D0", "#6495ED", "#CCCCFF", "#FF7F50", "#FFBF00"}; |
||||
return SIGNAL_COLORS[i % std::size(SIGNAL_COLORS)]; |
||||
} |
||||
|
||||
// A global pointer referring to the unique AbstractStream object
|
||||
extern AbstractStream *can; |
@ -0,0 +1,71 @@ |
||||
#include "tools/cabana/streams/livestream.h" |
||||
|
||||
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { |
||||
if (!zmq_address.isEmpty()) { |
||||
setenv("ZMQ", "1", 1); |
||||
} |
||||
updateCachedNS(); |
||||
QObject::connect(&settings, &Settings::changed, this, &LiveStream::updateCachedNS); |
||||
stream_thread = new QThread(this); |
||||
QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); |
||||
QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); |
||||
stream_thread->start(); |
||||
} |
||||
|
||||
LiveStream::~LiveStream() { |
||||
stream_thread->requestInterruption(); |
||||
stream_thread->quit(); |
||||
stream_thread->wait(); |
||||
for (Event *e : can_events) ::delete e; |
||||
for (auto m : messages) delete m; |
||||
} |
||||
|
||||
void LiveStream::streamThread() { |
||||
std::unique_ptr<Context> context(Context::create()); |
||||
std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); |
||||
std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address)); |
||||
assert(sock != NULL); |
||||
sock->setTimeout(50); |
||||
|
||||
// run as fast as messages come in
|
||||
while (!QThread::currentThread()->isInterruptionRequested()) { |
||||
Message *msg = sock->receive(true); |
||||
if (!msg) { |
||||
QThread::msleep(50); |
||||
continue; |
||||
} |
||||
AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); |
||||
Event *evt = ::new Event(buf->align(msg)); |
||||
delete msg; |
||||
|
||||
{ |
||||
std::lock_guard lk(lock); |
||||
can_events.push_back(evt); |
||||
if ((evt->mono_time - can_events.front()->mono_time) > cache_ns) { |
||||
::delete can_events.front(); |
||||
delete messages.front(); |
||||
can_events.pop_front(); |
||||
messages.pop_front(); |
||||
} |
||||
} |
||||
if (start_ts == 0) { |
||||
start_ts = evt->mono_time; |
||||
emit streamStarted(); |
||||
} |
||||
current_ts = evt->mono_time; |
||||
if (start_ts > current_ts) { |
||||
qDebug() << "stream is looping back to old time stamp"; |
||||
start_ts = current_ts.load(); |
||||
} |
||||
updateEvent(evt); |
||||
// TODO: write stream to log file to replay it with cabana --data_dir flag.
|
||||
} |
||||
} |
||||
|
||||
const std::vector<Event *> *LiveStream::events() const { |
||||
std::lock_guard lk(lock); |
||||
events_vector.clear(); |
||||
events_vector.reserve(can_events.size()); |
||||
std::copy(can_events.begin(), can_events.end(), std::back_inserter(events_vector)); |
||||
return &events_vector; |
||||
} |
@ -0,0 +1,31 @@ |
||||
#pragma once |
||||
|
||||
#include "tools/cabana/streams/abstractstream.h" |
||||
|
||||
class LiveStream : public AbstractStream { |
||||
Q_OBJECT |
||||
|
||||
public: |
||||
LiveStream(QObject *parent, QString address = {}); |
||||
~LiveStream(); |
||||
inline QString routeName() const override { |
||||
return QString("Live Streaming From %1").arg(zmq_address.isEmpty() ? "127.0.0.1" : zmq_address); |
||||
} |
||||
inline double routeStartTime() const override { return start_ts / (double)1e9; } |
||||
inline double currentSec() const override { return (current_ts - start_ts) / (double)1e9; } |
||||
const std::vector<Event *> *events() const override; |
||||
|
||||
protected: |
||||
void streamThread(); |
||||
void updateCachedNS() { cache_ns = (settings.cached_segment_limit * 60) * 1e9; } |
||||
|
||||
mutable std::mutex lock; |
||||
mutable std::vector<Event *> events_vector; |
||||
std::deque<Event *> can_events; |
||||
std::deque<AlignedBuffer *> messages; |
||||
std::atomic<uint64_t> start_ts = 0; |
||||
std::atomic<uint64_t> current_ts = 0; |
||||
std::atomic<uint64_t> cache_ns = 0; |
||||
const QString zmq_address; |
||||
QThread *stream_thread; |
||||
}; |
@ -0,0 +1,54 @@ |
||||
#include "tools/cabana/streams/replaystream.h" |
||||
|
||||
#include "tools/cabana/dbcmanager.h" |
||||
|
||||
ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent, false) { |
||||
QObject::connect(&settings, &Settings::changed, [this]() { |
||||
if (replay) replay->setSegmentCacheLimit(settings.cached_segment_limit); |
||||
}); |
||||
} |
||||
|
||||
ReplayStream::~ReplayStream() { |
||||
if (replay) replay->stop(); |
||||
} |
||||
|
||||
static bool event_filter(const Event *e, void *opaque) { |
||||
return ((ReplayStream *)opaque)->eventFilter(e); |
||||
} |
||||
|
||||
bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) { |
||||
replay = new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this); |
||||
replay->setSegmentCacheLimit(settings.cached_segment_limit); |
||||
replay->installEventFilter(event_filter, this); |
||||
QObject::connect(replay, &Replay::seekedTo, this, &AbstractStream::seekedTo); |
||||
QObject::connect(replay, &Replay::segmentsMerged, this, &AbstractStream::eventsMerged); |
||||
QObject::connect(replay, &Replay::streamStarted, this, &AbstractStream::streamStarted); |
||||
if (replay->load()) { |
||||
const auto &segments = replay->route()->segments(); |
||||
if (std::none_of(segments.begin(), segments.end(), [](auto &s) { return s.second.rlog.length() > 0; })) { |
||||
qWarning() << "no rlogs in route" << route; |
||||
return false; |
||||
} |
||||
replay->start(); |
||||
return true; |
||||
} |
||||
return false; |
||||
} |
||||
|
||||
bool ReplayStream::eventFilter(const Event *event) { |
||||
if (event->which == cereal::Event::Which::CAN) { |
||||
updateEvent(event); |
||||
} |
||||
return true; |
||||
} |
||||
|
||||
void ReplayStream::seekTo(double ts) { |
||||
replay->seekTo(std::max(double(0), ts), false); |
||||
counters_begin_sec = 0; |
||||
emit updated(); |
||||
} |
||||
|
||||
void ReplayStream::pause(bool pause) { |
||||
replay->pause(pause); |
||||
emit(pause ? paused() : resume()); |
||||
} |
@ -0,0 +1,32 @@ |
||||
#pragma once |
||||
|
||||
#include "opendbc/can/common_dbc.h" |
||||
#include "tools/cabana/streams/abstractstream.h" |
||||
#include "tools/cabana/settings.h" |
||||
|
||||
class ReplayStream : public AbstractStream { |
||||
Q_OBJECT |
||||
|
||||
public: |
||||
ReplayStream(QObject *parent); |
||||
~ReplayStream(); |
||||
bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE); |
||||
bool eventFilter(const Event *event); |
||||
void seekTo(double ts) override; |
||||
inline QString routeName() const override { return replay->route()->name(); } |
||||
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } |
||||
inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } |
||||
inline double totalSeconds() const override { return replay->totalSeconds(); } |
||||
inline double routeStartTime() const override { return replay->routeStartTime() / (double)1e9; } |
||||
inline double currentSec() const override { return replay->currentSeconds(); } |
||||
inline QDateTime currentDateTime() const override { return replay->currentDateTime(); } |
||||
inline const Route *route() const override { return replay->route(); } |
||||
inline const std::vector<Event *> *events() const override { return replay->events(); } |
||||
inline void setSpeed(float speed) override { replay->setSpeed(speed); } |
||||
inline bool isPaused() const override { return replay->isPaused(); } |
||||
void pause(bool pause) override; |
||||
inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); } |
||||
|
||||
private: |
||||
Replay *replay = nullptr; |
||||
}; |
Loading…
Reference in new issue