diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 1bb4344956..196672d9b2 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -46,24 +46,24 @@ void keyboardThread(Replay *replay) { try { if (r[0] == '#') { r.erase(0, 1); - replay->seekTo(std::stoi(r) * 60); + replay->seekTo(std::stoi(r) * 60, false); } else { - replay->seekTo(std::stoi(r)); + replay->seekTo(std::stoi(r), false); } } catch (std::invalid_argument) { qDebug() << "invalid argument"; } getch(); // remove \n from entering seek } else if (c == 'm') { - replay->relativeSeek(+60); + replay->seekTo(+60, true); } else if (c == 'M') { - replay->relativeSeek(-60); + replay->seekTo(-60, true); } else if (c == 's') { - replay->relativeSeek(+10); + replay->seekTo(+10, true); } else if (c == 'S') { - replay->relativeSeek(-10); + replay->seekTo(-10, true); } else if (c == 'G') { - replay->relativeSeek(0); + replay->seekTo(0, true); } else if (c == ' ') { replay->pause(!replay->isPaused()); } @@ -97,10 +97,10 @@ int main(int argc, char *argv[]){ QStringList block = parser.value("block").isEmpty() ? QStringList{} : parser.value("block").split(","); Replay *replay = new Replay(route, allow, block, nullptr, parser.isSet("dcam"), parser.isSet("ecam"), &app); - if (replay->load()) { - replay->start(parser.value("start").toInt()); + if (!replay->load()) { + return 0; } - + replay->start(parser.value("start").toInt()); // start keyboard control thread QThread *t = QThread::create(keyboardThread, replay); QObject::connect(t, &QThread::finished, t, &QThread::deleteLater); diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index d977b56752..dec204b7b3 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -3,8 +3,8 @@ #include #include +#include #include "cereal/services.h" -#include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/common/timing.h" #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/replay/util.h" @@ -27,16 +27,15 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s if (sm == nullptr) { pm = new PubMaster(s); } - route_ = std::make_unique(route); events_ = new std::vector(); - // queueSegment is always executed in the main thread + // doSeek & queueSegment are always executed in the same thread + connect(this, &Replay::seekTo, this, &Replay::doSeek); connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); } Replay::~Replay() { qDebug() << "shutdown: in progress..."; - exit_ = true; updating_events_ = true; if (stream_thread_) { @@ -53,18 +52,27 @@ Replay::~Replay() { } bool Replay::load() { - if (!route_->load() || route_->size() == 0) { - qDebug() << "failed load route" << route_->name() << "from server"; + if (!route_->load()) { + qDebug() << "failed to load route" << route_->name() << "from server"; return false; } - qDebug() << "load route" << route_->name() << route_->size() << "segments"; - segments_.resize(route_->size()); + for (int i = 0; i < route_->size(); ++i) { + const SegmentFile &f = route_->at(i); + if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) { + segments_[i] = nullptr; + } + } + if (segments_.empty()) { + qDebug() << "no valid segments in route" << route_->name(); + return false; + } + qDebug() << "load route" << route_->name() << "with" << segments_.size() << "valid segments"; return true; } void Replay::start(int seconds) { - seekTo(seconds); + seekTo(seconds, false); camera_server_ = std::make_unique(); // start stream thread @@ -84,35 +92,24 @@ void Replay::updateEvents(const std::function &lambda) { stream_cv_.notify_one(); } -void Replay::seekTo(int seconds, bool relative) { - if (segments_.empty()) return; - - bool segment_loaded = false; - bool segment_changed = false; +void Replay::doSeek(int seconds, bool relative) { updateEvents([&]() { if (relative) { - seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9); + seconds += currentSeconds(); } qInfo() << "seeking to" << seconds; - - cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.size() * 60) * 1e9; - int segment = std::min(seconds / 60, (int)segments_.size() - 1); - segment_changed = current_segment_.exchange(segment) != segment; - segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end(); - return segment_loaded; + cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.rbegin()->first * 60) * 1e9; + current_segment_ = std::min(seconds / 60, (int)segments_.rbegin()->first - 1); + return false; }); - - if (segment_changed || !segment_loaded) { - emit segmentChanged(); - } + queueSegment(); } void Replay::pause(bool pause) { updateEvents([=]() { qDebug() << (pause ? "paused..." : "resuming"); if (pause) { - float current_ts = (cur_mono_time_ - route_start_ts_) / 1e9; - qInfo() << "at " << current_ts << "s"; + qInfo() << "at " << currentSeconds() << "s"; } paused_ = pause; return true; @@ -127,43 +124,36 @@ void Replay::setCurrentSegment(int n) { // maintain the segment window void Replay::queueSegment() { - // fetch segments forward - int cur_seg = current_segment_.load(); - int end_idx = cur_seg; - for (int i = cur_seg, fwd = 0; i < segments_.size() && fwd <= FORWARD_SEGS; ++i) { - if (!segments_[i]) { - segments_[i] = std::make_unique(i, route_->at(i), load_dcam, load_ecam); - QObject::connect(segments_[i].get(), &Segment::loadFinished, this, &Replay::queueSegment); - } - end_idx = i; - // skip invalid segment - if (segments_[i]->isValid()) { - ++fwd; - } else if (i == cur_seg) { - ++cur_seg; + // forward fetch segments + SegmentMap::iterator begin, end; + begin = end = segments_.lower_bound(current_segment_); + for (int fwd = 0; end != segments_.end() && fwd <= FORWARD_SEGS; ++end, ++fwd) { + auto &[n, seg] = *end; + if (!seg) { + seg = std::make_unique(n, route_->at(n), load_dcam, load_ecam); + QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::queueSegment); } } - - mergeSegments(std::min(cur_seg, (int)segments_.size() - 1), end_idx); + // merge segments + mergeSegments(begin, end); + // free segments out of current semgnt window. + for (auto it = segments_.begin(); it != begin; ++it) { + it->second.reset(nullptr); + } + for (auto it = end; it != segments_.end(); ++it) { + it->second.reset(nullptr); + } } -void Replay::mergeSegments(int cur_seg, int end_idx) { +void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { // segments must be merged in sequence. std::vector segments_need_merge; - const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0); - for (int i = begin_idx; i <= end_idx; ++i) { - if (segments_[i] && segments_[i]->isLoaded()) { - segments_need_merge.push_back(i); - } else if (i >= cur_seg && segments_[i] && segments_[i]->isValid()) { - // segment is valid,but still loading. can't skip it to merge the next one. - // otherwise the stream thread may jump to the next segment. - break; - } + for (auto it = begin; it != end && it->second->isLoaded(); ++it) { + segments_need_merge.push_back(it->first); } if (segments_need_merge != segments_merged_) { qDebug() << "merge segments" << segments_need_merge; - // merge & sort events std::vector *new_events = new std::vector(); new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, @@ -173,7 +163,6 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); } - // update events auto prev_events = events_; updateEvents([&]() { @@ -182,7 +171,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); if (it != new_events->end()) { route_start_ts_ = (*it)->mono_time; - // cur_mono_time_ is set by seekTo int start() before get route_start_ts_ + // cur_mono_time_ is set by seekTo in start() before get route_start_ts_ cur_mono_time_ += route_start_ts_; } } @@ -193,22 +182,16 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { }); delete prev_events; } else { - updateEvents([]() { return true; }); - } - - // free segments out of current semgnt window. - for (int i = 0; i < segments_.size(); i++) { - if ((i < begin_idx || i > end_idx) && segments_[i]) { - segments_[i].reset(nullptr); - } + updateEvents([=]() { return begin->second->isLoaded(); }); } } void Replay::publishFrame(const Event *e) { auto publish = [=](CameraType cam_type, const cereal::EncodeIndex::Reader &eidx) { - auto &seg = segments_[eidx.getSegmentNum()]; - if (seg && seg->isLoaded() && seg->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { - camera_server_->pushFrame(cam_type, seg->frames[cam_type].get(), eidx); + int n = eidx.getSegmentNum(); + bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); + if (segment_loaded && segments_[n]->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { + camera_server_->pushFrame(cam_type, segments_[n]->frames[cam_type].get(), eidx); } }; if (e->which == cereal::Event::ROAD_ENCODE_IDX) { @@ -238,22 +221,21 @@ void Replay::stream() { continue; } - uint64_t evt_start_ts = cur_mono_time_; - uint64_t loop_start_ts = nanos_since_boot(); + const uint64_t evt_start_ts = cur_mono_time_; + const uint64_t loop_start_ts = nanos_since_boot(); for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { const Event *evt = (*eit); cur_which = evt->which; cur_mono_time_ = evt->mono_time; + const int current_ts = currentSeconds(); + if (last_print > current_ts || (current_ts - last_print) > 5.0) { + last_print = current_ts; + qInfo() << "at " << current_ts << "s"; + } + setCurrentSegment(current_ts / 60); if (cur_which < sockets_.size() && sockets_[cur_which] != nullptr) { - int current_ts = (cur_mono_time_ - route_start_ts_) / 1e9; - if ((current_ts - last_print) > 5.0) { - last_print = current_ts; - qInfo() << "at " << current_ts << "s"; - } - setCurrentSegment(current_ts / 60); - // keep time long etime = cur_mono_time_ - evt_start_ts; long rtime = nanos_since_boot() - loop_start_ts; diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 65f4cc5cfc..e6243bdf27 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -2,8 +2,6 @@ #include -#include -#include "cereal/visionipc/visionipc_server.h" #include "selfdrive/ui/replay/camera.h" #include "selfdrive/ui/replay/route.h" @@ -18,23 +16,25 @@ public: ~Replay(); bool load(); void start(int seconds = 0); - void seekTo(int seconds, bool relative = false); - void relativeSeek(int seconds) { seekTo(seconds, true); } void pause(bool pause); bool isPaused() const { return paused_; } signals: void segmentChanged(); + void seekTo(int seconds, bool relative); protected slots: void queueSegment(); + void doSeek(int seconds, bool relative); protected: + typedef std::map> SegmentMap; void stream(); void setCurrentSegment(int n); - void mergeSegments(int begin_idx, int end_idx); + void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); void updateEvents(const std::function& lambda); void publishFrame(const Event *e); + inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } QThread *stream_thread_ = nullptr; @@ -43,7 +43,7 @@ protected: std::condition_variable stream_cv_; std::atomic updating_events_ = false; std::atomic current_segment_ = -1; - std::vector> segments_; + SegmentMap segments_; // the following variables must be protected with stream_lock_ bool exit_ = false; bool paused_ = false; diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index 0d574d1140..f43bd47f23 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -77,9 +77,7 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l // fallback to qcamera/qlog road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam; log_path_ = files_.rlog.isEmpty() ? files_.qlog : files_.rlog; - - valid_ = !log_path_.isEmpty() && !road_cam_path_.isEmpty(); - if (!valid_) return; + assert (!log_path_.isEmpty() && !road_cam_path_.isEmpty()); if (!load_dcam) { files_.driver_cam = ""; @@ -151,7 +149,7 @@ void Segment::load() { } int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; }); - loaded_ = valid_ = (success_cnt == futures.size()); + loaded_ = (success_cnt == futures.size()); emit loadFinished(); } diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index 0cce753414..2ece59c48f 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -25,17 +25,14 @@ class Route { public: Route(const QString &route); bool load(); - inline const QString &name() const { return route_; }; inline int size() const { return segments_.size(); } inline SegmentFile &at(int n) { return segments_[n]; } - // public for unit tests - std::vector segments_; - protected: bool loadFromJson(const QString &json); QString route_; + std::vector segments_; }; class Segment : public QObject { @@ -44,7 +41,6 @@ class Segment : public QObject { public: Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam); ~Segment(); - inline bool isValid() const { return valid_; }; inline bool isLoaded() const { return loaded_; } std::unique_ptr log; @@ -58,7 +54,7 @@ protected: void downloadFile(const QString &url); QString localPath(const QUrl &url); - std::atomic loaded_ = false, valid_ = false; + std::atomic loaded_ = false; std::atomic aborting_ = false; std::atomic downloading_ = 0; int seg_num_ = 0; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 91b5742067..b374239f36 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -1,38 +1,12 @@ #include #include #include -#include -#include -#include #include "catch2/catch.hpp" -#include "selfdrive/common/util.h" -#include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/replay.h" -#include "selfdrive/ui/replay/route.h" #include "selfdrive/ui/replay/util.h" -const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; - -// TEST_CASE("FrameReader") { -// SECTION("process&get") { -// FrameReader fr; -// REQUIRE(fr.load(stream_url) == true); -// REQUIRE(fr.valid() == true); -// REQUIRE(fr.getFrameCount() == 1200); -// // random get 50 frames -// // srand(time(NULL)); -// // for (int i = 0; i < 50; ++i) { -// // int idx = rand() % (fr.getFrameCount() - 1); -// // REQUIRE(fr.get(idx) != nullptr); -// // } -// // sequence get 50 frames { -// for (int i = 0; i < 50; ++i) { -// REQUIRE(fr.get(i) != nullptr); -// } -// } -// } - +const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; std::string sha_256(const QString &dat) { return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString(); } @@ -43,6 +17,7 @@ TEST_CASE("httpMultiPartDownload") { REQUIRE(fd != -1); close(fd); + const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; SECTION("http 200") { REQUIRE(httpMultiPartDownload(stream_url, filename, 5)); std::string content = util::read_file(filename); @@ -76,8 +51,6 @@ bool is_events_ordered(const std::vector &events) { return true; } -const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; - TEST_CASE("Segment") { Route demo_route(DEMO_ROUTE); REQUIRE(demo_route.load()); @@ -85,43 +58,46 @@ TEST_CASE("Segment") { QEventLoop loop; Segment segment(0, demo_route.at(0), false, false); - REQUIRE(segment.isValid() == true); - REQUIRE(segment.isLoaded() == false); QObject::connect(&segment, &Segment::loadFinished, [&]() { REQUIRE(segment.isLoaded() == true); REQUIRE(segment.log != nullptr); - REQUIRE(segment.log->events.size() > 0); - REQUIRE(is_events_ordered(segment.log->events)); REQUIRE(segment.frames[RoadCam] != nullptr); - REQUIRE(segment.frames[RoadCam]->getFrameCount() > 0); REQUIRE(segment.frames[DriverCam] == nullptr); REQUIRE(segment.frames[WideRoadCam] == nullptr); + + // LogReader & FrameReader + REQUIRE(segment.log->events.size() > 0); + REQUIRE(is_events_ordered(segment.log->events)); + // sequence get 50 frames { + REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200); + for (int i = 0; i < 50; ++i) { + REQUIRE(segment.frames[RoadCam]->get(i)); + } loop.quit(); }); loop.exec(); } -/* + // helper class for unit tests class TestReplay : public Replay { public: TestReplay(const QString &route) : Replay(route, {}, {}) {} void test_seek(); - -protected: - void testSeekTo(int seek_to, const std::set &invalid_segments = {}); + void testSeekTo(int seek_to); }; -void TestReplay::testSeekTo(int seek_to, const std::set &invalid_segments) { - seekTo(seek_to); +void TestReplay::testSeekTo(int seek_to) { + seekTo(seek_to, false); - // wait for seek finish + // wait for the seek to finish while (true) { std::unique_lock lk(stream_lock_); stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); events_updated_ = false; - - // verify result - REQUIRE(uint64_t(route_start_ts_ + seek_to * 1e9) == cur_mono_time_); + if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) { + // wake up by the previous merging, skip it. + continue; + } Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_); auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); @@ -130,26 +106,15 @@ void TestReplay::testSeekTo(int seek_to, const std::set &invalid_segments) continue; } - INFO("seek to [" << seek_to << "s segment " << seek_to / 60 << "]"); - REQUIRE(!events_->empty()); REQUIRE(is_events_ordered(*events_)); - - REQUIRE(eit != events_->end()); const int seek_to_segment = seek_to / 60; const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; current_segment_ = event_seconds / 60; - INFO("event [" << event_seconds << "s segment " << current_segment_ << "]"); + INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]"); REQUIRE(event_seconds >= seek_to); - if (invalid_segments.find(seek_to_segment) == invalid_segments.end()) { - REQUIRE(event_seconds == seek_to); // at the same time - } else { - if (current_segment_ == seek_to_segment) { - // seek cross-boundary. e.g. seek_to 60s(segment 1), but segment 0 end at 60.021 and segemnt 1 is invalid. - REQUIRE(event_seconds == seek_to); - } else { - REQUIRE(current_segment_ > seek_to_segment); - REQUIRE(invalid_segments.find(current_segment_) == invalid_segments.end()); - } + if (event_seconds > seek_to) { + auto it = segments_.lower_bound(seek_to_segment); + REQUIRE(it->first == current_segment_); } break; } @@ -157,29 +122,21 @@ void TestReplay::testSeekTo(int seek_to, const std::set &invalid_segments) void TestReplay::test_seek() { QEventLoop loop; - std::thread thread = std::thread([&]() { - const int loop_count = 100; - // random seek in one segment - for (int i = 0; i < loop_count; ++i) { - testSeekTo(random_int(0, 60)); + // random seek 50 times in 3 segments + for (int i = 0; i < 50; ++i) { + testSeekTo(random_int(0, 3 * 60)); } - // random seek in 3 segments - for (int i = 0; i < loop_count; ++i) { - testSeekTo(random_int(0, 60 * 3)); + // random seek 50 times in routes with invalid segments + for (int n : {5, 6, 8}) { + segments_.erase(n); } - // random seek in invalid segments - std::set invalid_segments{5, 6, 7, 9}; - for (int i : invalid_segments) { - route_->segments_[i].rlog = route_->segments_[i].qlog = ""; - route_->segments_[i].road_cam = route_->segments_[i].qcamera = ""; - } - for (int i = 0; i < loop_count; ++i) { - testSeekTo(random_int(4 * 60, 60 * 10), invalid_segments); + for (int i =0; i < 50; ++i) { + testSeekTo(520); + testSeekTo(random_int(4 * 60, 9 * 60)); } loop.quit(); }); - loop.exec(); thread.join(); } @@ -189,4 +146,3 @@ TEST_CASE("Replay") { REQUIRE(replay.load()); replay.test_seek(); } -*/