diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 52071ad1f5..491f724858 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -24,7 +24,7 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s } route_ = std::make_unique(route); - events = new std::vector(); + events_ = new std::vector(); // queueSegment is always executed in the main thread connect(this, &Replay::segmentChanged, this, &Replay::queueSegment); } @@ -41,7 +41,7 @@ void Replay::start(int seconds){ } qDebug() << "load route" << route_->name() << route_->size() << "segments, start from" << seconds; - segments.resize(route_->size()); + segments_.resize(route_->size()); seekTo(seconds); // start stream thread @@ -50,53 +50,65 @@ void Replay::start(int seconds){ thread->start(); } -void Replay::seekTo(int seconds) { - if (segments.empty()) return; - - updating_events = true; - - std::unique_lock lk(lock); - seconds = std::clamp(seconds, 0, (int)segments.size() * 60); - qInfo() << "seeking to " << seconds; - seek_ts = seconds; - setCurrentSegment(std::clamp(seconds / 60, 0, (int)segments.size() - 1)); - updating_events = false; +void Replay::updateEvents(const std::function& lambda) { + // set updating_events to true to force stream thread relase the lock and wait for evnets_udpated. + updating_events_ = true; + { + std::unique_lock lk(lock_); + events_updated_ = lambda(); + updating_events_ = false; + } + stream_cv_.notify_one(); } -void Replay::relativeSeek(int seconds) { - seekTo(current_ts + seconds); +void Replay::seekTo(int seconds, bool relative) { + if (segments_.empty()) return; + + updateEvents([&]() { + if (relative) { + seconds += ((cur_mono_time_ - route_start_ts_) * 1e-9); + } + seconds = std::clamp(seconds, 0, (int)segments_.size() * 60 - 1); + qInfo() << "seeking to " << seconds; + + int segment = seconds / 60; + bool segment_changed = (segment != current_segment_); + + cur_mono_time_ = route_start_ts_ + seconds * 1e9; + setCurrentSegment(segment); + bool segment_loaded = std::find(segments_merged_.begin(), segments_merged_.end(), segment) != segments_merged_.end(); + // return false if segment changed and not loaded yet + return !segment_changed || segment_loaded; + }); } void Replay::pause(bool pause) { - updating_events = true; - std::unique_lock lk(lock); - qDebug() << (pause ? "paused..." : "resuming"); - paused_ = pause; - updating_events = false; - stream_cv_.notify_one(); + updateEvents([=]() { + qDebug() << (pause ? "paused..." : "resuming"); + paused_ = pause; + return true; + }); } void Replay::setCurrentSegment(int n) { - if (current_segment.exchange(n) != n) { + if (current_segment_.exchange(n) != n) { emit segmentChanged(n); } } // maintain the segment window void Replay::queueSegment() { - assert(QThread::currentThreadId() == qApp->thread()->currentThreadId()); - // fetch segments forward - int cur_seg = current_segment.load(); + int cur_seg = current_segment_.load(); int end_idx = cur_seg; - for (int i = cur_seg, fwd = 0; i < segments.size() && fwd <= FORWARD_SEGS; ++i) { - if (!segments[i]) { - segments[i] = std::make_unique(i, route_->at(i), load_dcam, load_ecam); - QObject::connect(segments[i].get(), &Segment::loadFinished, this, &Replay::queueSegment); + for (int i = cur_seg, fwd = 0; i < segments_.size() && fwd <= FORWARD_SEGS; ++i) { + if (!segments_[i]) { + segments_[i] = std::make_unique(i, route_->at(i), load_dcam, load_ecam); + QObject::connect(segments_[i].get(), &Segment::loadFinished, this, &Replay::queueSegment); } end_idx = i; // skip invalid segment - fwd += segments[i]->isValid(); + fwd += segments_[i]->isValid(); } // merge segments @@ -108,7 +120,7 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { std::vector segments_need_merge; const int begin_idx = std::max(cur_seg - BACKWARD_SEGS, 0); for (int i = begin_idx; i <= end_idx; ++i) { - if (segments[i] && segments[i]->isLoaded()) { + if (segments_[i] && segments_[i]->isLoaded()) { segments_need_merge.push_back(i); } else if (i >= cur_seg) { // segment is valid,but still loading. can't skip it to merge the next one. @@ -117,15 +129,14 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { } } - if (segments_need_merge != segments_merged) { + if (segments_need_merge != segments_merged_) { qDebug() << "merge segments" << segments_need_merge; - segments_merged = segments_need_merge; + // merge & sort events std::vector *new_events = new std::vector(); std::unordered_map *new_eidx = new std::unordered_map[MAX_CAMERAS]; for (int n : segments_need_merge) { - auto &log = segments[n]->log; - // merge & sort events + auto &log = segments_[n]->log; auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); for (CameraType cam_type : ALL_CAMERAS) { @@ -133,66 +144,62 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { } } - // update logs - // set updating_events to true to force stream thread relase the lock - updating_events = true; - lock.lock(); - - if (route_start_ts == 0) { - // get route start time from initData - auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); - if (it != new_events->end()) { - route_start_ts = (*it)->mono_time; + // update events + auto prev_events = events_; + auto prev_eidx = eidx_; + updateEvents([&]() { + if (route_start_ts_ == 0) { + // get route start time from initData + auto it = std::find_if(new_events->begin(), new_events->end(), [=](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); + if (it != new_events->end()) { + route_start_ts_ = (*it)->mono_time; + cur_mono_time_ = route_start_ts_; + } } - } - auto prev_events = std::exchange(events, new_events); - auto prev_eidx = std::exchange(eidx, new_eidx); - updating_events = false; - - lock.unlock(); - - // free segments + events_ = new_events; + eidx_ = new_eidx; + segments_merged_ = segments_need_merge; + return true; + }); delete prev_events; delete[] prev_eidx; - for (int i = 0; i < segments.size(); i++) { - if ((i < begin_idx || i > end_idx) && segments[i]) { - segments[i].reset(nullptr); - } + } + + // free segments out of current semgnt window. + for (int i = 0; i < segments_.size(); i++) { + if ((i < begin_idx || i > end_idx) && segments_[i]) { + segments_[i].reset(nullptr); } } } void Replay::stream() { - bool waiting_printed = false; - uint64_t cur_mono_time = 0; + float last_print = 0; cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; + std::unique_lock lk(lock_); + while (true) { - std::unique_lock lk(lock); - stream_cv_.wait(lk, [=]() { return paused_ == false; }); - - uint64_t evt_start_ts = seek_ts != -1 ? route_start_ts + (seek_ts * 1e9) : cur_mono_time; - Event cur_event(cur_which, evt_start_ts); - auto eit = std::upper_bound(events->begin(), events->end(), &cur_event, Event::lessThan()); - if (eit == events->end()) { - lock.unlock(); - if (std::exchange(waiting_printed, true) == false) { - qDebug() << "waiting for events..."; - } - QThread::msleep(50); + stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); }); + events_updated_ = false; + if (exit_) break; + + Event cur_event(cur_which, cur_mono_time_); + auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); + if (eit == events_->end()) { + qDebug() << "waiting for events..."; continue; } - waiting_printed = false; - seek_ts = -1; + + qDebug() << "unlogging at" << (int)((cur_mono_time_ - route_start_ts_) * 1e-9); + uint64_t evt_start_ts = cur_mono_time_; uint64_t loop_start_ts = nanos_since_boot(); - qDebug() << "unlogging at" << int((evt_start_ts - route_start_ts) / 1e9); - for (/**/; !updating_events && eit != events->end(); ++eit) { + for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { const Event *evt = (*eit); cur_which = evt->which; - cur_mono_time = evt->mono_time; - current_ts = (cur_mono_time - route_start_ts) / 1e9; + cur_mono_time_ = evt->mono_time; std::string type; KJ_IF_MAYBE(e_, static_cast(evt->event).which()) { @@ -200,14 +207,15 @@ void Replay::stream() { } if (socks.find(type) != socks.end()) { - if (std::abs(current_ts - last_print) > 5.0) { + int current_ts = (cur_mono_time_ - route_start_ts_) / 1e9; + if ((current_ts - last_print) > 5.0) { last_print = current_ts; - qInfo() << "at " << int(last_print) << "s"; + qInfo() << "at " << current_ts << "s"; } - setCurrentSegment(current_ts / 60); + // keep time - long etime = cur_mono_time - evt_start_ts; + long etime = cur_mono_time_ - evt_start_ts; long rtime = nanos_since_boot() - loop_start_ts; long us_behind = ((etime - rtime) * 1e-3) + 0.5; if (us_behind > 0 && us_behind < 1e6) { @@ -217,10 +225,10 @@ void Replay::stream() { // publish frame // TODO: publish all frames if (evt->which == cereal::Event::ROAD_CAMERA_STATE) { - auto it_ = eidx[RoadCam].find(evt->event.getRoadCameraState().getFrameId()); - if (it_ != eidx[RoadCam].end()) { + auto it_ = eidx_[RoadCam].find(evt->event.getRoadCameraState().getFrameId()); + if (it_ != eidx_[RoadCam].end()) { EncodeIdx &e = it_->second; - auto &seg = segments[e.segmentNum]; + auto &seg = segments_[e.segmentNum]; if (seg && seg->isLoaded()) { auto &frm = seg->frames[RoadCam]; if (vipc_server == nullptr) { @@ -253,7 +261,5 @@ void Replay::stream() { } } } - lk.unlock(); - usleep(0); } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 7d7deab6fb..7987fe6a7a 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -18,8 +18,8 @@ public: ~Replay(); void start(int seconds = 0); - void relativeSeek(int seconds); - void seekTo(int seconds); + void seekTo(int seconds, bool relative = false); + void relativeSeek(int seconds) { seekTo(seconds, true); } void pause(bool pause); bool isPaused() const { return paused_; } @@ -33,26 +33,24 @@ protected: void stream(); void setCurrentSegment(int n); void mergeSegments(int begin_idx, int end_idx); - - bool load_dcam = false, load_ecam = false; - - float last_print = 0; - uint64_t route_start_ts = 0; - std::atomic seek_ts = 0; - std::atomic current_ts = 0; - std::atomic current_segment = -1; + void updateEvents(const std::function& lambda); QThread *thread; // logs - std::mutex lock; - bool paused_ = false; + std::mutex lock_; std::condition_variable stream_cv_; - std::atomic updating_events = false; - std::vector *events = nullptr; - std::unordered_map *eidx = nullptr; - std::vector> segments; - std::vector segments_merged; + std::atomic updating_events_ = false; + std::atomic current_segment_ = -1; + bool exit_ = false; + bool paused_ = false; + bool events_updated_ = false; + uint64_t route_start_ts_ = 0; + uint64_t cur_mono_time_ = 0; + std::vector *events_ = nullptr; + std::unordered_map *eidx_ = nullptr; + std::vector> segments_; + std::vector segments_merged_; // messaging SubMaster *sm; @@ -60,4 +58,5 @@ protected: std::set socks; VisionIpcServer *vipc_server = nullptr; std::unique_ptr route_; + bool load_dcam = false, load_ecam = false; };