replay: improve seeking, queuing segments (#22625)

* improve seeking, queuing segments

* cleanup,init current_segment_ to 0

* use isSegmentLoaded

* remove to isSegmentMerged
old-commit-hash: 317deeae84
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 260b065e0e
commit c35975072e
  1. 71
      selfdrive/ui/replay/replay.cc
  2. 5
      selfdrive/ui/replay/replay.h
  3. 11
      selfdrive/ui/replay/tests/test_replay.cc

@ -85,15 +85,22 @@ void Replay::updateEvents(const std::function<bool()> &lambda) {
} }
void Replay::doSeek(int seconds, bool relative) { void Replay::doSeek(int seconds, bool relative) {
if (segments_.empty()) return;
updateEvents([&]() { updateEvents([&]() {
if (relative) { if (relative) {
seconds += currentSeconds(); seconds += currentSeconds();
} }
qInfo() << "seeking to" << seconds; int seg = seconds / 60;
const int max_segment_number = segments_.rbegin()->first; if (segments_.find(seg) == segments_.end()) {
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (max_segment_number + 1) * 60) * 1e9; qInfo() << "can't seek to" << seconds << "s, segment" << seg << "is invalid";
current_segment_ = std::min(seconds / 60, max_segment_number); return true;
return false; }
qInfo() << "seeking to" << seconds << "s, segment" << seg;
current_segment_ = seg;
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
return isSegmentMerged(seg);
}); });
queueSegment(); queueSegment();
} }
@ -125,47 +132,39 @@ void Replay::segmentLoadFinished(bool success) {
} }
void Replay::queueSegment() { void Replay::queueSegment() {
// get the current segment window if (segments_.empty()) return;
SegmentMap::iterator begin, cur, end;
begin = cur = end = segments_.lower_bound(current_segment_);
if (cur != segments_.end() && cur->second == nullptr) {
// just load one segment on starting replay or seeking
end++;
} else {
for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) {
--begin;
}
for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) {
++end;
}
}
// load & merge segments SegmentMap::iterator begin, cur, end;
for (auto it = begin; it != end; ++it) { begin = cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
auto &[n, seg] = *it; // set fwd to 0 to just load the current segment when seeking to a new window.
const int fwd = cur->second == nullptr ? 0 : FORWARD_SEGS;
for (int i = 0; end != segments_.end() && i <= fwd; ++end, ++i) {
auto &[n, seg] = *end;
if (!seg) { if (!seg) {
seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam); seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
qInfo() << "loading segment" << n << "..."; qInfo() << "loading segment" << n << "...";
} }
} }
const auto &cur_segment = cur->second;
enableHttpLogging(!cur_segment->isLoaded());
// merge the previous adjacent segment if it's loaded
auto prev = segments_.find(cur_segment->seg_num - 1);
if (prev != segments_.end() && prev->second && prev->second->isLoaded()) {
begin = prev;
}
mergeSegments(begin, end); mergeSegments(begin, end);
// free segments out of current semgnt window. // free segments out of current semgnt window.
for (auto it = segments_.begin(); it != begin; ++it) { std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); });
it->second.reset(nullptr); std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); });
}
for (auto it = end; it != segments_.end(); ++it) {
it->second.reset(nullptr);
}
// start stream thread // start stream thread
bool current_segment_loaded = (cur != segments_.end() && cur->second->isLoaded()); if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
if (stream_thread_ == nullptr && current_segment_loaded) { startStream(cur_segment.get());
startStream(cur->second.get());
} }
enableHttpLogging(!current_segment_loaded);
} }
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
@ -193,8 +192,6 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
return true; return true;
}); });
delete prev_events; delete prev_events;
} else {
updateEvents([=]() { return true; });
} }
} }
@ -253,7 +250,7 @@ void Replay::publishFrame(const Event *e) {
return; return;
} }
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>(); auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentLoaded(eidx.getSegmentNum())) { if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentMerged(eidx.getSegmentNum())) {
CameraType cam = cam_types.at(e->which); CameraType cam = cam_types.at(e->which);
camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx);
} }
@ -326,7 +323,7 @@ void Replay::stream() {
if (eit == events_->end()) { if (eit == events_->end()) {
int last_segment = segments_.rbegin()->first; int last_segment = segments_.rbegin()->first;
if (current_segment_ >= last_segment && isSegmentLoaded(last_segment)) { if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
qInfo() << "reaches the end of route, restart from beginning"; qInfo() << "reaches the end of route, restart from beginning";
emit seekTo(0, false); emit seekTo(0, false);
} }

@ -6,7 +6,6 @@
#include "selfdrive/ui/replay/route.h" #include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2; constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 1;
class Replay : public QObject { class Replay : public QObject {
Q_OBJECT Q_OBJECT
@ -39,7 +38,7 @@ protected:
void publishMessage(const Event *e); void publishMessage(const Event *e);
void publishFrame(const Event *e); void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }
inline bool isSegmentLoaded(int n) { inline bool isSegmentMerged(int n) {
return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
} }
@ -49,7 +48,7 @@ protected:
std::mutex stream_lock_; std::mutex stream_lock_;
std::condition_variable stream_cv_; std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false; std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = -1; std::atomic<int> current_segment_ = 0;
SegmentMap segments_; SegmentMap segments_;
// the following variables must be protected with stream_lock_ // the following variables must be protected with stream_lock_
bool exit_ = false; bool exit_ = false;

@ -112,15 +112,8 @@ void TestReplay::test_seek() {
stream_thread_ = new QThread(this); stream_thread_ = new QThread(this);
QEventLoop loop; QEventLoop loop;
std::thread thread = std::thread([&]() { std::thread thread = std::thread([&]() {
for (int i = 0; i < 50; ++i) { for (int i = 0; i < 100; ++i) {
testSeekTo(random_int(0, 3 * 60)); testSeekTo(random_int(0, 5 * 60));
}
// remove 3 segments
for (int n : {5, 6, 8}) {
segments_.erase(n);
}
for (int i =0; i < 50; ++i) {
testSeekTo(random_int(4 * 60, 9 * 60));
} }
loop.quit(); loop.quit();
}); });

Loading…
Cancel
Save