Cabana: support live streaming (#26946)
	
		
	
				
					
				
			* support live streaming
* update live stream's time
* cleanup stream classes
* disable video control in live streaming mode
* emit streamStarted() in LiveStream::streamThread
* disable some features in live streaming mode
* refactor charts to support live streaming mode
* disable dynamic mode checkbox in live streaming mode
* updateDispalyRange
* thread safe events
* TODO: add support for ZMQ
* atomic time stamp
* only keep settings.cached_segment_limit*60  seconds data in liveStream
* make charts work better in live mode
* cleanup ChartView
* fix toolbar
* cleanup
cleanup
* disable openpilotPrefix and useOpenGL on macos
* add comment
* exit gracefully
* support ZMQ
* use ::operator new/delete
* cleanup streams
* cleanup
* align stream buffers
* check looping back
* check if series is empty
* cleanup
* add TODO: write stream to log file to replay it
* upper_bound
* remove class member event_range
* change default settings value
* cleanup updateDisplayrange
* fix merge error
old-commit-hash: f9490739ab
			
			
				vw-mqb-aeb
			
			
		
							parent
							
								
									26ca64da0f
								
							
						
					
					
						commit
						504453c3e4
					
				
				 27 changed files with 395 additions and 238 deletions
			
			
		@ -1,86 +0,0 @@ | 
				
			||||
#pragma once | 
				
			||||
 | 
				
			||||
#include <atomic> | 
				
			||||
 | 
				
			||||
#include <QColor> | 
				
			||||
#include <QHash> | 
				
			||||
#include <QApplication> | 
				
			||||
 | 
				
			||||
#include "opendbc/can/common_dbc.h" | 
				
			||||
#include "tools/cabana/settings.h" | 
				
			||||
#include "tools/replay/replay.h" | 
				
			||||
 | 
				
			||||
struct CanData { | 
				
			||||
  double ts = 0.; | 
				
			||||
  uint8_t src = 0; | 
				
			||||
  uint32_t address = 0; | 
				
			||||
  uint32_t count = 0; | 
				
			||||
  uint32_t freq = 0; | 
				
			||||
  QByteArray dat; | 
				
			||||
  QList<QColor> colors; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class CANMessages : public QObject { | 
				
			||||
  Q_OBJECT | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  CANMessages(QObject *parent); | 
				
			||||
  ~CANMessages(); | 
				
			||||
  bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE); | 
				
			||||
  void seekTo(double ts); | 
				
			||||
  bool eventFilter(const Event *event); | 
				
			||||
 | 
				
			||||
  inline QString routeName() const { return replay->route()->name(); } | 
				
			||||
  inline QString carFingerprint() const { return replay->carFingerprint().c_str(); } | 
				
			||||
  inline VisionStreamType visionStreamType() const { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } | 
				
			||||
  inline double totalSeconds() const { return replay->totalSeconds(); } | 
				
			||||
  inline double routeStartTime() const { return replay->routeStartTime() / (double)1e9; } | 
				
			||||
  inline double currentSec() const { return replay->currentSeconds(); } | 
				
			||||
  inline QDateTime currentDateTime() const { return replay->currentDateTime(); } | 
				
			||||
  inline const CanData &lastMessage(const QString &id) { return can_msgs[id]; } | 
				
			||||
 | 
				
			||||
  inline const Route* route() const { return replay->route(); } | 
				
			||||
  inline const std::vector<Event *> *events() const { return replay->events(); } | 
				
			||||
  inline void setSpeed(float speed) { replay->setSpeed(speed); } | 
				
			||||
  inline bool isPaused() const { return replay->isPaused(); } | 
				
			||||
  void pause(bool pause); | 
				
			||||
  inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return replay->getTimeline(); } | 
				
			||||
 | 
				
			||||
signals: | 
				
			||||
  void paused(); | 
				
			||||
  void resume(); | 
				
			||||
  void seekedTo(double sec); | 
				
			||||
  void streamStarted(); | 
				
			||||
  void eventsMerged(); | 
				
			||||
  void updated(); | 
				
			||||
  void msgsReceived(const QHash<QString, CanData> *); | 
				
			||||
  void received(QHash<QString, CanData> *); | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  QMap<QString, CanData> can_msgs; | 
				
			||||
 | 
				
			||||
protected: | 
				
			||||
  void process(QHash<QString, CanData> *); | 
				
			||||
  void settingChanged(); | 
				
			||||
 | 
				
			||||
  Replay *replay = nullptr; | 
				
			||||
  std::atomic<double> counters_begin_sec = 0; | 
				
			||||
  std::atomic<bool> processing = false; | 
				
			||||
  QHash<QString, uint32_t> counters; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
inline QString toHex(const QByteArray &dat) { | 
				
			||||
  return dat.toHex(' ').toUpper(); | 
				
			||||
} | 
				
			||||
inline char toHex(uint value) { | 
				
			||||
  return "0123456789ABCDEF"[value & 0xF]; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
inline const QString &getColor(int i) { | 
				
			||||
  // TODO: add more colors
 | 
				
			||||
  static const QString SIGNAL_COLORS[] = {"#9FE2BF", "#40E0D0", "#6495ED", "#CCCCFF", "#FF7F50", "#FFBF00"}; | 
				
			||||
  return SIGNAL_COLORS[i % std::size(SIGNAL_COLORS)]; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
// A global pointer referring to the unique CANMessages object
 | 
				
			||||
extern CANMessages *can; | 
				
			||||
@ -0,0 +1,76 @@ | 
				
			||||
#pragma once | 
				
			||||
 | 
				
			||||
#include <atomic> | 
				
			||||
 | 
				
			||||
#include <QColor> | 
				
			||||
#include <QHash> | 
				
			||||
 | 
				
			||||
#include "tools/cabana/settings.h" | 
				
			||||
#include "tools/replay/replay.h" | 
				
			||||
 | 
				
			||||
struct CanData { | 
				
			||||
  double ts = 0.; | 
				
			||||
  uint8_t src = 0; | 
				
			||||
  uint32_t address = 0; | 
				
			||||
  uint32_t count = 0; | 
				
			||||
  uint32_t freq = 0; | 
				
			||||
  QByteArray dat; | 
				
			||||
  QList<QColor> colors; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class AbstractStream : public QObject { | 
				
			||||
  Q_OBJECT | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  AbstractStream(QObject *parent, bool is_live_streaming); | 
				
			||||
  virtual ~AbstractStream() {}; | 
				
			||||
  inline bool liveStreaming() const { return is_live_streaming; } | 
				
			||||
  virtual void seekTo(double ts) {} | 
				
			||||
  virtual QString routeName() const = 0; | 
				
			||||
  virtual QString carFingerprint() const { return ""; } | 
				
			||||
  virtual double totalSeconds() const { return 0; } | 
				
			||||
  virtual double routeStartTime() const { return 0; } | 
				
			||||
  virtual double currentSec() const = 0; | 
				
			||||
  virtual QDateTime currentDateTime() const { return {}; } | 
				
			||||
  virtual const CanData &lastMessage(const QString &id) { return can_msgs[id]; } | 
				
			||||
  virtual VisionStreamType visionStreamType() const { return VISION_STREAM_ROAD; } | 
				
			||||
  virtual const Route *route() const { return nullptr; } | 
				
			||||
  virtual const std::vector<Event *> *events() const = 0; | 
				
			||||
  virtual void setSpeed(float speed) {} | 
				
			||||
  virtual bool isPaused() const { return false; } | 
				
			||||
  virtual void pause(bool pause) {} | 
				
			||||
  virtual const std::vector<std::tuple<int, int, TimelineType>> getTimeline() { return {}; } | 
				
			||||
 | 
				
			||||
signals: | 
				
			||||
  void paused(); | 
				
			||||
  void resume(); | 
				
			||||
  void seekedTo(double sec); | 
				
			||||
  void streamStarted(); | 
				
			||||
  void eventsMerged(); | 
				
			||||
  void updated(); | 
				
			||||
  void msgsReceived(const QHash<QString, CanData> *); | 
				
			||||
  void received(QHash<QString, CanData> *); | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  QMap<QString, CanData> can_msgs; | 
				
			||||
 | 
				
			||||
protected: | 
				
			||||
  void process(QHash<QString, CanData> *); | 
				
			||||
  bool updateEvent(const Event *event); | 
				
			||||
 | 
				
			||||
  bool is_live_streaming = false; | 
				
			||||
  std::atomic<double> counters_begin_sec = 0; | 
				
			||||
  std::atomic<bool> processing = false; | 
				
			||||
  QHash<QString, uint32_t> counters; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
inline QString toHex(const QByteArray &dat) { return dat.toHex(' ').toUpper(); } | 
				
			||||
inline char toHex(uint value) { return "0123456789ABCDEF"[value & 0xF]; } | 
				
			||||
inline const QString &getColor(int i) { | 
				
			||||
  // TODO: add more colors
 | 
				
			||||
  static const QString SIGNAL_COLORS[] = {"#9FE2BF", "#40E0D0", "#6495ED", "#CCCCFF", "#FF7F50", "#FFBF00"}; | 
				
			||||
  return SIGNAL_COLORS[i % std::size(SIGNAL_COLORS)]; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
// A global pointer referring to the unique AbstractStream object
 | 
				
			||||
extern AbstractStream *can; | 
				
			||||
@ -0,0 +1,71 @@ | 
				
			||||
#include "tools/cabana/streams/livestream.h" | 
				
			||||
 | 
				
			||||
LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) { | 
				
			||||
  if (!zmq_address.isEmpty()) { | 
				
			||||
    setenv("ZMQ", "1", 1); | 
				
			||||
  } | 
				
			||||
  updateCachedNS(); | 
				
			||||
  QObject::connect(&settings, &Settings::changed, this, &LiveStream::updateCachedNS); | 
				
			||||
  stream_thread = new QThread(this); | 
				
			||||
  QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); }); | 
				
			||||
  QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater); | 
				
			||||
  stream_thread->start(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
LiveStream::~LiveStream() { | 
				
			||||
  stream_thread->requestInterruption(); | 
				
			||||
  stream_thread->quit(); | 
				
			||||
  stream_thread->wait(); | 
				
			||||
  for (Event *e : can_events) ::delete e; | 
				
			||||
  for (auto m : messages) delete m; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void LiveStream::streamThread() { | 
				
			||||
  std::unique_ptr<Context> context(Context::create()); | 
				
			||||
  std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString(); | 
				
			||||
  std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address)); | 
				
			||||
  assert(sock != NULL); | 
				
			||||
  sock->setTimeout(50); | 
				
			||||
 | 
				
			||||
  // run as fast as messages come in
 | 
				
			||||
  while (!QThread::currentThread()->isInterruptionRequested()) { | 
				
			||||
    Message *msg = sock->receive(true); | 
				
			||||
    if (!msg) { | 
				
			||||
      QThread::msleep(50); | 
				
			||||
      continue; | 
				
			||||
    } | 
				
			||||
    AlignedBuffer *buf = messages.emplace_back(new AlignedBuffer()); | 
				
			||||
    Event *evt = ::new Event(buf->align(msg)); | 
				
			||||
    delete msg; | 
				
			||||
 | 
				
			||||
    { | 
				
			||||
      std::lock_guard lk(lock); | 
				
			||||
      can_events.push_back(evt); | 
				
			||||
      if ((evt->mono_time - can_events.front()->mono_time) > cache_ns) { | 
				
			||||
        ::delete can_events.front(); | 
				
			||||
        delete messages.front(); | 
				
			||||
        can_events.pop_front(); | 
				
			||||
        messages.pop_front(); | 
				
			||||
      } | 
				
			||||
    } | 
				
			||||
    if (start_ts == 0) { | 
				
			||||
      start_ts = evt->mono_time; | 
				
			||||
      emit streamStarted(); | 
				
			||||
    } | 
				
			||||
    current_ts = evt->mono_time; | 
				
			||||
    if (start_ts > current_ts) { | 
				
			||||
      qDebug() << "stream is looping back to old time stamp"; | 
				
			||||
      start_ts = current_ts.load(); | 
				
			||||
    } | 
				
			||||
    updateEvent(evt); | 
				
			||||
    // TODO: write stream to log file to replay it with cabana --data_dir flag.
 | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
const std::vector<Event *> *LiveStream::events() const { | 
				
			||||
  std::lock_guard lk(lock); | 
				
			||||
  events_vector.clear(); | 
				
			||||
  events_vector.reserve(can_events.size()); | 
				
			||||
  std::copy(can_events.begin(), can_events.end(), std::back_inserter(events_vector)); | 
				
			||||
  return &events_vector; | 
				
			||||
} | 
				
			||||
@ -0,0 +1,31 @@ | 
				
			||||
#pragma once | 
				
			||||
 | 
				
			||||
#include "tools/cabana/streams/abstractstream.h" | 
				
			||||
 | 
				
			||||
class LiveStream : public AbstractStream { | 
				
			||||
  Q_OBJECT | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  LiveStream(QObject *parent, QString address = {}); | 
				
			||||
  ~LiveStream(); | 
				
			||||
  inline QString routeName() const override { | 
				
			||||
    return QString("Live Streaming From %1").arg(zmq_address.isEmpty() ? "127.0.0.1" : zmq_address); | 
				
			||||
  } | 
				
			||||
  inline double routeStartTime() const override { return start_ts / (double)1e9; } | 
				
			||||
  inline double currentSec() const override { return (current_ts - start_ts) / (double)1e9; } | 
				
			||||
  const std::vector<Event *> *events() const override; | 
				
			||||
 | 
				
			||||
protected: | 
				
			||||
  void streamThread(); | 
				
			||||
  void updateCachedNS() { cache_ns = (settings.cached_segment_limit * 60) * 1e9; } | 
				
			||||
 | 
				
			||||
  mutable std::mutex lock; | 
				
			||||
  mutable std::vector<Event *> events_vector; | 
				
			||||
  std::deque<Event *> can_events; | 
				
			||||
  std::deque<AlignedBuffer *> messages; | 
				
			||||
  std::atomic<uint64_t> start_ts = 0; | 
				
			||||
  std::atomic<uint64_t> current_ts = 0; | 
				
			||||
  std::atomic<uint64_t> cache_ns = 0; | 
				
			||||
  const QString zmq_address; | 
				
			||||
  QThread *stream_thread; | 
				
			||||
}; | 
				
			||||
@ -0,0 +1,54 @@ | 
				
			||||
#include "tools/cabana/streams/replaystream.h" | 
				
			||||
 | 
				
			||||
#include "tools/cabana/dbcmanager.h" | 
				
			||||
 | 
				
			||||
ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent, false) { | 
				
			||||
  QObject::connect(&settings, &Settings::changed, [this]() { | 
				
			||||
    if (replay) replay->setSegmentCacheLimit(settings.cached_segment_limit); | 
				
			||||
  }); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ReplayStream::~ReplayStream() { | 
				
			||||
  if (replay) replay->stop(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
static bool event_filter(const Event *e, void *opaque) { | 
				
			||||
  return ((ReplayStream *)opaque)->eventFilter(e); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) { | 
				
			||||
  replay = new Replay(route, {"can", "roadEncodeIdx", "wideRoadEncodeIdx", "carParams"}, {}, nullptr, replay_flags, data_dir, this); | 
				
			||||
  replay->setSegmentCacheLimit(settings.cached_segment_limit); | 
				
			||||
  replay->installEventFilter(event_filter, this); | 
				
			||||
  QObject::connect(replay, &Replay::seekedTo, this, &AbstractStream::seekedTo); | 
				
			||||
  QObject::connect(replay, &Replay::segmentsMerged, this, &AbstractStream::eventsMerged); | 
				
			||||
  QObject::connect(replay, &Replay::streamStarted, this, &AbstractStream::streamStarted); | 
				
			||||
  if (replay->load()) { | 
				
			||||
    const auto &segments = replay->route()->segments(); | 
				
			||||
    if (std::none_of(segments.begin(), segments.end(), [](auto &s) { return s.second.rlog.length() > 0; })) { | 
				
			||||
      qWarning() << "no rlogs in route" << route; | 
				
			||||
      return false; | 
				
			||||
    } | 
				
			||||
    replay->start(); | 
				
			||||
    return true; | 
				
			||||
  } | 
				
			||||
  return false; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
bool ReplayStream::eventFilter(const Event *event) { | 
				
			||||
  if (event->which == cereal::Event::Which::CAN) { | 
				
			||||
    updateEvent(event); | 
				
			||||
  } | 
				
			||||
  return true; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ReplayStream::seekTo(double ts) { | 
				
			||||
  replay->seekTo(std::max(double(0), ts), false); | 
				
			||||
  counters_begin_sec = 0; | 
				
			||||
  emit updated(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ReplayStream::pause(bool pause) { | 
				
			||||
  replay->pause(pause); | 
				
			||||
  emit(pause ? paused() : resume()); | 
				
			||||
} | 
				
			||||
@ -0,0 +1,32 @@ | 
				
			||||
#pragma once | 
				
			||||
 | 
				
			||||
#include "opendbc/can/common_dbc.h" | 
				
			||||
#include "tools/cabana/streams/abstractstream.h" | 
				
			||||
#include "tools/cabana/settings.h" | 
				
			||||
 | 
				
			||||
class ReplayStream : public AbstractStream { | 
				
			||||
  Q_OBJECT | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  ReplayStream(QObject *parent); | 
				
			||||
  ~ReplayStream(); | 
				
			||||
  bool loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags = REPLAY_FLAG_NONE); | 
				
			||||
  bool eventFilter(const Event *event); | 
				
			||||
  void seekTo(double ts) override; | 
				
			||||
  inline QString routeName() const override { return replay->route()->name(); } | 
				
			||||
  inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } | 
				
			||||
  inline VisionStreamType visionStreamType() const override { return replay->hasFlag(REPLAY_FLAG_ECAM) ? VISION_STREAM_WIDE_ROAD : VISION_STREAM_ROAD; } | 
				
			||||
  inline double totalSeconds() const override { return replay->totalSeconds(); } | 
				
			||||
  inline double routeStartTime() const override { return replay->routeStartTime() / (double)1e9; } | 
				
			||||
  inline double currentSec() const override { return replay->currentSeconds(); } | 
				
			||||
  inline QDateTime currentDateTime() const override { return replay->currentDateTime(); } | 
				
			||||
  inline const Route *route() const override { return replay->route(); } | 
				
			||||
  inline const std::vector<Event *> *events() const override { return replay->events(); } | 
				
			||||
  inline void setSpeed(float speed) override { replay->setSpeed(speed); } | 
				
			||||
  inline bool isPaused() const override { return replay->isPaused(); } | 
				
			||||
  void pause(bool pause) override; | 
				
			||||
  inline const std::vector<std::tuple<int, int, TimelineType>> getTimeline() override { return replay->getTimeline(); } | 
				
			||||
 | 
				
			||||
private: | 
				
			||||
  Replay *replay = nullptr; | 
				
			||||
}; | 
				
			||||
					Loading…
					
					
				
		Reference in new issue