# include "tools/cabana/streams/abstractstream.h"
AbstractStream * can = nullptr ;
AbstractStream : : AbstractStream ( QObject * parent , bool is_live_streaming ) : is_live_streaming ( is_live_streaming ) , QObject ( parent ) {
can = this ;
new_msgs = std : : make_unique < QHash < QString , CanData > > ( ) ;
QObject : : connect ( this , & AbstractStream : : received , this , & AbstractStream : : process , Qt : : QueuedConnection ) ;
QObject : : connect ( this , & AbstractStream : : seekedTo , this , & AbstractStream : : updateLastMsgsTo ) ;
}
void AbstractStream : : process ( QHash < QString , CanData > * messages ) {
for ( auto it = messages - > begin ( ) ; it ! = messages - > end ( ) ; + + it ) {
can_msgs [ it . key ( ) ] = it . value ( ) ;
}
emit updated ( ) ;
emit msgsReceived ( messages ) ;
delete messages ;
processing = false ;
}
bool AbstractStream : : updateEvent ( const Event * event ) {
static double prev_update_ts = 0 ;
if ( event - > which = = cereal : : Event : : Which : : CAN ) {
double current_sec = event - > mono_time / 1e9 - routeStartTime ( ) ;
for ( const auto & c : event - > event . getCan ( ) ) {
QString id = QString ( " %1:%2 " ) . arg ( c . getSrc ( ) ) . arg ( c . getAddress ( ) , 1 , 16 ) ;
CanData & data = ( * new_msgs ) [ id ] ;
data . ts = current_sec ;
data . dat = QByteArray ( ( char * ) c . getDat ( ) . begin ( ) , c . getDat ( ) . size ( ) ) ;
data . count = + + counters [ id ] ;
data . freq = data . count / std : : max ( 1.0 , current_sec ) ;
change_trackers [ id ] . compute ( data . dat , data . ts , data . freq ) ;
data . colors = change_trackers [ id ] . colors ;
data . last_change_t = change_trackers [ id ] . last_change_t ;
}
double ts = millis_since_boot ( ) ;
if ( ( ts - prev_update_ts ) > ( 1000.0 / settings . fps ) & & ! processing & & ! new_msgs - > isEmpty ( ) ) {
// delay posting CAN message if UI thread is busy
processing = true ;
prev_update_ts = ts ;
// use pointer to avoid data copy in queued connection.
emit received ( new_msgs . release ( ) ) ;
new_msgs . reset ( new QHash < QString , CanData > ) ;
new_msgs - > reserve ( 100 ) ;
}
}
return true ;
}
const CanData & AbstractStream : : lastMessage ( const QString & id ) {
static CanData empty_data ;
auto it = can_msgs . find ( id ) ;
return it ! = can_msgs . end ( ) ? it . value ( ) : empty_data ;
}
void AbstractStream : : updateLastMsgsTo ( double sec ) {
QHash < std : : pair < uint8_t , uint32_t > , CanData > last_msgs ; // Much faster than QHash<String, CanData>
last_msgs . reserve ( can_msgs . size ( ) ) ;
double route_start_time = routeStartTime ( ) ;
uint64_t last_ts = ( sec + route_start_time ) * 1e9 ;
auto last = std : : upper_bound ( events ( ) - > rbegin ( ) , events ( ) - > rend ( ) , last_ts , [ ] ( uint64_t ts , auto & e ) { return e - > mono_time < ts ; } ) ;
for ( auto it = last ; it ! = events ( ) - > rend ( ) ; + + it ) {
if ( ( * it ) - > which = = cereal : : Event : : Which : : CAN ) {
for ( const auto & c : ( * it ) - > event . getCan ( ) ) {
auto & m = last_msgs [ { c . getSrc ( ) , c . getAddress ( ) } ] ;
if ( + + m . count = = 1 ) {
m . ts = ( ( * it ) - > mono_time / 1e9 ) - route_start_time ;
m . dat = QByteArray ( ( char * ) c . getDat ( ) . begin ( ) , c . getDat ( ) . size ( ) ) ;
m . colors = QVector < QColor > ( m . dat . size ( ) , QColor ( 0 , 0 , 0 , 0 ) ) ;
m . last_change_t = QVector < double > ( m . dat . size ( ) , m . ts ) ;
} else {
m . freq = m . count / std : : max ( 1.0 , m . ts ) ;
}
}
}
}
// it is thread safe to update data here.
// updateEvent will not be called before replayStream::seekedTo return.
new_msgs - > clear ( ) ;
change_trackers . clear ( ) ;
counters . clear ( ) ;
can_msgs . clear ( ) ;
for ( auto it = last_msgs . cbegin ( ) ; it ! = last_msgs . cend ( ) ; + + it ) {
QString msg_id = QString ( " %1:%2 " ) . arg ( it . key ( ) . first ) . arg ( it . key ( ) . second , 1 , 16 ) ;
can_msgs [ msg_id ] = it . value ( ) ;
counters [ msg_id ] = it . value ( ) . count ;
}
emit updated ( ) ;
emit msgsReceived ( & can_msgs ) ;
}