|  |  |  | @ -3,14 +3,13 @@ | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | AbstractStream *can = nullptr; | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | AbstractStream::AbstractStream(QObject *parent, bool is_live_streaming) : is_live_streaming(is_live_streaming), QObject(parent) { | 
			
		
	
		
			
				
					|  |  |  |  | AbstractStream::AbstractStream(QObject *parent) : QObject(parent) { | 
			
		
	
		
			
				
					|  |  |  |  |   can = this; | 
			
		
	
		
			
				
					|  |  |  |  |   new_msgs = std::make_unique<QHash<MessageId, CanData>>(); | 
			
		
	
		
			
				
					|  |  |  |  |   QObject::connect(this, &AbstractStream::received, this, &AbstractStream::process, Qt::QueuedConnection); | 
			
		
	
		
			
				
					|  |  |  |  |   QObject::connect(this, &AbstractStream::seekedTo, this, &AbstractStream::updateLastMsgsTo); | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::process(QHash<MessageId, CanData> *messages) { | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) { | 
			
		
	
		
			
				
					|  |  |  |  |   auto prev_src_size = sources.size(); | 
			
		
	
		
			
				
					|  |  |  |  |   for (auto it = messages->begin(); it != messages->end(); ++it) { | 
			
		
	
		
			
				
					|  |  |  |  |     const auto &id = it.key(); | 
			
		
	
	
		
			
				
					|  |  |  | @ -26,45 +25,39 @@ void AbstractStream::process(QHash<MessageId, CanData> *messages) { | 
			
		
	
		
			
				
					|  |  |  |  |   processing = false; | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | bool AbstractStream::updateEvent(const Event *event) { | 
			
		
	
		
			
				
					|  |  |  |  |   static double prev_update_ts = 0; | 
			
		
	
		
			
				
					|  |  |  |  |   if (event->which == cereal::Event::Which::CAN) { | 
			
		
	
		
			
				
					|  |  |  |  |     double current_sec = event->mono_time / 1e9 - routeStartTime(); | 
			
		
	
		
			
				
					|  |  |  |  |     for (const auto &c : event->event.getCan()) { | 
			
		
	
		
			
				
					|  |  |  |  |       MessageId id = {.source = c.getSrc(), .address = c.getAddress()}; | 
			
		
	
		
			
				
					|  |  |  |  |       const auto dat = c.getDat(); | 
			
		
	
		
			
				
					|  |  |  |  |       all_msgs[id].compute((const char *)dat.begin(), dat.size(), current_sec, getSpeed()); | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) { | 
			
		
	
		
			
				
					|  |  |  |  |   all_msgs[id].compute((const char*)data, size, sec, getSpeed()); | 
			
		
	
		
			
				
					|  |  |  |  |   if (!new_msgs->contains(id)) { | 
			
		
	
		
			
				
					|  |  |  |  |     new_msgs->insert(id, {}); | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |     double ts = millis_since_boot(); | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | bool AbstractStream::postEvents() { | 
			
		
	
		
			
				
					|  |  |  |  |   // delay posting CAN message if UI thread is busy
 | 
			
		
	
		
			
				
					|  |  |  |  |     if ((ts - prev_update_ts) > (1000.0 / settings.fps) && !processing && !new_msgs->isEmpty()) { | 
			
		
	
		
			
				
					|  |  |  |  |   if (!processing) { | 
			
		
	
		
			
				
					|  |  |  |  |     processing = true; | 
			
		
	
		
			
				
					|  |  |  |  |       prev_update_ts = ts; | 
			
		
	
		
			
				
					|  |  |  |  |     for (auto it = new_msgs->begin(); it != new_msgs->end(); ++it) { | 
			
		
	
		
			
				
					|  |  |  |  |       it.value() = all_msgs[it.key()]; | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |     // use pointer to avoid data copy in queued connection.
 | 
			
		
	
		
			
				
					|  |  |  |  |       emit received(new_msgs.release()); | 
			
		
	
		
			
				
					|  |  |  |  |     QMetaObject::invokeMethod(this, std::bind(&AbstractStream::updateMessages, this, new_msgs.release()), Qt::QueuedConnection); | 
			
		
	
		
			
				
					|  |  |  |  |     new_msgs.reset(new QHash<MessageId, CanData>); | 
			
		
	
		
			
				
					|  |  |  |  |     new_msgs->reserve(100); | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  |     return true; | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  |   return false; | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | const CanData &AbstractStream::lastMessage(const MessageId &id) { | 
			
		
	
		
			
				
					|  |  |  |  |   static CanData empty_data; | 
			
		
	
		
			
				
					|  |  |  |  |   static CanData empty_data = {}; | 
			
		
	
		
			
				
					|  |  |  |  |   auto it = last_msgs.find(id); | 
			
		
	
		
			
				
					|  |  |  |  |   return it != last_msgs.end() ? it.value() : empty_data; | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | // it is thread safe to update data in updateLastMsgsTo.
 | 
			
		
	
		
			
				
					|  |  |  |  | // updateEvent will not be called before replayStream::seekedTo return.
 | 
			
		
	
		
			
				
					|  |  |  |  | // updateLastMsgsTo is always called in UI thread.
 | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::updateLastMsgsTo(double sec) { | 
			
		
	
		
			
				
					|  |  |  |  |   new_msgs->clear(); | 
			
		
	
		
			
				
					|  |  |  |  |   new_msgs.reset(new QHash<MessageId, CanData>); | 
			
		
	
		
			
				
					|  |  |  |  |   all_msgs.clear(); | 
			
		
	
		
			
				
					|  |  |  |  |   last_msgs.clear(); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | @ -89,7 +82,7 @@ void AbstractStream::updateLastMsgsTo(double sec) { | 
			
		
	
		
			
				
					|  |  |  |  |   }); | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEvent *>> &msgs, | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<const CanEvent *>> &msgs, | 
			
		
	
		
			
				
					|  |  |  |  |                                  std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last) { | 
			
		
	
		
			
				
					|  |  |  |  |   size_t memory_size = 0; | 
			
		
	
		
			
				
					|  |  |  |  |   for (auto it = first; it != last; ++it) { | 
			
		
	
	
		
			
				
					|  |  |  | @ -101,22 +94,24 @@ void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<CanEve | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   char *ptr = memory_blocks.emplace_back(new char[memory_size]).get(); | 
			
		
	
		
			
				
					|  |  |  |  |   uint64_t ts = 0; | 
			
		
	
		
			
				
					|  |  |  |  |   for (auto it = first; it != last; ++it) { | 
			
		
	
		
			
				
					|  |  |  |  |     if ((*it)->which == cereal::Event::Which::CAN) { | 
			
		
	
		
			
				
					|  |  |  |  |       ts = (*it)->mono_time; | 
			
		
	
		
			
				
					|  |  |  |  |       uint64_t ts = (*it)->mono_time; | 
			
		
	
		
			
				
					|  |  |  |  |       for (const auto &c : (*it)->event.getCan()) { | 
			
		
	
		
			
				
					|  |  |  |  |         auto dat = c.getDat(); | 
			
		
	
		
			
				
					|  |  |  |  |         CanEvent *e = (CanEvent *)ptr; | 
			
		
	
		
			
				
					|  |  |  |  |         e->src = c.getSrc(); | 
			
		
	
		
			
				
					|  |  |  |  |         e->address = c.getAddress(); | 
			
		
	
		
			
				
					|  |  |  |  |         e->mono_time = ts; | 
			
		
	
		
			
				
					|  |  |  |  |         auto dat = c.getDat(); | 
			
		
	
		
			
				
					|  |  |  |  |         e->size = dat.size(); | 
			
		
	
		
			
				
					|  |  |  |  |         memcpy(e->dat, (uint8_t *)dat.begin(), e->size); | 
			
		
	
		
			
				
					|  |  |  |  |         msgs[{.source = c.getSrc(), .address = c.getAddress()}].push_back(e); | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         msgs[{.source = e->src, .address = e->address}].push_back(e); | 
			
		
	
		
			
				
					|  |  |  |  |         all_events_.push_back(e); | 
			
		
	
		
			
				
					|  |  |  |  |         ptr += sizeof(CanEvent) + sizeof(uint8_t) * e->size; | 
			
		
	
		
			
				
					|  |  |  |  |       } | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  |   last_event_ts = std::max(last_event_ts, ts); | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  | void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std::vector<Event *>::const_iterator last, bool append) { | 
			
		
	
	
		
			
				
					|  |  |  | @ -125,7 +120,7 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std | 
			
		
	
		
			
				
					|  |  |  |  |   if (append) { | 
			
		
	
		
			
				
					|  |  |  |  |     parseEvents(events_, first, last); | 
			
		
	
		
			
				
					|  |  |  |  |   } else { | 
			
		
	
		
			
				
					|  |  |  |  |     std::unordered_map<MessageId, std::deque<CanEvent *>> new_events; | 
			
		
	
		
			
				
					|  |  |  |  |     std::unordered_map<MessageId, std::deque<const CanEvent *>> new_events; | 
			
		
	
		
			
				
					|  |  |  |  |     parseEvents(new_events, first, last); | 
			
		
	
		
			
				
					|  |  |  |  |     for (auto &[id, new_e] : new_events) { | 
			
		
	
		
			
				
					|  |  |  |  |       auto &e = events_[id]; | 
			
		
	
	
		
			
				
					|  |  |  | @ -133,6 +128,7 @@ void AbstractStream::mergeEvents(std::vector<Event *>::const_iterator first, std | 
			
		
	
		
			
				
					|  |  |  |  |       e.insert(it, new_e.cbegin(), new_e.cend()); | 
			
		
	
		
			
				
					|  |  |  |  |     } | 
			
		
	
		
			
				
					|  |  |  |  |   } | 
			
		
	
		
			
				
					|  |  |  |  |   total_sec = (all_events_.back()->mono_time - all_events_.front()->mono_time) / 1e9; | 
			
		
	
		
			
				
					|  |  |  |  |   emit eventsMerged(); | 
			
		
	
		
			
				
					|  |  |  |  | } | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | 
 |