# include "tools/replay/replay.h"
# include <QDebug>
# include <QtConcurrent>
# include <capnp/dynamic.h>
# include "cereal/services.h"
# include "common/params.h"
# include "common/timing.h"
# include "system/hardware/hw.h"
# include "tools/replay/util.h"
// Sets up a replay session for `route`.
//
// `allow` / `block` filter which services are replayed: a service is enabled when
// `allow` is empty or contains it, and `block` does not contain it.
// When `sm_` is null a PubMaster is created for the enabled services; otherwise
// messages are fed into the supplied SubMaster (see publishMessage()).
Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent)
    : QObject(parent), sm(sm_), flags_(flags) {
  // sockets_ is indexed by the discriminant of the cereal::Event union.
  const auto event_schema = capnp::Schema::from<cereal::Event>().asStruct();
  sockets_.resize(event_schema.getUnionFields().size());

  std::vector<const char *> enabled;
  for (const auto &service : services) {
    const bool allowed = allow.empty() || allow.contains(service.name);
    if (!allowed || block.contains(service.name)) continue;

    const uint16_t which = event_schema.getFieldByName(service.name).getProto().getDiscriminantValue();
    sockets_[which] = service.name;
    enabled.push_back(service.name);
  }
  qDebug() << " services " << enabled;
  qDebug() << " loading route " << route;

  // Only publish ourselves when nobody handed us a SubMaster to feed.
  if (sm == nullptr) {
    pm = std::make_unique<PubMaster>(enabled);
  }
  route_ = std::make_unique<Route>(route, data_dir);
  events_ = std::make_unique<std::vector<Event *>>();
  new_events_ = std::make_unique<std::vector<Event *>>();
}
// Destruction goes through stop() so the stream thread and loaded segments are torn down first.
Replay::~Replay() {
  stop();
}
void Replay : : stop ( ) {
if ( ! stream_thread_ & & segments_ . empty ( ) ) return ;
rInfo ( " shutdown: in progress... " ) ;
if ( stream_thread_ ! = nullptr ) {
exit_ = updating_events_ = true ;
stream_cv_ . notify_one ( ) ;
stream_thread_ - > quit ( ) ;
stream_thread_ - > wait ( ) ;
stream_thread_ = nullptr ;
}
segments_ . clear ( ) ;
camera_server_ . reset ( nullptr ) ;
timeline_future . waitForFinished ( ) ;
rInfo ( " shutdown: done " ) ;
}
bool Replay : : load ( ) {
if ( ! route_ - > load ( ) ) {
qCritical ( ) < < " failed to load route " < < route_ - > name ( )
< < " from " < < ( route_ - > dir ( ) . isEmpty ( ) ? " server " : route_ - > dir ( ) ) ;
return false ;
}
for ( auto & [ n , f ] : route_ - > segments ( ) ) {
bool has_log = ! f . rlog . isEmpty ( ) | | ! f . qlog . isEmpty ( ) ;
bool has_video = ! f . road_cam . isEmpty ( ) | | ! f . qcamera . isEmpty ( ) ;
if ( has_log & & ( has_video | | hasFlag ( REPLAY_FLAG_NO_VIPC ) ) ) {
segments_ . insert ( { n , nullptr } ) ;
}
}
if ( segments_ . empty ( ) ) {
qCritical ( ) < < " no valid segments in route " < < route_ - > name ( ) ;
return false ;
}
rInfo ( " load route %s with %zu valid segments " , qPrintable ( route_ - > name ( ) ) , segments_ . size ( ) ) ;
return true ;
}
void Replay : : start ( int seconds ) {
seekTo ( route_ - > identifier ( ) . segment_id * 60 + seconds , false ) ;
}
void Replay : : updateEvents ( const std : : function < bool ( ) > & lambda ) {
// set updating_events to true to force stream thread relase the lock and wait for evnets_udpated.
updating_events_ = true ;
{
std : : unique_lock lk ( stream_lock_ ) ;
events_updated_ = lambda ( ) ;
updating_events_ = false ;
}
stream_cv_ . notify_one ( ) ;
}
void Replay : : seekTo ( int seconds , bool relative ) {
seconds = relative ? seconds + currentSeconds ( ) : seconds ;
updateEvents ( [ & ] ( ) {
seconds = std : : max ( 0 , seconds ) ;
int seg = seconds / 60 ;
if ( segments_ . find ( seg ) = = segments_ . end ( ) ) {
rWarning ( " can't seek to %d s segment %d is invalid " , seconds , seg ) ;
return true ;
}
rInfo ( " seeking to %d s, segment %d " , seconds , seg ) ;
current_segment_ = seg ;
cur_mono_time_ = route_start_ts_ + seconds * 1e9 ;
return isSegmentMerged ( seg ) ;
} ) ;
queueSegment ( ) ;
}
void Replay : : seekToFlag ( FindFlag flag ) {
if ( auto next = find ( flag ) ) {
seekTo ( * next - 2 , false ) ; // seek to 2 seconds before next
}
}
void Replay : : buildTimeline ( ) {
uint64_t engaged_begin = 0 ;
uint64_t alert_begin = 0 ;
TimelineType alert_type = TimelineType : : None ;
for ( int i = 0 ; i < segments_ . size ( ) & & ! exit_ ; + + i ) {
LogReader log ;
if ( ! log . load ( route_ - > at ( i ) . qlog . toStdString ( ) , & exit_ , ! hasFlag ( REPLAY_FLAG_NO_FILE_CACHE ) , 0 , 3 ) ) continue ;
for ( const Event * e : log . events ) {
if ( e - > which = = cereal : : Event : : Which : : CONTROLS_STATE ) {
auto cs = e - > event . getControlsState ( ) ;
if ( ! engaged_begin & & cs . getEnabled ( ) ) {
engaged_begin = e - > mono_time ;
} else if ( engaged_begin & & ! cs . getEnabled ( ) ) {
std : : lock_guard lk ( timeline_lock ) ;
timeline . push_back ( { toSeconds ( engaged_begin ) , toSeconds ( e - > mono_time ) , TimelineType : : Engaged } ) ;
engaged_begin = 0 ;
}
if ( ! alert_begin & & cs . getAlertType ( ) . size ( ) > 0 ) {
alert_begin = e - > mono_time ;
alert_type = TimelineType : : AlertInfo ;
if ( cs . getAlertStatus ( ) ! = cereal : : ControlsState : : AlertStatus : : NORMAL ) {
alert_type = cs . getAlertStatus ( ) = = cereal : : ControlsState : : AlertStatus : : USER_PROMPT
? TimelineType : : AlertWarning
: TimelineType : : AlertCritical ;
}
} else if ( alert_begin & & cs . getAlertType ( ) . size ( ) = = 0 ) {
std : : lock_guard lk ( timeline_lock ) ;
timeline . push_back ( { toSeconds ( alert_begin ) , toSeconds ( e - > mono_time ) , alert_type } ) ;
alert_begin = 0 ;
}
} else if ( e - > which = = cereal : : Event : : Which : : USER_FLAG ) {
std : : lock_guard lk ( timeline_lock ) ;
timeline . push_back ( { toSeconds ( e - > mono_time ) , toSeconds ( e - > mono_time ) , TimelineType : : UserFlag } ) ;
}
}
}
}
std : : optional < uint64_t > Replay : : find ( FindFlag flag ) {
int cur_ts = currentSeconds ( ) ;
for ( auto [ start_ts , end_ts , type ] : getTimeline ( ) ) {
if ( type = = TimelineType : : Engaged ) {
if ( flag = = FindFlag : : nextEngagement & & start_ts > cur_ts ) {
return start_ts ;
} else if ( flag = = FindFlag : : nextDisEngagement & & end_ts > cur_ts ) {
return end_ts ;
}
} else if ( type = = TimelineType : : UserFlag ) {
if ( flag = = FindFlag : : nextUserFlag & & start_ts > cur_ts ) {
return start_ts ;
}
}
}
return std : : nullopt ;
}
void Replay : : pause ( bool pause ) {
updateEvents ( [ = ] ( ) {
rWarning ( " %s at %d s " , pause ? " paused... " : " resuming " , currentSeconds ( ) ) ;
paused_ = pause ;
return true ;
} ) ;
}
void Replay : : setCurrentSegment ( int n ) {
if ( current_segment_ . exchange ( n ) ! = n ) {
QMetaObject : : invokeMethod ( this , & Replay : : queueSegment , Qt : : QueuedConnection ) ;
}
}
void Replay : : segmentLoadFinished ( bool success ) {
if ( ! success ) {
Segment * seg = qobject_cast < Segment * > ( sender ( ) ) ;
rWarning ( " failed to load segment %d, removing it from current replay list " , seg - > seg_num ) ;
segments_ . erase ( seg - > seg_num ) ;
}
queueSegment ( ) ;
}
void Replay : : queueSegment ( ) {
if ( segments_ . empty ( ) ) return ;
SegmentMap : : iterator cur , end ;
cur = end = segments_ . lower_bound ( std : : min ( current_segment_ . load ( ) , segments_ . rbegin ( ) - > first ) ) ;
for ( int i = 0 ; end ! = segments_ . end ( ) & & i < = FORWARD_SEGS ; + + i ) {
+ + end ;
}
// load one segment at a time
for ( auto it = cur ; it ! = end ; + + it ) {
auto & [ n , seg ] = * it ;
if ( ( seg & & ! seg - > isLoaded ( ) ) | | ! seg ) {
if ( ! seg ) {
rDebug ( " loading segment %d... " , n ) ;
seg = std : : make_unique < Segment > ( n , route_ - > at ( n ) , flags_ ) ;
QObject : : connect ( seg . get ( ) , & Segment : : loadFinished , this , & Replay : : segmentLoadFinished ) ;
}
break ;
}
}
const auto & cur_segment = cur - > second ;
// merge the previous adjacent segment if it's loaded
auto begin = segments_ . find ( cur_segment - > seg_num - 1 ) ;
if ( begin = = segments_ . end ( ) | | ! ( begin - > second & & begin - > second - > isLoaded ( ) ) ) {
begin = cur ;
}
mergeSegments ( begin , end ) ;
// free segments out of current semgnt window.
std : : for_each ( segments_ . begin ( ) , begin , [ ] ( auto & e ) { e . second . reset ( nullptr ) ; } ) ;
std : : for_each ( end , segments_ . end ( ) , [ ] ( auto & e ) { e . second . reset ( nullptr ) ; } ) ;
// start stream thread
if ( stream_thread_ = = nullptr & & cur_segment - > isLoaded ( ) ) {
startStream ( cur_segment . get ( ) ) ;
emit streamStarted ( ) ;
}
}
void Replay : : mergeSegments ( const SegmentMap : : iterator & begin , const SegmentMap : : iterator & end ) {
// merge 3 segments in sequence.
std : : vector < int > segments_need_merge ;
size_t new_events_size = 0 ;
for ( auto it = begin ; it ! = end & & it - > second & & it - > second - > isLoaded ( ) & & segments_need_merge . size ( ) < 3 ; + + it ) {
segments_need_merge . push_back ( it - > first ) ;
new_events_size + = it - > second - > log - > events . size ( ) ;
}
if ( segments_need_merge ! = segments_merged_ ) {
std : : string s ;
for ( int i = 0 ; i < segments_need_merge . size ( ) ; + + i ) {
s + = std : : to_string ( segments_need_merge [ i ] ) ;
if ( i ! = segments_need_merge . size ( ) - 1 ) s + = " , " ;
}
rDebug ( " merge segments %s " , s . c_str ( ) ) ;
new_events_ - > clear ( ) ;
new_events_ - > reserve ( new_events_size ) ;
for ( int n : segments_need_merge ) {
const auto & e = segments_ [ n ] - > log - > events ;
auto middle = new_events_ - > insert ( new_events_ - > end ( ) , e . begin ( ) , e . end ( ) ) ;
std : : inplace_merge ( new_events_ - > begin ( ) , middle , new_events_ - > end ( ) , Event : : lessThan ( ) ) ;
}
updateEvents ( [ & ] ( ) {
events_ . swap ( new_events_ ) ;
segments_merged_ = segments_need_merge ;
return true ;
} ) ;
}
}
void Replay : : startStream ( const Segment * cur_segment ) {
const auto & events = cur_segment - > log - > events ;
// get route start time from initData
auto it = std : : find_if ( events . begin ( ) , events . end ( ) , [ ] ( auto e ) { return e - > which = = cereal : : Event : : Which : : INIT_DATA ; } ) ;
route_start_ts_ = it ! = events . end ( ) ? ( * it ) - > mono_time : events [ 0 ] - > mono_time ;
cur_mono_time_ + = route_start_ts_ ;
// write CarParams
it = std : : find_if ( events . begin ( ) , events . end ( ) , [ ] ( auto e ) { return e - > which = = cereal : : Event : : Which : : CAR_PARAMS ; } ) ;
if ( it ! = events . end ( ) ) {
car_fingerprint_ = ( * it ) - > event . getCarParams ( ) . getCarFingerprint ( ) ;
capnp : : MallocMessageBuilder builder ;
builder . setRoot ( ( * it ) - > event . getCarParams ( ) ) ;
auto words = capnp : : messageToFlatArray ( builder ) ;
auto bytes = words . asBytes ( ) ;
Params ( ) . put ( " CarParams " , ( const char * ) bytes . begin ( ) , bytes . size ( ) ) ;
} else {
rWarning ( " failed to read CarParams from current segment " ) ;
}
// start camera server
if ( ! hasFlag ( REPLAY_FLAG_NO_VIPC ) ) {
std : : pair < int , int > camera_size [ MAX_CAMERAS ] = { } ;
for ( auto type : ALL_CAMERAS ) {
if ( auto & fr = cur_segment - > frames [ type ] ) {
camera_size [ type ] = { fr - > width , fr - > height } ;
}
}
camera_server_ = std : : make_unique < CameraServer > ( camera_size ) ;
}
// start stream thread
stream_thread_ = new QThread ( ) ;
QObject : : connect ( stream_thread_ , & QThread : : started , [ = ] ( ) { stream ( ) ; } ) ;
QObject : : connect ( stream_thread_ , & QThread : : finished , stream_thread_ , & QThread : : deleteLater ) ;
stream_thread_ - > start ( ) ;
timeline_future = QtConcurrent : : run ( this , & Replay : : buildTimeline ) ;
}
void Replay : : publishMessage ( const Event * e ) {
if ( sm = = nullptr ) {
auto bytes = e - > bytes ( ) ;
int ret = pm - > send ( sockets_ [ e - > which ] , ( capnp : : byte * ) bytes . begin ( ) , bytes . size ( ) ) ;
if ( ret = = - 1 ) {
rWarning ( " stop publishing %s due to multiple publishers error " , sockets_ [ e - > which ] ) ;
sockets_ [ e - > which ] = nullptr ;
}
} else {
sm - > update_msgs ( nanos_since_boot ( ) , { { sockets_ [ e - > which ] , e - > event } } ) ;
}
}
void Replay : : publishFrame ( const Event * e ) {
static const std : : map < cereal : : Event : : Which , CameraType > cam_types {
{ cereal : : Event : : ROAD_ENCODE_IDX , RoadCam } ,
{ cereal : : Event : : DRIVER_ENCODE_IDX , DriverCam } ,
{ cereal : : Event : : WIDE_ROAD_ENCODE_IDX , WideRoadCam } ,
} ;
if ( ( e - > which = = cereal : : Event : : DRIVER_ENCODE_IDX & & ! hasFlag ( REPLAY_FLAG_DCAM ) ) | |
( e - > which = = cereal : : Event : : WIDE_ROAD_ENCODE_IDX & & ! hasFlag ( REPLAY_FLAG_ECAM ) ) ) {
return ;
}
auto eidx = capnp : : AnyStruct : : Reader ( e - > event ) . getPointerSection ( ) [ 0 ] . getAs < cereal : : EncodeIndex > ( ) ;
if ( eidx . getType ( ) = = cereal : : EncodeIndex : : Type : : FULL_H_E_V_C & & isSegmentMerged ( eidx . getSegmentNum ( ) ) ) {
CameraType cam = cam_types . at ( e - > which ) ;
camera_server_ - > pushFrame ( cam , segments_ [ eidx . getSegmentNum ( ) ] - > frames [ cam ] . get ( ) , eidx ) ;
}
}
void Replay : : stream ( ) {
cereal : : Event : : Which cur_which = cereal : : Event : : Which : : INIT_DATA ;
std : : unique_lock lk ( stream_lock_ ) ;
while ( true ) {
stream_cv_ . wait ( lk , [ = ] ( ) { return exit_ | | ( events_updated_ & & ! paused_ ) ; } ) ;
events_updated_ = false ;
if ( exit_ ) break ;
Event cur_event ( cur_which , cur_mono_time_ ) ;
auto eit = std : : upper_bound ( events_ - > begin ( ) , events_ - > end ( ) , & cur_event , Event : : lessThan ( ) ) ;
if ( eit = = events_ - > end ( ) ) {
rInfo ( " waiting for events... " ) ;
continue ;
}
uint64_t evt_start_ts = cur_mono_time_ ;
uint64_t loop_start_ts = nanos_since_boot ( ) ;
for ( auto end = events_ - > end ( ) ; ! updating_events_ & & eit ! = end ; + + eit ) {
const Event * evt = ( * eit ) ;
cur_which = evt - > which ;
cur_mono_time_ = evt - > mono_time ;
setCurrentSegment ( toSeconds ( cur_mono_time_ ) / 60 ) ;
// migration for pandaState -> pandaStates to keep UI working for old segments
if ( cur_which = = cereal : : Event : : Which : : PANDA_STATE_D_E_P_R_E_C_A_T_E_D & &
sockets_ [ cereal : : Event : : Which : : PANDA_STATES ] ! = nullptr ) {
MessageBuilder msg ;
auto ps = msg . initEvent ( ) . initPandaStates ( 1 ) ;
ps [ 0 ] . setIgnitionLine ( true ) ;
ps [ 0 ] . setPandaType ( cereal : : PandaState : : PandaType : : DOS ) ;
pm - > send ( sockets_ [ cereal : : Event : : Which : : PANDA_STATES ] , msg ) ;
}
if ( cur_which < sockets_ . size ( ) & & sockets_ [ cur_which ] ! = nullptr ) {
// keep time
long etime = cur_mono_time_ - evt_start_ts ;
long rtime = nanos_since_boot ( ) - loop_start_ts ;
long behind_ns = etime - rtime ;
// if behind_ns is greater than 1 second, it means that an invalid segemnt is skipped by seeking/replaying
if ( behind_ns > = 1 * 1e9 ) {
// reset start times
evt_start_ts = cur_mono_time_ ;
loop_start_ts = nanos_since_boot ( ) ;
} else if ( behind_ns > 0 & & ! hasFlag ( REPLAY_FLAG_FULL_SPEED ) ) {
precise_nano_sleep ( behind_ns ) ;
}
if ( ! evt - > frame ) {
publishMessage ( evt ) ;
} else if ( camera_server_ ) {
if ( hasFlag ( REPLAY_FLAG_FULL_SPEED ) ) {
camera_server_ - > waitForSent ( ) ;
}
publishFrame ( evt ) ;
}
}
}
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
if ( camera_server_ ) {
camera_server_ - > waitForSent ( ) ;
}
if ( eit = = events_ - > end ( ) & & ! hasFlag ( REPLAY_FLAG_NO_LOOP ) ) {
int last_segment = segments_ . rbegin ( ) - > first ;
if ( current_segment_ > = last_segment & & isSegmentMerged ( last_segment ) ) {
rInfo ( " reaches the end of route, restart from beginning " ) ;
QMetaObject : : invokeMethod ( this , std : : bind ( & Replay : : seekTo , this , 0 , false ) , Qt : : QueuedConnection ) ;
}
}
}
}