replay: improve segment loading and event handling (#34490)

improve segment Loading and Event Handling
pull/34493/head
Dean Lee 3 months ago committed by GitHub
parent 2eb3585dae
commit 227bb68e18
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      tools/cabana/streams/abstractstream.cc
  2. 12
      tools/replay/replay.cc
  3. 1
      tools/replay/replay.h
  4. 13
      tools/replay/seg_mgr.cc
  5. 2
      tools/replay/seg_mgr.h

@ -226,7 +226,7 @@ std::pair<CanEventIter, CanEventIter> AbstractStream::eventsInRange(const Messag
if (!time_range) return {events.begin(), events.end()}; if (!time_range) return {events.begin(), events.end()};
auto first = std::lower_bound(events.begin(), events.end(), can->toMonoTime(time_range->first), CompareCanEvent()); auto first = std::lower_bound(events.begin(), events.end(), can->toMonoTime(time_range->first), CompareCanEvent());
auto last = std::upper_bound(events.begin(), events.end(), can->toMonoTime(time_range->second), CompareCanEvent()); auto last = std::upper_bound(first, events.end(), can->toMonoTime(time_range->second), CompareCanEvent());
return {first, last}; return {first, last};
} }

@ -109,6 +109,7 @@ void Replay::seekTo(double seconds, bool relative) {
interruptStream([&]() { interruptStream([&]() {
current_segment_.store(target_segment); current_segment_.store(target_segment);
cur_mono_time_ = route_start_ts_ + target_time * 1e9; cur_mono_time_ = route_start_ts_ + target_time * 1e9;
cur_which_ = cereal::Event::Which::INIT_DATA;
seeking_to_.store(target_time, std::memory_order_relaxed); seeking_to_.store(target_time, std::memory_order_relaxed);
return false; return false;
}); });
@ -250,7 +251,6 @@ void Replay::publishFrame(const Event *e) {
void Replay::streamThread() { void Replay::streamThread() {
stream_thread_id = pthread_self(); stream_thread_id = pthread_self();
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
std::unique_lock lk(stream_lock_); std::unique_lock lk(stream_lock_);
while (true) { while (true) {
@ -259,7 +259,7 @@ void Replay::streamThread() {
event_data_ = seg_mgr_->getEventData(); event_data_ = seg_mgr_->getEventData();
const auto &events = event_data_->events; const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {})); auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which_, cur_mono_time_, {}));
if (first == events.cend()) { if (first == events.cend()) {
rInfo("waiting for events..."); rInfo("waiting for events...");
events_ready_ = false; events_ready_ = false;
@ -273,9 +273,7 @@ void Replay::streamThread() {
camera_server_->waitForSent(); camera_server_->waitForSent();
} }
if (it != events.cend()) { if (it == events.cend() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
cur_which = it->which;
} else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
int last_segment = seg_mgr_->route_.segments().rbegin()->first; int last_segment = seg_mgr_->route_.segments().rbegin()->first;
if (event_data_->isSegmentLoaded(last_segment)) { if (event_data_->isSegmentLoaded(last_segment)) {
rInfo("reaches the end of route, restart from beginning"); rInfo("reaches the end of route, restart from beginning");
@ -302,10 +300,12 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
seg_mgr_->setCurrentSegment(segment); seg_mgr_->setCurrentSegment(segment);
} }
cur_mono_time_ = evt.mono_time;
cur_which_ = evt.which;
// Skip events if socket is not present // Skip events if socket is not present
if (!sockets_[evt.which]) continue; if (!sockets_[evt.which]) continue;
cur_mono_time_ = evt.mono_time;
const uint64_t current_nanos = nanos_since_boot(); const uint64_t current_nanos = nanos_since_boot();
const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts); const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);

@ -93,6 +93,7 @@ private:
std::time_t route_date_time_; std::time_t route_date_time_;
uint64_t route_start_ts_ = 0; uint64_t route_start_ts_ = 0;
std::atomic<uint64_t> cur_mono_time_ = 0; std::atomic<uint64_t> cur_mono_time_ = 0;
cereal::Event::Which cur_which_ = cereal::Event::Which::INIT_DATA;
double min_seconds_ = 0; double min_seconds_ = 0;
double max_seconds_ = 0; double max_seconds_ = 0;
SubMaster *sm_ = nullptr; SubMaster *sm_ = nullptr;

@ -6,7 +6,6 @@ SegmentManager::~SegmentManager() {
{ {
std::unique_lock lock(mutex_); std::unique_lock lock(mutex_);
exit_ = true; exit_ = true;
onSegmentMergedCallback_ = nullptr;
} }
cv_.notify_one(); cv_.notify_one();
if (thread_.joinable()) thread_.join(); if (thread_.joinable()) thread_.join();
@ -37,6 +36,8 @@ bool SegmentManager::load() {
void SegmentManager::setCurrentSegment(int seg_num) { void SegmentManager::setCurrentSegment(int seg_num) {
{ {
std::unique_lock lock(mutex_); std::unique_lock lock(mutex_);
if (cur_seg_num_ == seg_num) return;
cur_seg_num_ = seg_num; cur_seg_num_ = seg_num;
needs_update_ = true; needs_update_ = true;
} }
@ -58,6 +59,8 @@ void SegmentManager::manageSegmentCache() {
auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end()))); auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end))); begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end)));
lock.unlock();
loadSegmentsInRange(begin, cur, end); loadSegmentsInRange(begin, cur, end);
bool merged = mergeSegments(begin, end); bool merged = mergeSegments(begin, end);
@ -65,8 +68,6 @@ void SegmentManager::manageSegmentCache() {
std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); }); std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); });
std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); }); std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); });
lock.unlock();
if (merged && onSegmentMergedCallback_) { if (merged && onSegmentMergedCallback_) {
onSegmentMergedCallback_(); // Notify listener that segments have been merged onSegmentMergedCallback_(); // Notify listener that segments have been merged
} }
@ -118,7 +119,11 @@ void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap:
if (!segment_ptr) { if (!segment_ptr) {
segment_ptr = std::make_shared<Segment>( segment_ptr = std::make_shared<Segment>(
it->first, route_.at(it->first), flags_, filters_, it->first, route_.at(it->first), flags_, filters_,
[this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); }); [this](int seg_num, bool success) {
std::unique_lock lock(mutex_);
needs_update_ = true;
cv_.notify_one();
});
} }
if (segment_ptr->getState() == Segment::LoadState::Loading) { if (segment_ptr->getState() == Segment::LoadState::Loading) {

@ -45,7 +45,7 @@ private:
std::mutex mutex_; std::mutex mutex_;
std::condition_variable cv_; std::condition_variable cv_;
std::thread thread_; std::thread thread_;
std::atomic<int> cur_seg_num_ = -1; int cur_seg_num_ = -1;
bool needs_update_ = false; bool needs_update_ = false;
bool exit_ = false; bool exit_ = false;

Loading…
Cancel
Save