c++ replay: more robust seek (#22375)

* skip invalid segment

* test seek

* fix wrong cur_mono_time when route_start_ts is 0

* don't notify stream thread if current segment not merged

* continue merge next segment if current is invalid

* cleanup seekTo

* continue

* fix seek cross-boundary

* new demo route has 11 segments

* continue

* fix cross-boundary

* cleanup & better test cases

cleanup

remoe &

typo

update comment

* update test

* reserve space for new_events

* parallel sorting

* usleep(0)

* Revert "parallel sorting"

This reverts commit d7d1b42f49944aef0b6ca2962e45a7f5318a1152.
old-commit-hash: fa8ddd992d
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 5aa9a8e217
commit 430e9808b3
  1. 3
      selfdrive/ui/replay/main.cc
  2. 59
      selfdrive/ui/replay/replay.cc
  3. 9
      selfdrive/ui/replay/replay.h
  4. 5
      selfdrive/ui/replay/route.cc
  5. 4
      selfdrive/ui/replay/route.h
  6. 140
      selfdrive/ui/replay/tests/test_replay.cc
  7. 10
      selfdrive/ui/replay/tests/test_runner.cc

@ -85,8 +85,11 @@ int main(int argc, char *argv[]){
const QString route = args.empty() ? DEMO_ROUTE : args.first();
QStringList allow = parser.value("allow").isEmpty() ? QStringList{} : parser.value("allow").split(",");
QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(",");
Replay *replay = new Replay(route, allow, block, nullptr, parser.isSet("dcam"), parser.isSet("ecam"));
if (replay->load()) {
replay->start(parser.value("start").toInt());
}
// start keyboard control thread
QThread *t = QThread::create(keyboardThread, replay);

@ -2,6 +2,7 @@
#include <QApplication>
#include <QDebug>
#include "cereal/services.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
@ -19,7 +20,7 @@ inline void precise_nano_sleep(long sleep_ns) {
}
// spin wait
if (sleep_ns > 0) {
while ((nanos_since_boot() - start_sleep) <= sleep_ns) {/**/}
while ((nanos_since_boot() - start_sleep) <= sleep_ns) { usleep(0); }
}
}
@ -49,17 +50,19 @@ Replay::~Replay() {
// TODO: quit stream thread and free resources.
}
void Replay::start(int seconds){
// load route
bool Replay::load() {
if (!route_->load() || route_->size() == 0) {
qDebug() << "failed load route" << route_->name() << "from server";
return;
return false;
}
qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds;
qDebug() << "load route" << route_->name() << route_->size() << "segments";
segments_.resize(route_->size());
seekTo(seconds);
return true;
}
void Replay::start(int seconds) {
seekTo(seconds);
// start stream thread
thread = new QThread;
QObject::connect(thread, &QThread::started, [=]() { stream(); });
@ -70,7 +73,7 @@ void Replay::updateEvents(const std::function<bool()>& lambda) {
// set updating_events to true to force stream thread relase the lock and wait for evnets_udpated.
updating_events_ = true;
{
std::unique_lock lk(lock_);
std::unique_lock lk(stream_lock_);
events_updated_ = lambda();
updating_events_ = false;
}
@ -80,22 +83,24 @@ void Replay::updateEvents(const std::function<bool()>& lambda) {
void Replay::seekTo(int seconds, bool relative) {
if (segments_.empty()) return;
bool segment_loaded = false;
updateEvents([&]() {
if (relative) {
seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9);
}
seconds = std::clamp(seconds, 0, (int)segments_.size() * 60 - 1);
qInfo() << "seeking to" << seconds;
int segment = seconds / 60;
bool segment_changed = (segment != current_segment_);
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
setCurrentSegment(segment);
bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end();
// return false if segment changed and not loaded yet
return !segment_changed || segment_loaded;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.size() * 60) * 1e9;
current_segment_ = std::min(seconds / 60, (int)segments_.size() - 1);
segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), current_segment_) != segments_merged_.end();
return segment_loaded;
});
if (!segment_loaded) {
// always emit segmentChanged if segment is not loaded.
// the current_segment_ may not valid when seeking cross boundary or seeking to an invalid segment.
emit segmentChanged();
}
}
void Replay::pause(bool pause) {
@ -108,7 +113,7 @@ void Replay::pause(bool pause) {
void Replay::setCurrentSegment(int n) {
if (current_segment_.exchange(n) != n) {
emit segmentChanged(n);
emit segmentChanged();
}
}
@ -124,11 +129,14 @@ void Replay::queueSegment() {
}
end_idx = i;
// skip invalid segment
fwd += segments_[i]->isValid();
if (segments_[i]->isValid()) {
++fwd;
} else if (i == cur_seg) {
++cur_seg;
}
}
// merge segments
mergeSegments(cur_seg, end_idx);
mergeSegments(std::min(cur_seg, (int)segments_.size() - 1), end_idx);
}
void Replay::mergeSegments(int cur_seg, int end_idx) {
@ -138,7 +146,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
for (int i = begin_idx; i <= end_idx; ++i) {
if (segments_[i] && segments_[i]->isLoaded()) {
segments_need_merge.push_back(i);
} else if (i >= cur_seg) {
} else if (i >= cur_seg && segments_[i] && segments_[i]->isValid()) {
// segment is valid,but still loading. can't skip it to merge the next one.
// otherwise the stream thread may jump to the next segment.
break;
@ -150,6 +158,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
// merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>();
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
[=](int v, int n) { return v + segments_[n]->log->events.size(); }));
for (int n : segments_need_merge) {
auto &log = segments_[n]->log;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
@ -164,7 +174,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts_ = (*it)->mono_time;
cur_mono_time_ = route_start_ts_;
// cur_mono_time_ is set by seekTo int start() before get route_start_ts_
cur_mono_time_ += route_start_ts_;
}
}
@ -173,6 +184,8 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
return true;
});
delete prev_events;
} else {
updateEvents([]() { return true; });
}
// free segments out of current semgnt window.
@ -187,7 +200,7 @@ void Replay::stream() {
float last_print = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
std::unique_lock lk(lock_);
std::unique_lock lk(stream_lock_);
while (true) {
stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); });

