c++ replay: improve seeking&updating events (#22319)

* improve seeking

* qDebug

* cleanup

* wait on cv

robust

* Trailing underscores for member variables

* group log variables

* small cleanup,remove unnecess std::atomic

* remove debug output

* fix seek problem

* add comment

* better lambda

* faster seek.don't block stream thread if segment already loaded

* remove assert
pull/22195/head^2
Dean Lee 4 years ago committed by GitHub
parent 4e6ff308a8
commit a548d4b5b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 158
      selfdrive/ui/replay/replay.cc
  2. 33
      selfdrive/ui/replay/replay.h

@ -24,7 +24,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
}
route_ = std::make_unique<Route>(route);
events = new std::vector<Event *>();
events_ = new std::vector<Event *>();
// queueSegment is always executed in the main thread
connect(this, &Replay::segmentChanged, this, &Replay::queueSegment);
}
@ -41,7 +41,7 @@ void Replay::start(int seconds){
}
qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds;
segments.resize(route_->size());
segments_.resize(route_->size());
seekTo(seconds);
// start stream thread
@ -50,53 +50,65 @@ void Replay::start(int seconds){
thread->start();
}
void Replay::seekTo(int seconds) {
if (segments.empty()) return;
void Replay::updateEvents(const std::function<bool()>& lambda) {
// set updating_events to true to force stream thread relase the lock and wait for evnets_udpated.
updating_events_ = true;
{
std::unique_lock lk(lock_);
events_updated_ = lambda();
updating_events_ = false;
}
stream_cv_.notify_one();
}
updating_events = true;
void Replay::seekTo(int seconds, bool relative) {
if (segments_.empty()) return;
std::unique_lock lk(lock);
seconds = std::clamp(seconds, 0, (int)segments.size() * 60);
updateEvents([&]() {
if (relative) {
seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9);
}
seconds = std::clamp(seconds, 0, (int)segments_.size() * 60 - 1);
qInfo() << "seeking to " << seconds;
seek_ts = seconds;
setCurrentSegment(std::clamp(seconds / 60, 0, (int)segments.size() - 1));
updating_events = false;
}
void Replay::relativeSeek(int seconds) {
seekTo(current_ts + seconds);
int segment = seconds / 60;
bool segment_changed = (segment != current_segment_);
cur_mono_time_ = route_start_ts_ + seconds * 1e9;
setCurrentSegment(segment);
bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end();
// return false if segment changed and not loaded yet
return !segment_changed || segment_loaded;
});
}
void Replay::pause(bool pause) {
updating_events = true;
std::unique_lock lk(lock);
updateEvents([=]() {
qDebug() << (pause ? "paused..." : "resuming");
paused_ = pause;
updating_events = false;
stream_cv_.notify_one();
return true;
});
}
void Replay::setCurrentSegment(int n) {
if (current_segment.exchange(n) != n) {
if (current_segment_.exchange(n) != n) {
emit segmentChanged(n);
}
}
// maintain the segment window
void Replay::queueSegment() {
assert(QThread::currentThreadId() == qApp->thread()->currentThreadId());
// fetch segments forward
int cur_seg = current_segment.load();
int cur_seg = current_segment_.load();
int end_idx = cur_seg;
for (int i = cur_seg, fwd = 0; i < segments.size() && fwd <= FORWARD_SEGS; ++i) {
if (!segments[i]) {
segments[i] = std::make_unique<Segment>(i, route_->at(i), load_dcam, load_ecam);
QObject::connect(segments[i].get(), &Segment::loadFinished, this, &Replay::queueSegment);
for (int i = cur_seg, fwd = 0; i < segments_.size() && fwd <= FORWARD_SEGS; ++i) {
if (!segments_[i]) {
segments_[i] = std::make_unique<Segment>(i, route_->at(i), load_dcam, load_ecam);
QObject::connect(segments_[i].get(), &Segment::loadFinished, this, &Replay::queueSegment);
}
end_idx = i;
// skip invalid segment
fwd += segments[i]->isValid();
fwd += segments_[i]->isValid();
}
// merge segments
@ -108,7 +120,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
std::vector<int> segments_need_merge;
const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0);
for (int i = begin_idx; i <= end_idx; ++i) {
if (segments[i] && segments[i]->isLoaded()) {
if (segments_[i] && segments_[i]->isLoaded()) {
segments_need_merge.push_back(i);
} else if (i >= cur_seg) {
// segment is valid,but still loading. can't skip it to merge the next one.
@ -117,15 +129,14 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
}
}
if (segments_need_merge != segments_merged) {
if (segments_need_merge != segments_merged_) {
qDebug() << "merge segments" << segments_need_merge;
segments_merged = segments_need_merge;
// merge & sort events
std::vector<Event *> *new_events = new std::vector<Event *>();
std::unordered_map<uint32_t, EncodeIdx> *new_eidx = new std::unordered_map<uint32_t, EncodeIdx>[MAX_CAMERAS];
for (int n : segments_need_merge) {
auto &log = segments[n]->log;
// merge & sort events
auto &log = segments_[n]->log;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
for (CameraType cam_type : ALL_CAMERAS) {
@ -133,66 +144,62 @@ void Replay::mergeSegments(int cur_seg, int end_idx) {
}
}
// update logs
// set updating_events to true to force stream thread relase the lock
updating_events = true;
lock.lock();
if (route_start_ts == 0) {
// update events
auto prev_events = events_;
auto prev_eidx = eidx_;
updateEvents([&]() {
if (route_start_ts_ == 0) {
// get route start time from initData
auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
if (it != new_events->end()) {
route_start_ts = (*it)->mono_time;
route_start_ts_ = (*it)->mono_time;
cur_mono_time_ = route_start_ts_;
}
}
auto prev_events = std::exchange(events, new_events);
auto prev_eidx = std::exchange(eidx, new_eidx);
updating_events = false;
lock.unlock();
// free segments
events_ = new_events;
eidx_ = new_eidx;
segments_merged_ = segments_need_merge;
return true;
});
delete prev_events;
delete[] prev_eidx;
for (int i = 0; i < segments.size(); i++) {
if ((i < begin_idx || i > end_idx) && segments[i]) {
segments[i].reset(nullptr);
}
// free segments out of current semgnt window.
for (int i = 0; i < segments_.size(); i++) {
if ((i < begin_idx || i > end_idx) && segments_[i]) {
segments_[i].reset(nullptr);
}
}
}
void Replay::stream() {
bool waiting_printed = false;
uint64_t cur_mono_time = 0;
float last_print = 0;
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
std::unique_lock lk(lock_);
while (true) {
std::unique_lock lk(lock);
stream_cv_.wait(lk, [=]() { return paused_ == false; });
uint64_t evt_start_ts = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time;
Event cur_event(cur_which, evt_start_ts);
auto eit = std::upper_bound(events->begin(), events->end(), &cur_event, Event::lessThan());
if (eit == events->end()) {
lock.unlock();
if (std::exchange(waiting_printed, true) == false) {
stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); });
events_updated_ = false;
if (exit_) break;
Event cur_event(cur_which, cur_mono_time_);
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
qDebug() << "waiting for events...";
}
QThread::msleep(50);
continue;
}
waiting_printed = false;
seek_ts = -1;
qDebug() << "unlogging at" << (int)((cur_mono_time_ - route_start_ts_) * 1e-9);
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
qDebug() << "unlogging at" << int((evt_start_ts - route_start_ts) / 1e9);
for (/**/; !updating_events && eit != events->end(); ++eit) {
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
const Event *evt = (*eit);
cur_which = evt->which;
cur_mono_time = evt->mono_time;
current_ts = (cur_mono_time - route_start_ts) / 1e9;
cur_mono_time_ = evt->mono_time;
std::string type;
KJ_IF_MAYBE(e_, static_cast<capnp::DynamicStruct::Reader>(evt->event).which()) {
@ -200,14 +207,15 @@ void Replay::stream() {
}
if (socks.find(type) != socks.end()) {
if (std::abs(current_ts - last_print) > 5.0) {
int current_ts = (cur_mono_time_ - route_start_ts_) / 1e9;
if ((current_ts - last_print) > 5.0) {
last_print = current_ts;
qInfo() << "at " << int(last_print) << "s";
qInfo() << "at " << current_ts << "s";
}
setCurrentSegment(current_ts / 60);
// keep time
long etime = cur_mono_time - evt_start_ts;
long etime = cur_mono_time_ - evt_start_ts;
long rtime = nanos_since_boot() - loop_start_ts;
long us_behind = ((etime - rtime) * 1e-3) + 0.5;
if (us_behind > 0 && us_behind < 1e6) {
@ -217,10 +225,10 @@ void Replay::stream() {
// publish frame
// TODO: publish all frames
if (evt->which == cereal::Event::ROAD_CAMERA_STATE) {
auto it_ = eidx[RoadCam].find(evt->event.getRoadCameraState().getFrameId());
if (it_ != eidx[RoadCam].end()) {
auto it_ = eidx_[RoadCam].find(evt->event.getRoadCameraState().getFrameId());
if (it_ != eidx_[RoadCam].end()) {
EncodeIdx &e = it_->second;
auto &seg = segments[e.segmentNum];
auto &seg = segments_[e.segmentNum];
if (seg && seg->isLoaded()) {
auto &frm = seg->frames[RoadCam];
if (vipc_server == nullptr) {
@ -253,7 +261,5 @@ void Replay::stream() {
}
}
}
lk.unlock();
usleep(0);
}
}

@ -18,8 +18,8 @@ public:
~Replay();
void start(int seconds = 0);
void relativeSeek(int seconds);
void seekTo(int seconds);
void seekTo(int seconds, bool relative = false);
void relativeSeek(int seconds) { seekTo(seconds, true); }
void pause(bool pause);
bool isPaused() const { return paused_; }
@ -33,26 +33,24 @@ protected:
void stream();
void setCurrentSegment(int n);
void mergeSegments(int begin_idx, int end_idx);
bool load_dcam = false, load_ecam = false;
float last_print = 0;
uint64_t route_start_ts = 0;
std::atomic<int> seek_ts = 0;
std::atomic<int> current_ts = 0;
std::atomic<int> current_segment = -1;
void updateEvents(const std::function<bool()>& lambda);
QThread *thread;
// logs
std::mutex lock;
bool paused_ = false;
std::mutex lock_;
std::condition_variable stream_cv_;
std::atomic<bool> updating_events = false;
std::vector<Event *> *events = nullptr;
std::unordered_map<uint32_t, EncodeIdx> *eidx = nullptr;
std::vector<std::unique_ptr<Segment>> segments;
std::vector<int> segments_merged;
std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = -1;
bool exit_ = false;
bool paused_ = false;
bool events_updated_ = false;
uint64_t route_start_ts_ = 0;
uint64_t cur_mono_time_ = 0;
std::vector<Event *> *events_ = nullptr;
std::unordered_map<uint32_t, EncodeIdx> *eidx_ = nullptr;
std::vector<std::unique_ptr<Segment>> segments_;
std::vector<int> segments_merged_;
// messaging
SubMaster *sm;
@ -60,4 +58,5 @@ protected:
std::set<std::string> socks;
VisionIpcServer *vipc_server = nullptr;
std::unique_ptr<Route> route_;
bool load_dcam = false, load_ecam = false;
};

Loading…
Cancel
Save