# include "tools/cabana/streams/abstractstream.h"
# include <QTimer>
AbstractStream * can = nullptr ;
AbstractStream : : AbstractStream ( QObject * parent , bool is_live_streaming ) : is_live_streaming ( is_live_streaming ) , QObject ( parent ) {
can = this ;
new_msgs = std : : make_unique < QHash < MessageId , CanData > > ( ) ;
QObject : : connect ( this , & AbstractStream : : received , this , & AbstractStream : : process , Qt : : QueuedConnection ) ;
QObject : : connect ( this , & AbstractStream : : seekedTo , this , & AbstractStream : : updateLastMsgsTo ) ;
}
void AbstractStream : : process ( QHash < MessageId , CanData > * messages ) {
for ( auto it = messages - > begin ( ) ; it ! = messages - > end ( ) ; + + it ) {
last_msgs [ it . key ( ) ] = it . value ( ) ;
}
emit updated ( ) ;
emit msgsReceived ( messages ) ;
delete messages ;
processing = false ;
}
bool AbstractStream : : updateEvent ( const Event * event ) {
static double prev_update_ts = 0 ;
if ( event - > which = = cereal : : Event : : Which : : CAN ) {
double current_sec = event - > mono_time / 1e9 - routeStartTime ( ) ;
for ( const auto & c : event - > event . getCan ( ) ) {
MessageId id = { . source = c . getSrc ( ) , . address = c . getAddress ( ) } ;
CanData & data = ( * new_msgs ) [ id ] ;
data . ts = current_sec ;
data . dat = QByteArray ( ( char * ) c . getDat ( ) . begin ( ) , c . getDat ( ) . size ( ) ) ;
data . count = + + counters [ id ] ;
data . freq = data . count / std : : max ( 1.0 , current_sec ) ;
auto & tracker = change_trackers [ id ] ;
tracker . compute ( data . dat , data . ts , data . freq ) ;
data . colors = tracker . colors ;
data . last_change_t = tracker . last_change_t ;
data . bit_change_counts = tracker . bit_change_counts ;
if ( ! sources . contains ( id . source ) ) {
sources . insert ( id . source ) ;
emit sourcesUpdated ( sources ) ;
}
}
double ts = millis_since_boot ( ) ;
if ( ( ts - prev_update_ts ) > ( 1000.0 / settings . fps ) & & ! processing & & ! new_msgs - > isEmpty ( ) ) {
// delay posting CAN message if UI thread is busy
processing = true ;
prev_update_ts = ts ;
// use pointer to avoid data copy in queued connection.
emit received ( new_msgs . release ( ) ) ;
new_msgs . reset ( new QHash < MessageId , CanData > ) ;
new_msgs - > reserve ( 100 ) ;
}
}
return true ;
}
const CanData & AbstractStream : : lastMessage ( const MessageId & id ) {
static CanData empty_data ;
auto it = last_msgs . find ( id ) ;
return it ! = last_msgs . end ( ) ? it . value ( ) : empty_data ;
}
// it is thread safe to update data in updateLastMsgsTo.
// updateEvent will not be called before replayStream::seekedTo return.
void AbstractStream : : updateLastMsgsTo ( double sec ) {
new_msgs - > clear ( ) ;
change_trackers . clear ( ) ;
last_msgs . clear ( ) ;
counters . clear ( ) ;
CanEvent last_event = { . mono_time = uint64_t ( ( sec + routeStartTime ( ) ) * 1e9 ) } ;
for ( auto & [ id , e ] : events_ ) {
auto it = std : : lower_bound ( e . crbegin ( ) , e . crend ( ) , last_event , std : : greater < CanEvent > ( ) ) ;
if ( it ! = e . crend ( ) ) {
auto & m = last_msgs [ id ] ;
m . dat = QByteArray ( ( const char * ) it - > dat , it - > size ) ;
m . ts = it - > mono_time / 1e9 - routeStartTime ( ) ;
m . count = std : : distance ( it , e . crend ( ) ) ;
m . freq = m . count / std : : max ( 1.0 , m . ts ) ;
m . last_change_t = QVector < double > ( m . dat . size ( ) , m . ts ) ;
m . colors = QVector < QColor > ( m . dat . size ( ) , QColor ( 0 , 0 , 0 , 0 ) ) ;
m . bit_change_counts = QVector < std : : array < uint32_t , 8 > > ( m . dat . size ( ) ) ;
counters [ id ] = m . count ;
}
}
QTimer : : singleShot ( 0 , [ this ] ( ) {
emit updated ( ) ;
emit msgsReceived ( & last_msgs ) ;
} ) ;
}
void AbstractStream : : parseEvents ( std : : unordered_map < MessageId , std : : deque < CanEvent > > & msgs ,
std : : vector < Event * > : : const_iterator first , std : : vector < Event * > : : const_iterator last ) {
for ( ; first ! = last ; + + first ) {
if ( ( * first ) - > which = = cereal : : Event : : Which : : CAN ) {
for ( const auto & c : ( * first ) - > event . getCan ( ) ) {
auto dat = c . getDat ( ) ;
auto & m = msgs [ { . source = c . getSrc ( ) , . address = c . getAddress ( ) } ] . emplace_back ( ) ;
m . size = std : : min ( dat . size ( ) , std : : size ( m . dat ) ) ;
memcpy ( m . dat , ( uint8_t * ) dat . begin ( ) , m . size ) ;
m . mono_time = ( * first ) - > mono_time ;
}
last_event_ts = std : : max ( last_event_ts , ( * first ) - > mono_time ) ;
}
}
}
void AbstractStream : : mergeEvents ( std : : vector < Event * > : : const_iterator first , std : : vector < Event * > : : const_iterator last , bool append ) {
if ( first = = last ) return ;
if ( append ) {
parseEvents ( events_ , first , last ) ;
} else {
std : : unordered_map < MessageId , std : : deque < CanEvent > > new_events ;
parseEvents ( new_events , first , last ) ;
for ( auto & [ id , new_e ] : new_events ) {
auto & e = events_ [ id ] ;
auto it = std : : upper_bound ( e . cbegin ( ) , e . cend ( ) , new_e . front ( ) ) ;
e . insert ( it , new_e . cbegin ( ) , new_e . cend ( ) ) ;
}
}
emit eventsMerged ( ) ;
}