replay: start streaming after segment loaded (#22575)

* start streaming  after segment loaded

dd

* loop from beginning if reaches the end

* isSegmentLoaded

* one loop

* move to ctor

* delete stream_thread_ on exit

* pause streaming while testing seek

* Revert "one loop"

This reverts commit f029cd118f7ac876dee1dbf2b91478403211ad47.

* test:dummy stream thread

* cleanup

* start thread after vipcserver
pull/22598/head
Dean Lee 4 years ago committed by GitHub
parent c73d9ddaa6
commit f6de10b55a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      selfdrive/ui/replay/camera.cc
  2. 2
      selfdrive/ui/replay/camera.h
  3. 92
      selfdrive/ui/replay/replay.cc
  4. 4
      selfdrive/ui/replay/replay.h
  5. 24
      selfdrive/ui/replay/tests/test_replay.cc

@ -5,7 +5,13 @@
const int YUV_BUF_COUNT = 50; const int YUV_BUF_COUNT = 50;
CameraServer::CameraServer() { CameraServer::CameraServer(std::pair<int, int> cameras[MAX_CAMERAS]) {
if (cameras) {
for (auto type : ALL_CAMERAS) {
std::tie(cameras_[type].width, cameras_[type].height) = cameras[type];
}
startVipcServer();
}
camera_thread_ = std::thread(&CameraServer::thread, this); camera_thread_ = std::thread(&CameraServer::thread, this);
} }
@ -33,7 +39,7 @@ void CameraServer::thread() {
if (!fr) break; if (!fr) break;
auto &cam = cameras_[type]; auto &cam = cameras_[type];
// start|restart the vipc server if frame size changed // restart vipc server if new camera incoming.
if (cam.width != fr->width || cam.height != fr->height) { if (cam.width != fr->width || cam.height != fr->height) {
cam.width = fr->width; cam.width = fr->width;
cam.height = fr->height; cam.height = fr->height;

@ -8,7 +8,7 @@
class CameraServer { class CameraServer {
public: public:
CameraServer(); CameraServer(std::pair<int, int> cameras[MAX_CAMERAS] = nullptr);
~CameraServer(); ~CameraServer();
void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx); void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx);
inline void waitFinish() { inline void waitFinish() {

@ -6,6 +6,7 @@
#include <capnp/dynamic.h> #include <capnp/dynamic.h>
#include "cereal/services.h" #include "cereal/services.h"
#include "selfdrive/common/timing.h" #include "selfdrive/common/timing.h"
#include "selfdrive/common/params.h"
#include "selfdrive/hardware/hw.h" #include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/replay/util.h" #include "selfdrive/ui/replay/util.h"
@ -15,11 +16,10 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
auto event_struct = capnp::Schema::from<cereal::Event>().asStruct(); auto event_struct = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_struct.getUnionFields().size()); sockets_.resize(event_struct.getUnionFields().size());
for (const auto &it : services) { for (const auto &it : services) {
if ((allow.size() == 0 || allow.contains(it.name)) && if ((allow.empty() || allow.contains(it.name)) && !block.contains(it.name)) {
!block.contains(it.name)) {
s.push_back(it.name);
uint16_t which = event_struct.getFieldByName(it.name).getProto().getDiscriminantValue(); uint16_t which = event_struct.getFieldByName(it.name).getProto().getDiscriminantValue();
sockets_[which] = it.name; sockets_[which] = it.name;
s.push_back(it.name);
} }
} }
qDebug() << "services " << s; qDebug() << "services " << s;
@ -29,15 +29,14 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
} }
route_ = std::make_unique<Route>(route, data_dir); route_ = std::make_unique<Route>(route, data_dir);
events_ = new std::vector<Event *>(); events_ = new std::vector<Event *>();
// doSeek & queueSegment are always executed in the same thread
connect(this, &Replay::seekTo, this, &Replay::doSeek); connect(this, &Replay::seekTo, this, &Replay::doSeek);
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); connect(this, &Replay::segmentChanged, this, &Replay::queueSegment);
} }
Replay::~Replay() { Replay::~Replay() {
qDebug() << "shutdown: in progress..."; qDebug() << "shutdown: in progress...";
exit_ = true; exit_ = updating_events_ = true;
updating_events_ = true;
if (stream_thread_) { if (stream_thread_) {
stream_cv_.notify_one(); stream_cv_.notify_one();
stream_thread_->quit(); stream_thread_->quit();
@ -72,12 +71,6 @@ bool Replay::load() {
void Replay::start(int seconds) { void Replay::start(int seconds) {
seekTo(seconds, false); seekTo(seconds, false);
camera_server_ = std::make_unique<CameraServer>();
// start stream thread
stream_thread_ = new QThread(this);
QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); });
stream_thread_->start();
} }
void Replay::updateEvents(const std::function<bool()> &lambda) { void Replay::updateEvents(const std::function<bool()> &lambda) {
@ -133,15 +126,16 @@ void Replay::segmentLoadFinished(bool success) {
void Replay::queueSegment() { void Replay::queueSegment() {
// get the current segment window // get the current segment window
SegmentMap::iterator begin, end; SegmentMap::iterator begin, cur, end;
begin = end = segments_.lower_bound(current_segment_); begin = cur = end = segments_.lower_bound(current_segment_);
for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) { for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) {
--begin; --begin;
} }
for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) { for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) {
++end; ++end;
} }
// load segments
// load & merge segments
for (auto it = begin; it != end; ++it) { for (auto it = begin; it != end; ++it) {
auto &[n, seg] = *it; auto &[n, seg] = *it;
if (!seg) { if (!seg) {
@ -150,8 +144,8 @@ void Replay::queueSegment() {
qInfo() << "loading segment" << n << "..."; qInfo() << "loading segment" << n << "...";
} }
} }
// merge segments
mergeSegments(begin, end); mergeSegments(begin, end);
// free segments out of current semgnt window. // free segments out of current semgnt window.
for (auto it = segments_.begin(); it != begin; ++it) { for (auto it = segments_.begin(); it != begin; ++it) {
it->second.reset(nullptr); it->second.reset(nullptr);
@ -159,6 +153,11 @@ void Replay::queueSegment() {
for (auto it = end; it != segments_.end(); ++it) { for (auto it = end; it != segments_.end(); ++it) {
it->second.reset(nullptr); it->second.reset(nullptr);
} }
// start stream thread
if (stream_thread_ == nullptr && cur != segments_.end() && cur->second->isLoaded()) {
startStream(cur->second.get());
}
} }
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
@ -170,7 +169,6 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
if (segments_need_merge != segments_merged_) { if (segments_need_merge != segments_merged_) {
qDebug() << "merge segments" << segments_need_merge; qDebug() << "merge segments" << segments_need_merge;
// merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>(); std::vector<Event *> *new_events = new std::vector<Event *>();
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
[=](int v, int n) { return v + segments_[n]->log->events.size(); })); [=](int v, int n) { return v + segments_[n]->log->events.size(); }));
@ -179,19 +177,9 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
auto middle = new_events->insert(new_events->end(), e.begin(), e.end()); auto middle = new_events->insert(new_events->end(), e.begin(), e.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
} }
// update events
auto prev_events = events_; auto prev_events = events_;
updateEvents([&]() { updateEvents([&]() {
if (route_start_ts_ == 0) {
// get route start time from initData
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts_ = (*it)->mono_time;
// cur_mono_time_ is set by seekTo in start() before get route_start_ts_
cur_mono_time_ += route_start_ts_;
}
}
events_ = new_events; events_ = new_events;
segments_merged_ = segments_need_merge; segments_merged_ = segments_need_merge;
return true; return true;
@ -202,6 +190,38 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
} }
} }
void Replay::startStream(const Segment *cur_segment) {
const auto &events = cur_segment->log->events;
// get route start time from initData
auto it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
route_start_ts_ = it != events.end() ? (*it)->mono_time : events[0]->mono_time;
cur_mono_time_ += route_start_ts_;
// write CarParams
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) {
auto bytes = (*it)->bytes();
Params().put("CarParams", (const char *)bytes.begin(), bytes.size());
} else {
qInfo() << "failed to read CarParams from current segment";
}
// start camera server
std::pair<int, int> cameras[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) {
if (auto &fr = cur_segment->frames[type]) {
cameras[type] = {fr->width, fr->height};
}
}
camera_server_ = std::make_unique<CameraServer>(cameras);
// start stream thread
stream_thread_ = QThread::create(&Replay::stream, this);
QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater);
stream_thread_->start();
}
void Replay::publishMessage(const Event *e) { void Replay::publishMessage(const Event *e) {
if (sm == nullptr) { if (sm == nullptr) {
auto bytes = e->bytes(); auto bytes = e->bytes();
@ -221,15 +241,13 @@ void Replay::publishFrame(const Event *e) {
{cereal::Event::DRIVER_ENCODE_IDX, DriverCam}, {cereal::Event::DRIVER_ENCODE_IDX, DriverCam},
{cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam}, {cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam},
}; };
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>(); if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !load_dcam) || (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !load_ecam)) {
if (std::find(segments_merged_.begin(), segments_merged_.end(), eidx.getSegmentNum()) == segments_merged_.end()) {
// eidx's segment is not loaded
return; return;
} }
auto eidx = capnp::AnyStruct::Reader(e->event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C && isSegmentLoaded(eidx.getSegmentNum())) {
CameraType cam = cam_types.at(e->which); CameraType cam = cam_types.at(e->which);
auto &fr = segments_[eidx.getSegmentNum()]->frames[cam]; camera_server_->pushFrame(cam, segments_[eidx.getSegmentNum()]->frames[cam].get(), eidx);
if (fr && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
camera_server_->pushFrame(cam, fr.get(), eidx);
} }
} }
@ -295,8 +313,12 @@ void Replay::stream() {
} }
} }
} }
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock) // wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
camera_server_->waitFinish(); camera_server_->waitFinish();
if (eit == events_->end() && (current_segment_ == segments_.rbegin()->first) && isSegmentLoaded(current_segment_)) {
qInfo() << "reaches the end of route, restart from beginning";
emit seekTo(0, false);
}
} }
} }

