replay: simplify seek&merge (#22463)

* simplify seek&merge

* update test cases

update test cases

* cleanup test cases

* new function currentSeconds

* add TODO

* thread safe publishFrame

* cleanup

* fix 'at x S' not printed if seek back to old time

* exit replay if failed to load route

* move out setCurrentSegment from if statement

* cleanup

* use std::find

* const variables
old-commit-hash: 5527736df6
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 6dea543246
commit 0f49ecbff7
  1. 20
      selfdrive/ui/replay/main.cc
  2. 128
      selfdrive/ui/replay/replay.cc
  3. 12
      selfdrive/ui/replay/replay.h
  4. 6
      selfdrive/ui/replay/route.cc
  5. 8
      selfdrive/ui/replay/route.h
  6. 110
      selfdrive/ui/replay/tests/test_replay.cc

@ -46,24 +46,24 @@ void keyboardThread(Replay *replay) {
try {
if (r[0] == '#') {
r.erase(0, 1);
replay->seekTo(std::stoi(r) * 60);
replay->seekTo(std::stoi(r) * 60, false);
} else {
replay->seekTo(std::stoi(r));
replay->seekTo(std::stoi(r), false);
}
} catch (std::invalid_argument) {
qDebug() << "invalid argument";
}
getch(); // remove \n from entering seek
} else if (c == 'm') {
replay->relativeSeek(+60);
replay->seekTo(+60, true);
} else if (c == 'M') {
replay->relativeSeek(-60);
replay->seekTo(-60, true);
} else if (c == 's') {
replay->relativeSeek(+10);
replay->seekTo(+10, true);
} else if (c == 'S') {
replay->relativeSeek(-10);
replay->seekTo(-10, true);
} else if (c == 'G') {
replay->relativeSeek(0);
replay->seekTo(0, true);
} else if (c == ' ') {
replay->pause(!replay->isPaused());
}
@ -97,10 +97,10 @@ int main(int argc, char *argv[]){
QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(",");
Replay *replay = new Replay(route, allow, block, nullptr, parser.isSet("dcam"), parser.isSet("ecam"), &app);
if (replay->load()) {
replay->start(parser.value("start").toInt());
if (!replay->load()) {
return 0;
}
replay->start(parser.value("start").toInt());
// start keyboard control thread
QThread *t = QThread::create(keyboardThread, replay);
QObject::connect(t, &QThread::finished, t, &QThread::deleteLater);

@ -3,8 +3,8 @@
#include <QApplication>
#include <QDebug>
#include <capnp/dynamic.h>
#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/replay/util.h"
@ -27,16 +27,15 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
if (sm == nullptr) {
pm = new PubMaster(s);
}
route_ = std::make_unique<Route>(route);
events_ = new std::vector<Event *>();
// queueSegment is always executed in the main thread
// doSeek & queueSegment are always executed in the same thread
connect(this, &Replay::seekTo, this, &Replay::doSeek);
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment);
}
Replay::~Replay() {
qDebug() << "shutdown: in progress...";
exit_ = true;
updating_events_ = true;
if (stream_thread_) {
@ -53,18 +52,27 @@ Replay::~Replay() {
}
bool Replay::load() {
if (!route_->load() || route_->size() == 0) {
qDebug() << "failed load route" << route_->name() << "from server";
if (!route_->load()) {
qDebug() << "failed to load route" << route_->name() << "from server";
return false;
}
qDebug() << "load route" << route_->name() << route_->size() << "segments";
segments_.resize(route_->size());
for (int i = 0; i < route_->size(); ++i) {
const SegmentFile &f = route_->at(i);
if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) {
segments_[i] = nullptr;
}
}
if (segments_.empty()) {
qDebug() << "no valid segments in route" << route_->name();
return false;
}
qDebug() << "load route" << route_->name() << "with" << segments_.size() << "valid segments";
return true;
}
void Replay::start(int seconds) {
seekTo(seconds);
seekTo(seconds, false);
camera_server_ = std::make_unique<CameraServer>();
// start stream thread
@ -84,35 +92,24 @@ void Replay::updateEvents(const std::function<bool()> &lambda) {
stream_cv_.notify_one();
}
void Replay::seekTo(int seconds, bool relative) {
if (segments_.empty()) return;
bool segment_loaded = false;
bool segment_changed = false;
void Replay::doSeek(int seconds, bool relative) {
updateEvents([&]() {
if (relative) {
seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9);
seconds += currentSeconds();
}
qInfo() << "seeking to" << seconds;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.size() * 60) * 1e9;
int segment = std::min(seconds / 60, (int)segments_.size() - 1);
segment_changed = current_segment_.exchange(segment) != segment;
segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end();
return segment_loaded;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.rbegin()->first * 60) * 1e9;
current_segment_ = std::min(seconds / 60, (int)segments_.rbegin()->first - 1);
return false;
});
if (segment_changed || !segment_loaded) {
emit segmentChanged();
}
queueSegment();
}
void Replay::pause(bool pause) {
updateEvents([=]() {
qDebug() << (pause ? "paused..." : "resuming");
if (pause) {
float current_ts = (cur_mono_time_ - route_start_ts_) / 1e9;
qInfo() << "at " << current_ts << "s";
qInfo() << "at " << currentSeconds() << "s";
}
paused_ = pause;
return true;
@ -127,43 +124,36 @@ void Replay::setCurrentSegment(int n) {
// maintain the segment window
void Replay::queueSegment() {
// fetch segments forward
int cur_seg = current_segment_.load();
int end_idx = cur_seg;
for (int i = cur_seg, fwd = 0; i < segments_.size() && fwd <= FORWARD_SEGS; ++i) {
if (!segments_[i]) {
segments_[i] = std::make_unique<Segment>(i, route_->at(i), load_dcam, load_ecam);
QObject::connect(segments_[i].get(), &Segment::loadFinished, this, &Replay::queueSegment);
}
end_idx = i;
// skip invalid segment
if (segments_[i]->isValid()) {
++fwd;
} else if (i == cur_seg) {
++cur_seg;
// forward fetch segments
SegmentMap::iterator begin, end;
begin = end = segments_.lower_bound(current_segment_);
for (int fwd = 0; end != segments_.end() && fwd <= FORWARD_SEGS; ++end, ++fwd) {
auto &[n, seg] = *end;
if (!seg) {
seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::queueSegment);
}
}
// merge segments
mergeSegments(begin, end);
// free segments out of current semgnt window.
for (auto it = segments_.begin(); it != begin; ++it) {
it->second.reset(nullptr);
}
for (auto it = end; it != segments_.end(); ++it) {
it->second.reset(nullptr);
}
mergeSegments(std::min(cur_seg, (int)segments_.size() - 1), end_idx);
}
void Replay::mergeSegments(int cur_seg, int end_idx) {
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
// segments must be merged in sequence.
std::vector<int> segments_need_merge;
const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0);
for (int i = begin_idx; i <= end_idx; ++i) {
if (segments_[i] && segments_[i]->isLoaded()) {
segments_need_merge.push_back(i);
} else if (i >= cur_seg && segments_[i] && segments_[i]->isValid()) {
// segment is valid,but still loading. can't skip it to merge the next one.
// otherwise the stream thread may jump to the next segment.
break;
}
for (auto it = begin; it != end && it->second->isLoaded(); ++it) {
segments_need_merge.push_back(it->first);
}
if (segments_need_merge != segments_merged_) {
qDebug() << "merge segments" << segments_need_merge;
// merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>();
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
@ -173,7 +163,6 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
}
// update events
auto prev_events = events_;
updateEvents([&]() {
@ -182,7 +171,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts_ = (*it)->mono_time;
// cur_mono_time_ is set by seekTo int start() before get route_start_ts_
// cur_mono_time_ is set by seekTo in start() before get route_start_ts_
cur_mono_time_ += route_start_ts_;
}
}
@ -193,22 +182,16 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
});
delete prev_events;
} else {
updateEvents([]() { return true; });
}
// free segments out of current semgnt window.
for (int i = 0; i < segments_.size(); i++) {
if ((i < begin_idx || i > end_idx) && segments_[i]) {
segments_[i].reset(nullptr);
}
updateEvents([=]() { return begin->second->isLoaded(); });
}
}
void Replay::publishFrame(const Event *e) {
auto publish = [=](CameraType cam_type, const cereal::EncodeIndex::Reader &eidx) {
auto &seg = segments_[eidx.getSegmentNum()];
if (seg && seg->isLoaded() && seg->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
camera_server_->pushFrame(cam_type, seg->frames[cam_type].get(), eidx);
int n = eidx.getSegmentNum();
bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
if (segment_loaded && segments_[n]->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
camera_server_->pushFrame(cam_type, segments_[n]->frames[cam_type].get(), eidx);
}
};
if (e->which == cereal::Event::ROAD_ENCODE_IDX) {
@ -238,22 +221,21 @@ void Replay::stream() {
continue;
}
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
const uint64_t evt_start_ts = cur_mono_time_;
const uint64_t loop_start_ts = nanos_since_boot();
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
const Event *evt = (*eit);
cur_which = evt->which;
cur_mono_time_ = evt->mono_time;
if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) {
int current_ts = (cur_mono_time_ - route_start_ts_) / 1e9;
if ((current_ts - last_print) > 5.0) {
const int current_ts = currentSeconds();
if (last_print > current_ts || (current_ts - last_print) > 5.0) {
last_print = current_ts;
qInfo() << "at " << current_ts << "s";
}
setCurrentSegment(current_ts / 60);
if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) {
// keep time
long etime = cur_mono_time_ - evt_start_ts;
long rtime = nanos_since_boot() - loop_start_ts;

@ -2,8 +2,6 @@
#include <QThread>
#include <capnp/dynamic.h>
#include "cereal/visionipc/visionipc_server.h"
#include "selfdrive/ui/replay/camera.h"
#include "selfdrive/ui/replay/route.h"
@ -18,23 +16,25 @@ public:
~Replay();
bool load();
void start(int seconds = 0);
void seekTo(int seconds, bool relative = false);
void relativeSeek(int seconds) { seekTo(seconds, true); }
void pause(bool pause);
bool isPaused() const { return paused_; }
signals:
void segmentChanged();
void seekTo(int seconds, bool relative);
protected slots:
void queueSegment();
void doSeek(int seconds, bool relative);
protected:
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
void stream();
void setCurrentSegment(int n);
void mergeSegments(int begin_idx, int end_idx);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& lambda);
void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }
QThread *stream_thread_ = nullptr;
@ -43,7 +43,7 @@ protected:
std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = -1;
std::vector<std::unique_ptr<Segment>> segments_;
SegmentMap segments_;
// the following variables must be protected with stream_lock_
bool exit_ = false;
bool paused_ = false;

@ -77,9 +77,7 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l
// fallback to qcamera/qlog
road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam;
log_path_ = files_.rlog.isEmpty() ? files_.qlog : files_.rlog;
valid_ = !log_path_.isEmpty() && !road_cam_path_.isEmpty();
if (!valid_) return;
assert (!log_path_.isEmpty() && !road_cam_path_.isEmpty());
if (!load_dcam) {
files_.driver_cam = "";
@ -151,7 +149,7 @@ void Segment::load() {
}
int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; });
loaded_ = valid_ = (success_cnt == futures.size());
loaded_ = (success_cnt == futures.size());
emit loadFinished();
}

@ -25,17 +25,14 @@ class Route {
public:
Route(const QString &route);
bool load();
inline const QString &name() const { return route_; };
inline int size() const { return segments_.size(); }
inline SegmentFile &at(int n) { return segments_[n]; }
// public for unit tests
std::vector<SegmentFile> segments_;
protected:
bool loadFromJson(const QString &json);
QString route_;
std::vector<SegmentFile> segments_;
};
class Segment : public QObject {
@ -44,7 +41,6 @@ class Segment : public QObject {
public:
Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam);
~Segment();
inline bool isValid() const { return valid_; };
inline bool isLoaded() const { return loaded_; }
std::unique_ptr<LogReader> log;
@ -58,7 +54,7 @@ protected:
void downloadFile(const QString &url);
QString localPath(const QUrl &url);
std::atomic<bool> loaded_ = false, valid_ = false;
std::atomic<bool> loaded_ = false;
std::atomic<bool> aborting_ = false;
std::atomic<int> downloading_ = 0;
int seg_num_ = 0;

@ -1,38 +1,12 @@
#include <QCryptographicHash>
#include <QDebug>
#include <QEventLoop>
#include <QString>
#include <set>
#include <future>
#include "catch2/catch.hpp"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/replay.h"
#include "selfdrive/ui/replay/route.h"
#include "selfdrive/ui/replay/util.h"
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
// TEST_CASE("FrameReader") {
// SECTION("process&get") {
// FrameReader fr;
// REQUIRE(fr.load(stream_url) == true);
// REQUIRE(fr.valid() == true);
// REQUIRE(fr.getFrameCount() == 1200);
// // random get 50 frames
// // srand(time(NULL));
// // for (int i = 0; i < 50; ++i) {
// // int idx = rand() % (fr.getFrameCount() - 1);
// // REQUIRE(fr.get(idx) != nullptr);
// // }
// // sequence get 50 frames {
// for (int i = 0; i < 50; ++i) {
// REQUIRE(fr.get(i) != nullptr);
// }
// }
// }
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
std::string sha_256(const QString &dat) {
return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString();
}
@ -43,6 +17,7 @@ TEST_CASE("httpMultiPartDownload") {
REQUIRE(fd != -1);
close(fd);
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
SECTION("http 200") {
REQUIRE(httpMultiPartDownload(stream_url, filename, 5));
std::string content = util::read_file(filename);
@ -76,8 +51,6 @@ bool is_events_ordered(const std::vector<Event *> &events) {
return true;
}
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load());
@ -85,43 +58,46 @@ TEST_CASE("Segment") {
QEventLoop loop;
Segment segment(0, demo_route.at(0), false, false);
REQUIRE(segment.isValid() == true);
REQUIRE(segment.isLoaded() == false);
QObject::connect(&segment, &Segment::loadFinished, [&]() {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
REQUIRE(segment.log->events.size() > 0);
REQUIRE(is_events_ordered(segment.log->events));
REQUIRE(segment.frames[RoadCam] != nullptr);
REQUIRE(segment.frames[RoadCam]->getFrameCount() > 0);
REQUIRE(segment.frames[DriverCam] == nullptr);
REQUIRE(segment.frames[WideRoadCam] == nullptr);
// LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0);
REQUIRE(is_events_ordered(segment.log->events));
// sequence get 50 frames {
REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200);
for (int i = 0; i < 50; ++i) {
REQUIRE(segment.frames[RoadCam]->get(i));
}
loop.quit();
});
loop.exec();
}
/*
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route) : Replay(route, {}, {}) {}
void test_seek();
protected:
void testSeekTo(int seek_to, const std::set<int> &invalid_segments = {});
void testSeekTo(int seek_to);
};
void TestReplay::testSeekTo(int seek_to, const std::set<int> &invalid_segments) {
seekTo(seek_to);
void TestReplay::testSeekTo(int seek_to) {
seekTo(seek_to, false);
// wait for seek finish
// wait for the seek to finish
while (true) {
std::unique_lock lk(stream_lock_);
stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
events_updated_ = false;
// verify result
REQUIRE(uint64_t(route_start_ts_ + seek_to * 1e9) == cur_mono_time_);
if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) {
// wake up by the previous merging, skip it.
continue;
}
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_);
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
@ -130,26 +106,15 @@ void TestReplay::testSeekTo(int seek_to, const std::set<int> &invalid_segments)
continue;
}
INFO("seek to [" << seek_to << "s segment " << seek_to / 60 << "]");
REQUIRE(!events_->empty());
REQUIRE(is_events_ordered(*events_));
REQUIRE(eit != events_->end());
const int seek_to_segment = seek_to / 60;
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
current_segment_ = event_seconds / 60;
INFO("event [" << event_seconds << "s segment " << current_segment_ << "]");
INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]");
REQUIRE(event_seconds >= seek_to);
if (invalid_segments.find(seek_to_segment) == invalid_segments.end()) {
REQUIRE(event_seconds == seek_to); // at the same time
} else {
if (current_segment_ == seek_to_segment) {
// seek cross-boundary. e.g. seek_to 60s(segment 1), but segment 0 end at 60.021 and segemnt 1 is invalid.
REQUIRE(event_seconds == seek_to);
} else {
REQUIRE(current_segment_ > seek_to_segment);
REQUIRE(invalid_segments.find(current_segment_) == invalid_segments.end());
}
if (event_seconds > seek_to) {
auto it = segments_.lower_bound(seek_to_segment);
REQUIRE(it->first == current_segment_);
}
break;
}
@ -157,29 +122,21 @@ void TestReplay::testSeekTo(int seek_to, const std::set<int> &invalid_segments)
void TestReplay::test_seek() {
QEventLoop loop;
std::thread thread = std::thread([&]() {
const int loop_count = 100;
// random seek in one segment
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(0, 60));
// random seek 50 times in 3 segments
for (int i = 0; i < 50; ++i) {
testSeekTo(random_int(0, 3 * 60));
}
// random seek in 3 segments
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(0, 60 * 3));
// random seek 50 times in routes with invalid segments
for (int n : {5, 6, 8}) {
segments_.erase(n);
}
// random seek in invalid segments
std::set<int> invalid_segments{5, 6, 7, 9};
for (int i : invalid_segments) {
route_->segments_[i].rlog = route_->segments_[i].qlog = "";
route_->segments_[i].road_cam = route_->segments_[i].qcamera = "";
}
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(4 * 60, 60 * 10), invalid_segments);
for (int i =0; i < 50; ++i) {
testSeekTo(520);
testSeekTo(random_int(4 * 60, 9 * 60));
}
loop.quit();
});
loop.exec();
thread.join();
}
@ -189,4 +146,3 @@ TEST_CASE("Replay") {
REQUIRE(replay.load());
replay.test_seek();
}
*/

Loading…
Cancel
Save