# include  "tools/replay/seg_mgr.h" 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								# include  <algorithm> 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								// Asks the worker thread held in thread_ to stop and waits for it to finish.
								// exit_ is raised while mutex_ is held, then cv_ is notified so that a thread
								// blocked in cv_.wait() (see manageSegmentCache) re-evaluates its predicate.
								SegmentManager::~SegmentManager() {
								  std::unique_lock guard(mutex_);
								  exit_ = true;
								  guard.unlock();  // release before notifying, as the woken thread re-acquires mutex_

								  cv_.notify_one();

								  // thread_ is only started by a successful load(); skip the join otherwise.
								  if (thread_.joinable()) {
								    thread_.join();
								  }
								}
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								bool  SegmentManager : : load ( )  { 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  ( ! route_ . load ( ) )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    rError ( " failed to load route: %s " ,  route_ . name ( ) . c_str ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  false ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  ( const  auto  & [ n ,  file ]  :  route_ . segments ( ) )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( ! file . rlog . empty ( )  | |  ! file . qlog . empty ( ) )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      segments_ . insert ( { n ,  nullptr } ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  ( segments_ . empty ( ) )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    rInfo ( " no valid segments in route: %s " ,  route_ . name ( ) . c_str ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  false ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  rInfo ( " loaded route %s with %zu valid segments " ,  route_ . name ( ) . c_str ( ) ,  segments_ . size ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  thread_  =  std : : thread ( & SegmentManager : : manageSegmentCache ,  this ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  true ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								void  SegmentManager : : setCurrentSegment ( int  seg_num )  { 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    std : : unique_lock  lock ( mutex_ ) ; 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    if  ( cur_seg_num_  = =  seg_num )  return ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cur_seg_num_  =  seg_num ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    needs_update_  =  true ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  cv_ . notify_one ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								void  SegmentManager : : manageSegmentCache ( )  { 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  while  ( true )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    std : : unique_lock  lock ( mutex_ ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    cv_ . wait ( lock ,  [ this ] ( )  {  return  exit_  | |  needs_update_ ;  } ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( exit_ )  break ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    needs_update_  =  false ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    auto  cur  =  segments_ . lower_bound ( cur_seg_num_ ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( cur  = =  segments_ . end ( ) )  continue ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    // Calculate the range of segments to load
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    auto  begin  =  std : : prev ( cur ,  std : : min < int > ( segment_cache_limit_  /  2 ,  std : : distance ( segments_ . begin ( ) ,  cur ) ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    auto  end  =  std : : next ( begin ,  std : : min < int > ( segment_cache_limit_ ,  std : : distance ( begin ,  segments_ . end ( ) ) ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    begin  =  std : : prev ( end ,  std : : min < int > ( segment_cache_limit_ ,  std : : distance ( segments_ . begin ( ) ,  end ) ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    lock . unlock ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    loadSegmentsInRange ( begin ,  cur ,  end ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    bool  merged  =  mergeSegments ( begin ,  end ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    // Free segments outside the current range
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    std : : for_each ( segments_ . begin ( ) ,  begin ,  [ ] ( auto  & segment )  {  segment . second . reset ( ) ;  } ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    std : : for_each ( end ,  segments_ . end ( ) ,  [ ] ( auto  & segment )  {  segment . second . reset ( ) ;  } ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( merged  & &  onSegmentMergedCallback_ )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      onSegmentMergedCallback_ ( ) ;   // Notify listener that segments have been merged
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								bool  SegmentManager : : mergeSegments ( const  SegmentMap : : iterator  & begin ,  const  SegmentMap : : iterator  & end )  { 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  std : : set < int >  segments_to_merge ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  size_t  total_event_count  =  0 ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  ( auto  it  =  begin ;  it  ! =  end ;  + + it )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    const  auto  & segment  =  it - > second ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( segment  & &  segment - > getState ( )  = =  Segment : : LoadState : : Loaded )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      segments_to_merge . insert ( segment - > seg_num ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      total_event_count  + =  segment - > log - > events . size ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  ( segments_to_merge  = =  merged_segments_ )  return  false ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  auto  merged_event_data  =  std : : make_shared < EventData > ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  auto  & merged_events  =  merged_event_data - > events ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  merged_events . reserve ( total_event_count ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  rDebug ( " merging segments: %s " ,  join ( segments_to_merge ,  " ,  " ) . c_str ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  for  ( int  n  :  segments_to_merge )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    const  auto  & events  =  segments_ . at ( n ) - > log - > events ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  ( events . empty ( ) )  continue ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    // Skip INIT_DATA if present
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    auto  events_begin  =  ( events . front ( ) . which  = =  cereal : : Event : : Which : : INIT_DATA )  ?  std : : next ( events . begin ( ) )  :  events . begin ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    size_t  previous_size  =  merged_events . size ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    merged_events . insert ( merged_events . end ( ) ,  events_begin ,  events . end ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    std : : inplace_merge ( merged_events . begin ( ) ,  merged_events . begin ( )  +  previous_size ,  merged_events . end ( ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    merged_event_data - > segments [ n ]  =  segments_ . at ( n ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  std : : atomic_store ( & event_data_ ,  std : : move ( merged_event_data ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  merged_segments_  =  segments_to_merge ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  return  true ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								} 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								void  SegmentManager : : loadSegmentsInRange ( SegmentMap : : iterator  begin ,  SegmentMap : : iterator  cur ,  SegmentMap : : iterator  end )  { 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  auto  tryLoadSegment  =  [ this ] ( auto  first ,  auto  last )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  ( auto  it  =  first ;  it  ! =  last ;  + + it )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      auto  & segment_ptr  =  it - > second ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  ( ! segment_ptr )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        segment_ptr  =  std : : make_shared < Segment > ( 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            it - > first ,  route_ . at ( it - > first ) ,  flags_ ,  filters_ , 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								            [ this ] ( int  seg_num ,  bool  success )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              std : : unique_lock  lock ( mutex_ ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              needs_update_  =  true ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								              cv_ . notify_one ( ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								            } ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  ( segment_ptr - > getState ( )  = =  Segment : : LoadState : : Loading )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        return  true ;   // Segment is still loading
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  false ;   // No segments need loading
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  // Try forward loading, then reverse if necessary
   
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  if  ( ! tryLoadSegment ( cur ,  end ) )  { 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    tryLoadSegment ( std : : make_reverse_iterator ( cur ) ,  std : : make_reverse_iterator ( begin ) ) ; 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  } 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								}