@ -31,6 +31,7 @@ protected slots:
protected: protected:
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap; typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
void startStream(const Segment *cur_segment);
void stream(); void stream();
void setCurrentSegment(int n); void setCurrentSegment(int n);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
@ -38,6 +39,9 @@ protected:
void publishMessage(const Event *e); void publishMessage(const Event *e);
void publishFrame(const Event *e); void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }
inline bool isSegmentLoaded(int n) {
return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
}
QThread *stream_thread_ = nullptr; QThread *stream_thread_ = nullptr;

@ -36,20 +36,6 @@ int random_int(int min, int max) {
return dist(rng); return dist(rng);
} }
bool is_events_ordered(const std::vector<Event *> &events) {
REQUIRE(events.size() > 0);
uint64_t prev_mono_time = 0;
cereal::Event::Which prev_which = cereal::Event::INIT_DATA;
for (auto event : events) {
if (event->mono_time < prev_mono_time || (event->mono_time == prev_mono_time && event->which < prev_which)) {
return false;
}
prev_mono_time = event->mono_time;
prev_which = event->which;
}
return true;
}
TEST_CASE("Segment") { TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE); Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load()); REQUIRE(demo_route.load());
@ -66,7 +52,7 @@ TEST_CASE("Segment") {
// LogReader & FrameReader // LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0); REQUIRE(segment.log->events.size() > 0);
REQUIRE(is_events_ordered(segment.log->events)); REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan()));
// sequence get 50 frames { // sequence get 50 frames {
REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200); REQUIRE(segment.frames[RoadCam]->getFrameCount() == 1200);
for (int i = 0; i < 50; ++i) { for (int i = 0; i < 50; ++i) {
@ -88,7 +74,6 @@ public:
void TestReplay::testSeekTo(int seek_to) { void TestReplay::testSeekTo(int seek_to) {
seekTo(seek_to, false); seekTo(seek_to, false);
// wait for the seek to finish
while (true) { while (true) {
std::unique_lock lk(stream_lock_); std::unique_lock lk(stream_lock_);
stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
@ -105,7 +90,7 @@ void TestReplay::testSeekTo(int seek_to) {
continue; continue;
} }
REQUIRE(is_events_ordered(*events_)); REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan()));
const int seek_to_segment = seek_to / 60; const int seek_to_segment = seek_to / 60;
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
current_segment_ = event_seconds / 60; current_segment_ = event_seconds / 60;
@ -120,13 +105,14 @@ void TestReplay::testSeekTo(int seek_to) {
} }
void TestReplay::test_seek() { void TestReplay::test_seek() {
// create a dummy stream thread
stream_thread_ = new QThread(this);
QEventLoop loop; QEventLoop loop;
std::thread thread = std::thread([&]() { std::thread thread = std::thread([&]() {
// random seek 50 times in 3 segments
for (int i = 0; i < 50; ++i) { for (int i = 0; i < 50; ++i) {
testSeekTo(random_int(0, 3 * 60)); testSeekTo(random_int(0, 3 * 60));
} }
// random seek 50 times in routes with invalid segments // remove 3 segments
for (int n : {5, 6, 8}) { for (int n : {5, 6, 8}) {
segments_.erase(n); segments_.erase(n);
} }

Loading…
Cancel
Save