@ -16,7 +16,7 @@ class Replay : public QObject {
public:
Replay(QString route, QStringList allow, QStringList block, SubMaster *sm = nullptr, bool dcam = false, bool ecam = false, QObject *parent = 0);
~Replay();
bool load();
void start(int seconds = 0);
void seekTo(int seconds, bool relative = false);
void relativeSeek(int seconds) { seekTo(seconds, true); }
@ -24,7 +24,7 @@ public:
bool isPaused() const { return paused_; }
signals:
void segmentChanged(int);
void segmentChanged();
protected slots:
void queueSegment();
@ -38,17 +38,18 @@ protected:
QThread *thread;
// logs
std::mutex lock_;
std::mutex stream_lock_;
std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = -1;
std::vector<std::unique_ptr<Segment>> segments_;
// the following variables must be protected with stream_lock_
bool exit_ = false;
bool paused_ = false;
bool events_updated_ = false;
uint64_t route_start_ts_ = 0;
uint64_t cur_mono_time_ = 0;
std::vector<Event *> *events_ = nullptr;
std::vector<std::unique_ptr<Segment>> segments_;
std::vector<int> segments_merged_;
// messaging

@ -119,10 +119,7 @@ Route::Route(const QString &route) : route_(route) {}
bool Route::load() {
QEventLoop loop;
auto onError = [&loop](const QString &err) {
qInfo() << err;
loop.quit();
};
auto onError = [&loop](const QString &err) { loop.quit(); };
bool ret = false;
HttpRequest http(nullptr, !Hardware::PC());

@ -30,10 +30,12 @@ public:
inline int size() const { return segments_.size(); }
inline SegmentFile &at(int n) { return segments_[n]; }
// public for unit tests
std::vector<SegmentFile> segments_;
protected:
bool loadFromJson(const QString &json);
QString route_;
std::vector<SegmentFile> segments_;
};
class Segment : public QObject {

@ -1,11 +1,12 @@
#include <QCryptographicHash>
#include <QDebug>
#include <QEventLoop>
#include <QString>
#include <future>
#include "catch2/catch.hpp"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/route.h"
#include "selfdrive/ui/replay/replay.h"
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
@ -49,3 +50,138 @@ TEST_CASE("httpMultiPartDownload") {
REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false);
}
}
int random_int(int min, int max) {
std::random_device dev;
std::mt19937 rng(dev());
std::uniform_int_distribution<std::mt19937::result_type> dist(min, max);
return dist(rng);
}
bool is_events_ordered(const std::vector<Event *> &events) {
REQUIRE(events.size() > 0);
uint64_t prev_mono_time = 0;
cereal::Event::Which prev_which = cereal::Event::INIT_DATA;
for (auto event : events) {
if (event->mono_time < prev_mono_time || (event->mono_time == prev_mono_time && event->which < prev_which)) {
return false;
}
prev_mono_time = event->mono_time;
prev_which = event->which;
}
return true;
}
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load());
REQUIRE(demo_route.size() == 11);
QEventLoop loop;
Segment segment(0, demo_route.at(0), false, false);
REQUIRE(segment.isValid() == true);
REQUIRE(segment.isLoaded() == false);
QObject::connect(&segment, &Segment::loadFinished, [&]() {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
REQUIRE(segment.log->events.size() > 0);
REQUIRE(is_events_ordered(segment.log->events));
REQUIRE(segment.frames[RoadCam] != nullptr);
REQUIRE(segment.frames[RoadCam]->getFrameCount() > 0);
REQUIRE(segment.frames[DriverCam] == nullptr);
REQUIRE(segment.frames[WideRoadCam] == nullptr);
loop.quit();
});
loop.exec();
}
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route) : Replay(route, {}, {}) {}
void test_seek();
protected:
void testSeekTo(int seek_to, const std::set<int> &invalid_segments = {});
};
void TestReplay::testSeekTo(int seek_to, const std::set<int> &invalid_segments) {
seekTo(seek_to);
// wait for seek finish
while (true) {
std::unique_lock lk(stream_lock_);
stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
events_updated_ = false;
// verify result
REQUIRE(uint64_t(route_start_ts_ + seek_to * 1e9) == cur_mono_time_);
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_);
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
qDebug() << "waiting for events...";
continue;
}
INFO("seek to [" << seek_to << "s segment " << seek_to / 60 << "]");
REQUIRE(!events_->empty());
REQUIRE(is_events_ordered(*events_));
REQUIRE(eit != events_->end());
const int seek_to_segment = seek_to / 60;
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
current_segment_ = event_seconds / 60;
INFO("event [" << event_seconds << "s segment " << current_segment_ << "]");
REQUIRE(event_seconds >= seek_to);
if (invalid_segments.find(seek_to_segment) == invalid_segments.end()) {
REQUIRE(event_seconds == seek_to); // at the same time
} else {
if (current_segment_ == seek_to_segment) {
// seek cross-boundary. e.g. seek_to 60s(segment 1), but segment 0 end at 60.021 and segemnt 1 is invalid.
REQUIRE(event_seconds == seek_to);
} else {
REQUIRE(current_segment_ > seek_to_segment);
REQUIRE(invalid_segments.find(current_segment_) == invalid_segments.end());
}
}
break;
}
}
void TestReplay::test_seek() {
QEventLoop loop;
std::thread thread = std::thread([&]() {
const int loop_count = 100;
// random seek in one segment
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(0, 60));
}
// random seek in 3 segments
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(0, 60 * 3));
}
// random seek in invalid segments
std::set<int> invalid_segments{5, 6, 7, 9};
for (int i : invalid_segments) {
route_->segments_[i].rlog = route_->segments_[i].qlog = "";
route_->segments_[i].road_cam = route_->segments_[i].qcamera = "";
}
for (int i = 0; i < loop_count; ++i) {
testSeekTo(random_int(4 * 60, 60 * 10), invalid_segments);
}
loop.quit();
});
loop.exec();
thread.join();
}
TEST_CASE("Replay") {
TestReplay replay(DEMO_ROUTE);
REQUIRE(replay.load());
replay.test_seek();
}

@ -1,2 +1,10 @@
#define CATCH_CONFIG_MAIN
#define CATCH_CONFIG_RUNNER
#include "catch2/catch.hpp"
#include <QCoreApplication>
int main(int argc, char **argv) {
// unit tests for Qt
QCoreApplication app(argc, argv);
const int res = Catch::Session().run(argc, argv);
return (res < 0xff ? res : 0xff);
}

Loading…
Cancel
Save