diff --git a/selfdrive/ui/replay/camera.cc b/selfdrive/ui/replay/camera.cc index 91eb91add1..39c41e2bd6 100644 --- a/selfdrive/ui/replay/camera.cc +++ b/selfdrive/ui/replay/camera.cc @@ -5,7 +5,13 @@ const int YUV_BUF_COUNT = 50; -CameraServer::CameraServer() { +CameraServer::CameraServer(std::pair cameras[MAX_CAMERAS]) { + if (cameras) { + for (auto type : ALL_CAMERAS) { + std::tie(cameras_[type].width, cameras_[type].height) = cameras[type]; + } + startVipcServer(); + } camera_thread_ = std::thread(&CameraServer::thread, this); } @@ -33,7 +39,7 @@ void CameraServer::thread() { if (!fr) break; auto &cam = cameras_[type]; - // start|restart the vipc server if frame size changed + // restart vipc server if new camera incoming. if (cam.width != fr->width || cam.height != fr->height) { cam.width = fr->width; cam.height = fr->height; diff --git a/selfdrive/ui/replay/camera.h b/selfdrive/ui/replay/camera.h index 457b842140..b57654c186 100644 --- a/selfdrive/ui/replay/camera.h +++ b/selfdrive/ui/replay/camera.h @@ -8,7 +8,7 @@ class CameraServer { public: - CameraServer(); + CameraServer(std::pair cameras[MAX_CAMERAS] = nullptr); ~CameraServer(); void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx); inline void waitFinish() { diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 79805d5b5c..caa63fd5e9 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -6,6 +6,7 @@ #include #include "cereal/services.h" #include "selfdrive/common/timing.h" +#include "selfdrive/common/params.h" #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/replay/util.h" @@ -15,11 +16,10 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s auto event_struct = capnp::Schema::from().asStruct(); sockets_.resize(event_struct.getUnionFields().size()); for (const auto &it : services) { - if ((allow.size() == 0 || allow.contains(it.name)) && - !block.contains(it.name)) { - s.push_back(it.name); + if ((allow.empty() || allow.contains(it.name)) && !block.contains(it.name)) { uint16_t which = event_struct.getFieldByName(it.name).getProto().getDiscriminantValue(); sockets_[which] = it.name; + s.push_back(it.name); } } qDebug() << "services " << s; @@ -29,15 +29,14 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s } route_ = std::make_unique(route, data_dir); events_ = new std::vector(); - // doSeek & queueSegment are always executed in the same thread + connect(this, &Replay::seekTo, this, &Replay::doSeek); connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); } Replay::~Replay() { qDebug() << "shutdown: in progress..."; - exit_ = true; - updating_events_ = true; + exit_ = updating_events_ = true; if (stream_thread_) { stream_cv_.notify_one(); stream_thread_->quit(); @@ -72,12 +71,6 @@ bool Replay::load() { void Replay::start(int seconds) { seekTo(seconds, false); - - camera_server_ = std::make_unique(); - // start stream thread - stream_thread_ = new QThread(this); - QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); }); - stream_thread_->start(); } void Replay::updateEvents(const std::function &lambda) { @@ -133,15 +126,16 @@ void Replay::segmentLoadFinished(bool success) { void Replay::queueSegment() { // get the current segment window - SegmentMap::iterator begin, end; - begin = end = segments_.lower_bound(current_segment_); + SegmentMap::iterator begin, cur, end; + begin = cur = end = segments_.lower_bound(current_segment_); for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) { --begin; } for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) { ++end; } - // load segments + + // load & merge segments for (auto it = begin; it != end; ++it) { auto &[n, seg] = *it; if (!seg) { @@ -150,8 +144,8 @@ void Replay::queueSegment() { qInfo() << "loading segment" << n << "..."; } } - // merge segments mergeSegments(begin, end); + // free segments out of current semgnt window. for (auto it = segments_.begin(); it != begin; ++it) { it->second.reset(nullptr); @@ -159,6 +153,11 @@ void Replay::queueSegment() { for (auto it = end; it != segments_.end(); ++it) { it->second.reset(nullptr); } + + // start stream thread + if (stream_thread_ == nullptr && cur != segments_.end() && cur->second->isLoaded()) { + startStream(cur->second.get()); + } } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { @@ -170,7 +169,6 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: if (segments_need_merge != segments_merged_) { qDebug() << "merge segments" << segments_need_merge; - // merge & sort events std::vector *new_events = new std::vector(); new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, [=](int v, int n) { return v + segments_[n]->log->events.size(); })); @@ -179,19 +177,9 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: auto middle = new_events->insert(new_events->end(), e.begin(), e.end()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); } - // update events + auto prev_events = events_; updateEvents([&]() { - if (route_start_ts_ == 0) { - // get route start time from initData - auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); - if (it != new_events->end()) { - route_start_ts_ = (*it)->mono_time; - // cur_mono_time_ is set by seekTo in start() before get route_start_ts_ - cur_mono_time_ += route_start_ts_; - } - } - events_ = new_events; segments_merged_ = segments_need_merge; return true; @@ -202,6 +190,38 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: } } +void Replay::startStream(const Segment *cur_segment) { + const auto &events = cur_segment->log->events; + + // get route start time from initData + auto it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); + route_start_ts_ = it != events.end() ? (*it)->mono_time : events[0]->mono_time; + cur_mono_time_ += route_start_ts_; + + // write CarParams + it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); + if (it != events.end()) { + auto bytes = (*it)->bytes(); + Params().put("CarParams", (const char *)bytes.begin(), bytes.size()); + } else { + qInfo() << "failed to read CarParams from current segment"; + } + + // start camera server + std::pair cameras[MAX_CAMERAS] = {}; + for (auto type : ALL_CAMERAS) { + if (auto &fr = cur_segment->frames[type]) { + cameras[type] = {fr->width, fr->height}; + } + } + camera_server_ = std::make_unique(cameras); + + // start stream thread + stream_thread_ = QThread::create(&Replay::stream, this); + QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater); + stream_thread_->start(); +} + void Replay::publishMessage(const Event *e) { if (sm == nullptr) { auto bytes = e->bytes(); @@ -221,15 +241,13 @@ void Replay::publishFrame(const Event *e) { {cereal::Event::DRIVER_ENCODE_IDX, DriverCam}, {cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam}, }; - auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs(); - if (std::find(segments_merged_.begin(), segments_merged_.end(), eidx.getSegmentNum()) == segments_merged_.end()) { - // eidx's segment is not loaded + if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !load_dcam) || (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !load_ecam)) { return; } - CameraType cam = cam_types.at(e->which); - auto &fr = segments_[eidx.getSegmentNum()]->frames[cam]; - if (fr && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { - camera_server_->pushFrame(cam, fr.get(), eidx); + auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs(); + if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentLoaded(eidx.getSegmentNum())) { + CameraType cam = cam_types.at(e->which); + camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx); } } @@ -295,8 +313,12 @@ void Replay::stream() { } } } - // wait for frame to be sent before unlock.(frameReader may be deleted after unlock) camera_server_->waitFinish(); + + if (eit == events_->end() && (current_segment_ == segments_.rbegin()->first) && isSegmentLoaded(current_segment_)) { + qInfo() << "reaches the end of route, restart from beginning"; + emit seekTo(0, false); + } } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 55383d8270..624c62735d 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -31,6 +31,7 @@ protected slots: protected: typedef std::map> SegmentMap; + void startStream(const Segment *cur_segment); void stream(); void setCurrentSegment(int n); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); @@ -38,6 +39,9 @@ protected: void publishMessage(const Event *e); void publishFrame(const Event *e); inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } + inline bool isSegmentLoaded(int n) { + return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); + } QThread *stream_thread_ = nullptr; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index fa4f196346..3f80d0a35f 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -36,20 +36,6 @@ int random_int(int min, int max) { return dist(rng); } -bool is_events_ordered(const std::vector &events) { - REQUIRE(events.size() > 0); - uint64_t prev_mono_time = 0; - cereal::Event::Which prev_which = cereal::Event::INIT_DATA; - for (auto event : events) { - if (event->mono_time < prev_mono_time || (event->mono_time == prev_mono_time && event->which < prev_which)) { - return false; - } - prev_mono_time = event->mono_time; - prev_which = event->which; - } - return true; -} - TEST_CASE("Segment") { Route demo_route(DEMO_ROUTE); REQUIRE(demo_route.load()); @@ -66,7 +52,7 @@ TEST_CASE("Segment") { // LogReader & FrameReader REQUIRE(segment.log->events.size() > 0); - REQUIRE(is_events_ordered(segment.log->events)); + REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan())); // sequence get 50 frames { REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200); for (int i = 0; i < 50; ++i) { @@ -88,7 +74,6 @@ public: void TestReplay::testSeekTo(int seek_to) { seekTo(seek_to, false); - // wait for the seek to finish while (true) { std::unique_lock lk(stream_lock_); stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); @@ -105,7 +90,7 @@ void TestReplay::testSeekTo(int seek_to) { continue; } - REQUIRE(is_events_ordered(*events_)); + REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan())); const int seek_to_segment = seek_to / 60; const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; current_segment_ = event_seconds / 60; @@ -120,13 +105,14 @@ void TestReplay::testSeekTo(int seek_to) { } void TestReplay::test_seek() { + // create a dummy stream thread + stream_thread_ = new QThread(this); QEventLoop loop; std::thread thread = std::thread([&]() { - // random seek 50 times in 3 segments for (int i = 0; i < 50; ++i) { testSeekTo(random_int(0, 3 * 60)); } - // random seek 50 times in routes with invalid segments + // remove 3 segments for (int n : {5, 6, 8}) { segments_.erase(n); }