You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							86 lines
						
					
					
						
							2.6 KiB
						
					
					
				
			
		
		
	
	
							86 lines
						
					
					
						
							2.6 KiB
						
					
					
				| #include "tools/cabana/streams/livestream.h"
 | |
| 
 | |
| #include <QTimer>
 | |
| 
 | |
| LiveStream::LiveStream(QObject *parent, QString address) : zmq_address(address), AbstractStream(parent, true) {
 | |
|   stream_thread = new QThread(this);
 | |
|   QObject::connect(stream_thread, &QThread::started, [=]() { streamThread(); });
 | |
|   QObject::connect(stream_thread, &QThread::finished, stream_thread, &QThread::deleteLater);
 | |
|   QTimer::singleShot(0, [this]() { stream_thread->start(); });
 | |
| }
 | |
| 
 | |
| LiveStream::~LiveStream() {
 | |
|   stream_thread->requestInterruption();
 | |
|   stream_thread->quit();
 | |
|   stream_thread->wait();
 | |
| }
 | |
| 
 | |
| void LiveStream::streamThread() {
 | |
|   if (!zmq_address.isEmpty()) {
 | |
|     setenv("ZMQ", "1", 1);
 | |
|   }
 | |
|   std::unique_ptr<Context> context(Context::create());
 | |
|   std::string address = zmq_address.isEmpty() ? "127.0.0.1" : zmq_address.toStdString();
 | |
|   std::unique_ptr<SubSocket> sock(SubSocket::create(context.get(), "can", address));
 | |
|   assert(sock != NULL);
 | |
|   sock->setTimeout(50);
 | |
|   // run as fast as messages come in
 | |
|   while (!QThread::currentThread()->isInterruptionRequested()) {
 | |
|     Message *msg = sock->receive(true);
 | |
|     if (!msg) {
 | |
|       QThread::msleep(50);
 | |
|       continue;
 | |
|     }
 | |
|     std::lock_guard lk(lock);
 | |
|     handleEvent(messages.emplace_back(msg).event);
 | |
|     // TODO: write stream to log file to replay it with cabana --data_dir flag.
 | |
|   }
 | |
| }
 | |
| 
 | |
| void LiveStream::handleEvent(Event *evt) {
 | |
|   if (start_ts == 0 || evt->mono_time < start_ts) {
 | |
|     if (evt->mono_time < start_ts) {
 | |
|       qDebug() << "stream is looping back to old time stamp";
 | |
|     }
 | |
|     start_ts = current_ts = evt->mono_time;
 | |
|     emit streamStarted();
 | |
|   }
 | |
| 
 | |
|   received.push_back(evt);
 | |
|   if (!pause_) {
 | |
|     if (speed_ < 1 && last_update_ts > 0) {
 | |
|       auto it = std::upper_bound(received.cbegin(), received.cend(), current_ts, [](uint64_t ts, auto &e) {
 | |
|         return ts < e->mono_time;
 | |
|       });
 | |
|       if (it != received.cend()) {
 | |
|         bool skip = (nanos_since_boot() - last_update_ts) < ((*it)->mono_time - current_ts) / speed_;
 | |
|         if (skip) return;
 | |
| 
 | |
|         evt = *it;
 | |
|       }
 | |
|     }
 | |
|     current_ts = evt->mono_time;
 | |
|     last_update_ts = nanos_since_boot();
 | |
|     updateEvent(evt);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void LiveStream::process(QHash<MessageId, CanData> *last_messages) {
 | |
|   {
 | |
|     std::lock_guard lk(lock);
 | |
|     auto first = std::upper_bound(received.cbegin(), received.cend(), last_event_ts, [](uint64_t ts, auto &e) {
 | |
|       return ts < e->mono_time;
 | |
|     });
 | |
|     mergeEvents(first, received.cend(), true);
 | |
|     if (speed_ == 1) {
 | |
|       received.clear();
 | |
|       messages.clear();
 | |
|     }
 | |
|   }
 | |
|   AbstractStream::process(last_messages);
 | |
| }
 | |
| 
 | |
| void LiveStream::pause(bool pause) {
 | |
|   pause_ = pause;
 | |
|   emit paused();
 | |
| }
 | |
| 
 |