From 317deeae847747810f632acd762595da759b41a2 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Wed, 20 Oct 2021 13:23:53 +0800 Subject: [PATCH] replay: improve seeking, queuing segments (#22625) * improve seeking, queuing segments * cleanup,init current_segment_ to 0 * use isSegmentLoaded * remove to isSegmentMerged --- selfdrive/ui/replay/replay.cc | 71 ++++++++++++------------ selfdrive/ui/replay/replay.h | 5 +- selfdrive/ui/replay/tests/test_replay.cc | 11 +--- 3 files changed, 38 insertions(+), 49 deletions(-) diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 3e9f5ac059..e4c9db1d48 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -85,15 +85,22 @@ void Replay::updateEvents(const std::function &lambda) { } void Replay::doSeek(int seconds, bool relative) { + if (segments_.empty()) return; + updateEvents([&]() { if (relative) { seconds += currentSeconds(); } - qInfo() << "seeking to" << seconds; - const int max_segment_number = segments_.rbegin()->first; - cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (max_segment_number + 1) * 60) * 1e9; - current_segment_ = std::min(seconds / 60, max_segment_number); - return false; + int seg = seconds / 60; + if (segments_.find(seg) == segments_.end()) { + qInfo() << "can't seek to" << seconds << "s, segment" << seg << "is invalid"; + return true; + } + + qInfo() << "seeking to" << seconds << "s, segment" << seg; + current_segment_ = seg; + cur_mono_time_ = route_start_ts_ + seconds * 1e9; + return isSegmentMerged(seg); }); queueSegment(); } @@ -125,47 +132,39 @@ void Replay::segmentLoadFinished(bool success) { } void Replay::queueSegment() { - // get the current segment window - SegmentMap::iterator begin, cur, end; - begin = cur = end = segments_.lower_bound(current_segment_); - if (cur != segments_.end() && cur->second == nullptr) { - // just load one segment on starting replay or seeking - end++; - } else { - for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) { - --begin; - } - for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) { - ++end; - } - } + if (segments_.empty()) return; - // load & merge segments - for (auto it = begin; it != end; ++it) { - auto &[n, seg] = *it; + SegmentMap::iterator begin, cur, end; + begin = cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); + // set fwd to 0 to just load the current segment when seeking to a new window. + const int fwd = cur->second == nullptr ? 0 : FORWARD_SEGS; + for (int i = 0; end != segments_.end() && i <= fwd; ++end, ++i) { + auto &[n, seg] = *end; if (!seg) { seg = std::make_unique(n, route_->at(n), load_dcam, load_ecam); QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); qInfo() << "loading segment" << n << "..."; } } + + const auto &cur_segment = cur->second; + enableHttpLogging(!cur_segment->isLoaded()); + + // merge the previous adjacent segment if it's loaded + auto prev = segments_.find(cur_segment->seg_num - 1); + if (prev != segments_.end() && prev->second && prev->second->isLoaded()) { + begin = prev; + } mergeSegments(begin, end); // free segments out of current semgnt window. - for (auto it = segments_.begin(); it != begin; ++it) { - it->second.reset(nullptr); - } - for (auto it = end; it != segments_.end(); ++it) { - it->second.reset(nullptr); - } + std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); }); + std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); }); // start stream thread - bool current_segment_loaded = (cur != segments_.end() && cur->second->isLoaded()); - if (stream_thread_ == nullptr && current_segment_loaded) { - startStream(cur->second.get()); + if (stream_thread_ == nullptr && cur_segment->isLoaded()) { + startStream(cur_segment.get()); } - - enableHttpLogging(!current_segment_loaded); } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { @@ -193,8 +192,6 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: return true; }); delete prev_events; - } else { - updateEvents([=]() { return true; }); } } @@ -253,7 +250,7 @@ void Replay::publishFrame(const Event *e) { return; } auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs(); - if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentLoaded(eidx.getSegmentNum())) { + if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentMerged(eidx.getSegmentNum())) { CameraType cam = cam_types.at(e->which); camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); } @@ -326,7 +323,7 @@ void Replay::stream() { if (eit == events_->end()) { int last_segment = segments_.rbegin()->first; - if (current_segment_ >= last_segment && isSegmentLoaded(last_segment)) { + if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) { qInfo() << "reaches the end of route, restart from beginning"; emit seekTo(0, false); } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 624c62735d..e0380e06b0 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -6,7 +6,6 @@ #include "selfdrive/ui/replay/route.h" constexpr int FORWARD_SEGS = 2; -constexpr int BACKWARD_SEGS = 1; class Replay : public QObject { Q_OBJECT @@ -39,7 +38,7 @@ protected: void publishMessage(const Event *e); void publishFrame(const Event *e); inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } - inline bool isSegmentLoaded(int n) { + inline bool isSegmentMerged(int n) { return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); } @@ -49,7 +48,7 @@ protected: std::mutex stream_lock_; std::condition_variable stream_cv_; std::atomic updating_events_ = false; - std::atomic current_segment_ = -1; + std::atomic current_segment_ = 0; SegmentMap segments_; // the following variables must be protected with stream_lock_ bool exit_ = false; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 83e633c324..4693a9807a 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -112,15 +112,8 @@ void TestReplay::test_seek() { stream_thread_ = new QThread(this); QEventLoop loop; std::thread thread = std::thread([&]() { - for (int i = 0; i < 50; ++i) { - testSeekTo(random_int(0, 3 * 60)); - } - // remove 3 segments - for (int n : {5, 6, 8}) { - segments_.erase(n); - } - for (int i =0; i < 50; ++i) { - testSeekTo(random_int(4 * 60, 9 * 60)); + for (int i = 0; i < 100; ++i) { + testSeekTo(random_int(0, 5 * 60)); } loop.quit(); });