@ -1,4 +1,5 @@
# include "tools/cabana/streams/abstractstream.h"
# include "tools/cabana/streams/abstractstream.h"
# include <QTimer>
# include <QTimer>
AbstractStream * can = nullptr ;
AbstractStream * can = nullptr ;
@ -26,7 +27,7 @@ void AbstractStream::updateMessages(QHash<MessageId, CanData> *messages) {
}
}
void AbstractStream : : updateEvent ( const MessageId & id , double sec , const uint8_t * data , uint8_t size ) {
void AbstractStream : : updateEvent ( const MessageId & id , double sec , const uint8_t * data , uint8_t size ) {
all_msgs [ id ] . compute ( ( const char * ) data , size , sec , getSpeed ( ) ) ;
all_msgs [ id ] . compute ( ( const char * ) data , size , sec , getSpeed ( ) ) ;
if ( ! new_msgs - > contains ( id ) ) {
if ( ! new_msgs - > contains ( id ) ) {
new_msgs - > insert ( id , { } ) ;
new_msgs - > insert ( id , { } ) ;
}
}
@ -82,18 +83,23 @@ void AbstractStream::updateLastMsgsTo(double sec) {
} ) ;
} ) ;
}
}
void AbstractStream : : parseEvents ( std : : unordered_map < MessageId , std : : deque < const CanEvent * > > & msgs ,
void AbstractStream : : mergeEvents ( std : : vector < Event * > : : const_iterator first , std : : vector < Event * > : : const_iterator last ) {
std : : vector < Event * > : : const_iterator first , std : : vector < Event * > : : const_iterator last ) {
size_t memory_size = 0 ;
size_t memory_size = 0 ;
size_t events_cnt = 0 ;
for ( auto it = first ; it ! = last ; + + it ) {
for ( auto it = first ; it ! = last ; + + it ) {
if ( ( * it ) - > which = = cereal : : Event : : Which : : CAN ) {
if ( ( * it ) - > which = = cereal : : Event : : Which : : CAN ) {
for ( const auto & c : ( * it ) - > event . getCan ( ) ) {
for ( const auto & c : ( * it ) - > event . getCan ( ) ) {
memory_size + = sizeof ( CanEvent ) + sizeof ( uint8_t ) * c . getDat ( ) . size ( ) ;
memory_size + = sizeof ( CanEvent ) + sizeof ( uint8_t ) * c . getDat ( ) . size ( ) ;
+ + events_cnt ;
}
}
}
}
}
}
if ( memory_size = = 0 ) return ;
char * ptr = memory_blocks . emplace_back ( new char [ memory_size ] ) . get ( ) ;
char * ptr = memory_blocks . emplace_back ( new char [ memory_size ] ) . get ( ) ;
std : : unordered_map < MessageId , std : : deque < const CanEvent * > > new_events_map ;
std : : vector < const CanEvent * > new_events ;
new_events . reserve ( events_cnt ) ;
for ( auto it = first ; it ! = last ; + + it ) {
for ( auto it = first ; it ! = last ; + + it ) {
if ( ( * it ) - > which = = cereal : : Event : : Which : : CAN ) {
if ( ( * it ) - > which = = cereal : : Event : : Which : : CAN ) {
uint64_t ts = ( * it ) - > mono_time ;
uint64_t ts = ( * it ) - > mono_time ;
@ -106,31 +112,28 @@ void AbstractStream::parseEvents(std::unordered_map<MessageId, std::deque<const
e - > size = dat . size ( ) ;
e - > size = dat . size ( ) ;
memcpy ( e - > dat , ( uint8_t * ) dat . begin ( ) , e - > size ) ;
memcpy ( e - > dat , ( uint8_t * ) dat . begin ( ) , e - > size ) ;
msgs [ { . source = e - > src , . address = e - > address } ] . push_back ( e ) ;
new_events_map [ { . source = e - > src , . address = e - > address } ] . push_back ( e ) ;
all_events_ . push_back ( e ) ;
new_events . push_back ( e ) ;
ptr + = sizeof ( CanEvent ) + sizeof ( uint8_t ) * e - > size ;
ptr + = sizeof ( CanEvent ) + sizeof ( uint8_t ) * e - > size ;
}
}
}
}
}
}
}
void AbstractStream : : mergeEvents ( std : : vector < Event * > : : const_iterator first , std : : vector < Event * > : : const_iterator last , bool append ) {
if ( first = = last ) return ;
if ( append ) {
bool append = new_events . front ( ) - > mono_time > lastest_event_ts ;
parseEvents ( events_ , first , last ) ;
for ( auto & [ id , new_e ] : new_events_map ) {
} else {
auto & e = events_ [ id ] ;
std : : unordered_map < MessageId , std : : deque < const CanEvent * > > new_events ;
auto pos = append ? e . end ( ) : std : : upper_bound ( e . cbegin ( ) , e . cend ( ) , new_e . front ( ) , [ ] ( const CanEvent * l , const CanEvent * r ) {
parseEvents ( new_events , first , last ) ;
return l - > mono_time < r - > mono_time ;
for ( auto & [ id , new_e ] : new_events ) {
} ) ;
auto & e = events_ [ id ] ;
e . insert ( pos , new_e . cbegin ( ) , new_e . cend ( ) ) ;
auto it = std : : upper_bound ( e . cbegin ( ) , e . cend ( ) , new_e . front ( ) , [ ] ( const CanEvent * l , const CanEvent * r ) {
return l - > mono_time < r - > mono_time ;
} ) ;
e . insert ( it , new_e . cbegin ( ) , new_e . cend ( ) ) ;
}
}
}
total_sec = ( all_events_ . back ( ) - > mono_time - all_events_ . front ( ) - > mono_time ) / 1e9 ;
auto pos = append ? all_events_ . end ( ) : std : : upper_bound ( all_events_ . begin ( ) , all_events_ . end ( ) , new_events . front ( ) , [ ] ( auto l , auto r ) {
return l - > mono_time < r - > mono_time ;
} ) ;
all_events_ . insert ( pos , new_events . cbegin ( ) , new_events . cend ( ) ) ;
lastest_event_ts = all_events_ . back ( ) - > mono_time ;
emit eventsMerged ( ) ;
emit eventsMerged ( ) ;
}
}