@ -3,8 +3,8 @@
# include <QApplication>
# include <QDebug>
# include <capnp/dynamic.h>
# include "cereal/services.h"
# include "selfdrive/camerad/cameras/camera_common.h"
# include "selfdrive/common/timing.h"
# include "selfdrive/hardware/hw.h"
# include "selfdrive/ui/replay/util.h"
@ -27,16 +27,15 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
if ( sm = = nullptr ) {
pm = new PubMaster ( s ) ;
}
route_ = std : : make_unique < Route > ( route ) ;
events_ = new std : : vector < Event * > ( ) ;
// queueSegment is always executed in the main thread
// doSeek & queueSegment are always executed in the same thread
connect ( this , & Replay : : seekTo , this , & Replay : : doSeek ) ;
connect ( this , & Replay : : segmentChanged , this , & Replay : : queueSegment ) ;
}
Replay : : ~ Replay ( ) {
qDebug ( ) < < " shutdown: in progress... " ;
exit_ = true ;
updating_events_ = true ;
if ( stream_thread_ ) {
@ -53,18 +52,27 @@ Replay::~Replay() {
}
bool Replay : : load ( ) {
if ( ! route_ - > load ( ) | | route_ - > size ( ) = = 0 ) {
qDebug ( ) < < " failed load route " < < route_ - > name ( ) < < " from server " ;
if ( ! route_ - > load ( ) ) {
qDebug ( ) < < " failed to load route " < < route_ - > name ( ) < < " from server " ;
return false ;
}
qDebug ( ) < < " load route " < < route_ - > name ( ) < < route_ - > size ( ) < < " segments " ;
segments_ . resize ( route_ - > size ( ) ) ;
for ( int i = 0 ; i < route_ - > size ( ) ; + + i ) {
const SegmentFile & f = route_ - > at ( i ) ;
if ( ( ! f . rlog . isEmpty ( ) | | ! f . qlog . isEmpty ( ) ) & & ( ! f . road_cam . isEmpty ( ) | | ! f . qcamera . isEmpty ( ) ) ) {
segments_ [ i ] = nullptr ;
}
}
if ( segments_ . empty ( ) ) {
qDebug ( ) < < " no valid segments in route " < < route_ - > name ( ) ;
return false ;
}
qDebug ( ) < < " load route " < < route_ - > name ( ) < < " with " < < segments_ . size ( ) < < " valid segments " ;
return true ;
}
void Replay : : start ( int seconds ) {
seekTo ( seconds ) ;
seekTo ( seconds , false ) ;
camera_server_ = std : : make_unique < CameraServer > ( ) ;
// start stream thread
@ -84,35 +92,24 @@ void Replay::updateEvents(const std::function<bool()> &lambda) {
stream_cv_ . notify_one ( ) ;
}
void Replay : : seekTo ( int seconds , bool relative ) {
if ( segments_ . empty ( ) ) return ;
bool segment_loaded = false ;
bool segment_changed = false ;
void Replay : : doSeek ( int seconds , bool relative ) {
updateEvents ( [ & ] ( ) {
if ( relative ) {
seconds + = ( ( cur_mono_time_ - route_start_ts_ ) * 1e-9 ) ;
seconds + = currentSeconds ( ) ;
}
qInfo ( ) < < " seeking to " < < seconds ;
cur_mono_time_ = route_start_ts_ + std : : clamp ( seconds , 0 , ( int ) segments_ . size ( ) * 60 ) * 1e9 ;
int segment = std : : min ( seconds / 60 , ( int ) segments_ . size ( ) - 1 ) ;
segment_changed = current_segment_ . exchange ( segment ) ! = segment ;
segment_loaded = std : : find ( segments_merged_ . begin ( ) , segments_merged_ . end ( ) , segment ) ! = segments_merged_ . end ( ) ;
return segment_loaded ;
cur_mono_time_ = route_start_ts_ + std : : clamp ( seconds , 0 , ( int ) segments_ . rbegin ( ) - > first * 60 ) * 1e9 ;
current_segment_ = std : : min ( seconds / 60 , ( int ) segments_ . rbegin ( ) - > first - 1 ) ;
return false ;
} ) ;
if ( segment_changed | | ! segment_loaded ) {
emit segmentChanged ( ) ;
}
queueSegment ( ) ;
}
void Replay : : pause ( bool pause ) {
updateEvents ( [ = ] ( ) {
qDebug ( ) < < ( pause ? " paused... " : " resuming " ) ;
if ( pause ) {
float current_ts = ( cur_mono_time_ - route_start_ts_ ) / 1e9 ;
qInfo ( ) < < " at " < < current_ts < < " s " ;
qInfo ( ) < < " at " < < currentSeconds ( ) < < " s " ;
}
paused_ = pause ;
return true ;
@ -127,43 +124,36 @@ void Replay::setCurrentSegment(int n) {
// maintain the segment window
void Replay : : queueSegment ( ) {
// fetch segments forward
int cur_seg = current_segment_ . load ( ) ;
int end_idx = cur_seg ;
for ( int i = cur_seg , fwd = 0 ; i < segments_ . size ( ) & & fwd < = FORWARD_SEGS ; + + i ) {
if ( ! segments_ [ i ] ) {
segments_ [ i ] = std : : make_unique < Segment > ( i , route_ - > at ( i ) , load_dcam , load_ecam ) ;
QObject : : connect ( segments_ [ i ] . get ( ) , & Segment : : loadFinished , this , & Replay : : queueSegment ) ;
}
end_idx = i ;
// skip invalid segment
if ( segments_ [ i ] - > isValid ( ) ) {
+ + fwd ;
} else if ( i = = cur_seg ) {
+ + cur_seg ;
// forward fetch segments
SegmentMap : : iterator begin , end ;
begin = end = segments_ . lower_bound ( current_segment_ ) ;
for ( int fwd = 0 ; end ! = segments_ . end ( ) & & fwd < = FORWARD_SEGS ; + + end , + + fwd ) {
auto & [ n , seg ] = * end ;
if ( ! seg ) {
seg = std : : make_unique < Segment > ( n , route_ - > at ( n ) , load_dcam , load_ecam ) ;
QObject : : connect ( seg . get ( ) , & Segment : : loadFinished , this , & Replay : : queueSegment ) ;
}
}
// merge segments
mergeSegments ( begin , end ) ;
// free segments out of current semgnt window.
for ( auto it = segments_ . begin ( ) ; it ! = begin ; + + it ) {
it - > second . reset ( nullptr ) ;
}
for ( auto it = end ; it ! = segments_ . end ( ) ; + + it ) {
it - > second . reset ( nullptr ) ;
}
mergeSegments ( std : : min ( cur_seg , ( int ) segments_ . size ( ) - 1 ) , end_idx ) ;
}
void Replay : : mergeSegments ( int cur_seg , int end_idx ) {
void Replay : : mergeSegments ( const SegmentMap : : iterator & begin , const SegmentMap : : iterator & end ) {
// segments must be merged in sequence.
std : : vector < int > segments_need_merge ;
const int begin_idx = std : : max ( cur_seg - BACKWARD_SEGS , 0 ) ;
for ( int i = begin_idx ; i < = end_idx ; + + i ) {
if ( segments_ [ i ] & & segments_ [ i ] - > isLoaded ( ) ) {
segments_need_merge . push_back ( i ) ;
} else if ( i > = cur_seg & & segments_ [ i ] & & segments_ [ i ] - > isValid ( ) ) {
// segment is valid,but still loading. can't skip it to merge the next one.
// otherwise the stream thread may jump to the next segment.
break ;
}
for ( auto it = begin ; it ! = end & & it - > second - > isLoaded ( ) ; + + it ) {
segments_need_merge . push_back ( it - > first ) ;
}
if ( segments_need_merge ! = segments_merged_ ) {
qDebug ( ) < < " merge segments " < < segments_need_merge ;
// merge & sort events
std : : vector < Event * > * new_events = new std : : vector < Event * > ( ) ;
new_events - > reserve ( std : : accumulate ( segments_need_merge . begin ( ) , segments_need_merge . end ( ) , 0 ,
@ -173,7 +163,6 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto middle = new_events - > insert ( new_events - > end ( ) , log - > events . begin ( ) , log - > events . end ( ) ) ;
std : : inplace_merge ( new_events - > begin ( ) , middle , new_events - > end ( ) , Event : : lessThan ( ) ) ;
}
// update events
auto prev_events = events_ ;
updateEvents ( [ & ] ( ) {
@ -182,7 +171,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto it = std : : find_if ( new_events - > begin ( ) , new_events - > end ( ) , [ = ] ( auto e ) { return e - > which = = cereal : : Event : : Which : : INIT_DATA ; } ) ;
if ( it ! = new_events - > end ( ) ) {
route_start_ts_ = ( * it ) - > mono_time ;
// cur_mono_time_ is set by seekTo int start() before get route_start_ts_
// cur_mono_time_ is set by seekTo in start() before get route_start_ts_
cur_mono_time_ + = route_start_ts_ ;
}
}
@ -193,22 +182,16 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
} ) ;
delete prev_events ;
} else {
updateEvents ( [ ] ( ) { return true ; } ) ;
}
// free segments out of current semgnt window.
for ( int i = 0 ; i < segments_ . size ( ) ; i + + ) {
if ( ( i < begin_idx | | i > end_idx ) & & segments_ [ i ] ) {
segments_ [ i ] . reset ( nullptr ) ;
}
updateEvents ( [ = ] ( ) { return begin - > second - > isLoaded ( ) ; } ) ;
}
}
void Replay : : publishFrame ( const Event * e ) {
auto publish = [ = ] ( CameraType cam_type , const cereal : : EncodeIndex : : Reader & eidx ) {
auto & seg = segments_ [ eidx . getSegmentNum ( ) ] ;
if ( seg & & seg - > isLoaded ( ) & & seg - > frames [ cam_type ] & & eidx . getType ( ) = = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) {
camera_server_ - > pushFrame ( cam_type , seg - > frames [ cam_type ] . get ( ) , eidx ) ;
int n = eidx . getSegmentNum ( ) ;
bool segment_loaded = std : : find ( segments_merged_ . begin ( ) , segments_merged_ . end ( ) , n ) ! = segments_merged_ . end ( ) ;
if ( segment_loaded & & segments_ [ n ] - > frames [ cam_type ] & & eidx . getType ( ) = = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C ) {
camera_server_ - > pushFrame ( cam_type , segments_ [ n ] - > frames [ cam_type ] . get ( ) , eidx ) ;
}
} ;
if ( e - > which = = cereal : : Event : : ROAD_ENCODE_IDX ) {
@ -238,22 +221,21 @@ void Replay::stream() {
continue ;
}
uint64_t evt_start_ts = cur_mono_time_ ;
uint64_t loop_start_ts = nanos_since_boot ( ) ;
const uint64_t evt_start_ts = cur_mono_time_ ;
const uint64_t loop_start_ts = nanos_since_boot ( ) ;
for ( auto end = events_ - > end ( ) ; ! updating_events_ & & eit ! = end ; + + eit ) {
const Event * evt = ( * eit ) ;
cur_which = evt - > which ;
cur_mono_time_ = evt - > mono_time ;
if ( cur_which < sockets_ . size ( ) & & sockets_ [ cur_which ] ! = nullptr ) {
int current_ts = ( cur_mono_time_ - route_start_ts_ ) / 1e9 ;
if ( ( current_ts - last_print ) > 5.0 ) {
const int current_ts = currentSeconds ( ) ;
if ( last_print > current_ts | | ( current_ts - last_print ) > 5.0 ) {
last_print = current_ts ;
qInfo ( ) < < " at " < < current_ts < < " s " ;
}
setCurrentSegment ( current_ts / 60 ) ;
if ( cur_which < sockets_ . size ( ) & & sockets_ [ cur_which ] ! = nullptr ) {
// keep time
long etime = cur_mono_time_ - evt_start_ts ;
long rtime = nanos_since_boot ( ) - loop_start_ts ;