diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index 3fa8bb0fe9..ddd1c1dfed 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -33,13 +33,12 @@ void ReplayStream::mergeSegments() { std::vector new_events; new_events.reserve(seg->log->events.size()); - for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) { - if ((*it)->which == cereal::Event::Which::CAN) { - const uint64_t ts = (*it)->mono_time; - capnp::FlatArrayMessageReader reader((*it)->data); + for (const Event &e : seg->log->events) { + if (e.which == cereal::Event::Which::CAN) { + capnp::FlatArrayMessageReader reader(e.data); auto event = reader.getRoot(); for (const auto &c : event.getCan()) { - new_events.push_back(newEvent(ts, c)); + new_events.push_back(newEvent(e.mono_time, c)); } } } diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index ad20543755..261c540340 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -257,13 +257,13 @@ void Slider::setTimeRange(double min, double max) { void Slider::parseQLog(int segnum, std::shared_ptr qlog) { const auto &segments = qobject_cast(can)->route()->segments(); if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) { - emit updateMaximumTime(qlog->events.back()->mono_time / 1e9 - can->routeStartTime()); + emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime()); } std::mutex mutex; - QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) { - if (e->which == cereal::Event::Which::THUMBNAIL) { - capnp::FlatArrayMessageReader reader(e->data); + QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) { + if (e.which == cereal::Event::Which::THUMBNAIL) { + capnp::FlatArrayMessageReader reader(e.data); auto thumb = reader.getRoot().getThumbnail(); auto data = thumb.getThumbnail(); if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) { @@ -271,13 +271,13 @@ void Slider::parseQLog(int segnum, std::shared_ptr qlog) { std::lock_guard lk(mutex); thumbnails[thumb.getTimestampEof()] = scaled; } - } else if (e->which == cereal::Event::Which::CONTROLS_STATE) { - capnp::FlatArrayMessageReader reader(e->data); + } else if (e.which == cereal::Event::Which::CONTROLS_STATE) { + capnp::FlatArrayMessageReader reader(e.data); auto cs = reader.getRoot().getControlsState(); if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 && cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) { std::lock_guard lk(mutex); - alerts.emplace(e->mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()}); + alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()}); } } }); diff --git a/tools/replay/consoleui.cc b/tools/replay/consoleui.cc index 54056c6cd5..eaff78c691 100644 --- a/tools/replay/consoleui.cc +++ b/tools/replay/consoleui.cc @@ -172,7 +172,7 @@ void ConsoleUI::updateStatus() { if (status != Status::Paused) { auto events = replay->events(); uint64_t current_mono_time = replay->routeStartTime() + replay->currentSeconds() * 1e9; - bool playing = !events->empty() && events->back()->mono_time > current_mono_time; + bool playing = !events->empty() && events->back().mono_time > current_mono_time; status = playing ? Status::Playing : Status::Waiting; } auto [status_str, status_color] = status_text[status]; @@ -368,7 +368,6 @@ void ConsoleUI::handleKey(char c) { } else if (c == ' ') { pauseReplay(!replay->isPaused()); } else if (c == 'q' || c == 'Q') { - replay->stop(); qApp->exit(); } } diff --git a/tools/replay/filereader.cc b/tools/replay/filereader.cc index 22af7f5f86..d74aaebaba 100644 --- a/tools/replay/filereader.cc +++ b/tools/replay/filereader.cc @@ -35,7 +35,10 @@ std::string FileReader::read(const std::string &file, std::atomic *abort) std::string FileReader::download(const std::string &url, std::atomic *abort) { for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) { - if (i > 0) rWarning("download failed, retrying %d", i); + if (i > 0) { + rWarning("download failed, retrying %d", i); + util::sleep_for(3000); + } std::string result = httpGet(url, chunk_size_, abort); if (!result.empty()) { diff --git a/tools/replay/logreader.cc b/tools/replay/logreader.cc index 36b07f19d0..f52ef4a4eb 100644 --- a/tools/replay/logreader.cc +++ b/tools/replay/logreader.cc @@ -4,16 +4,6 @@ #include "tools/replay/filereader.h" #include "tools/replay/util.h" -LogReader::LogReader(size_t memory_pool_block_size) { - events.reserve(memory_pool_block_size); -} - -LogReader::~LogReader() { - for (Event *e : events) { - delete e; - } -} - bool LogReader::load(const std::string &url, std::atomic *abort, bool local_cache, int chunk_size, int retries) { raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort); if (raw_.empty()) return false; @@ -22,17 +12,13 @@ bool LogReader::load(const std::string &url, std::atomic *abort, bool loca raw_ = decompressBZ2(raw_, abort); if (raw_.empty()) return false; } - return parse(abort); + return load(raw_.data(), raw_.size(), abort); } -bool LogReader::load(const std::byte *data, size_t size, std::atomic *abort) { - raw_.assign((const char *)data, size); - return parse(abort); -} - -bool LogReader::parse(std::atomic *abort) { +bool LogReader::load(const char *data, size_t size, std::atomic *abort) { try { - kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); + events.reserve(65000); + kj::ArrayPtr words((const capnp::word *)data, size / sizeof(capnp::word)); while (words.size() > 0 && !(abort && *abort)) { capnp::FlatArrayMessageReader reader(words); auto event = reader.getRoot(); @@ -40,16 +26,16 @@ bool LogReader::parse(std::atomic *abort) { uint64_t mono_time = event.getLogMonoTime(); auto event_data = kj::arrayPtr(words.begin(), reader.getEnd()); - Event *evt = events.emplace_back(newEvent(which, mono_time, event_data)); + const Event &evt = events.emplace_back(which, mono_time, event_data); // Add encodeIdx packet again as a frame packet for the video stream - if (evt->which == cereal::Event::ROAD_ENCODE_IDX || - evt->which == cereal::Event::DRIVER_ENCODE_IDX || - evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { + if (evt.which == cereal::Event::ROAD_ENCODE_IDX || + evt.which == cereal::Event::DRIVER_ENCODE_IDX || + evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs(); if (uint64_t sof = idx.getTimestampSof()) { mono_time = sof; } - events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum())); + events.emplace_back(which, mono_time, event_data, idx.getSegmentNum()); } words = kj::arrayPtr(reader.getEnd(), words.end()); @@ -59,16 +45,9 @@ bool LogReader::parse(std::atomic *abort) { } if (!events.empty() && !(abort && *abort)) { - std::sort(events.begin(), events.end(), Event::lessThan()); + events.shrink_to_fit(); + std::sort(events.begin(), events.end()); return true; } return false; } - -Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &words, int eidx_segnum) { -#ifdef HAS_MEMORY_RESOURCE - return new (&mbr_) Event(which, mono_time, words, eidx_segnum); -#else - return new Event(which, mono_time, words, eidx_segnum); -#endif -} diff --git a/tools/replay/logreader.h b/tools/replay/logreader.h index 2a28d7b432..56633c191b 100644 --- a/tools/replay/logreader.h +++ b/tools/replay/logreader.h @@ -1,10 +1,5 @@ #pragma once -#if __has_include() -#define HAS_MEMORY_RESOURCE 1 -#include -#endif -#include #include #include @@ -13,27 +8,15 @@ const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const int MAX_CAMERAS = std::size(ALL_CAMERAS); -const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000; class Event { public: Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &data, int eidx_segnum = -1) : which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {} - struct lessThan { - inline bool operator()(const Event *l, const Event *r) { - return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which); - } - }; - -#if HAS_MEMORY_RESOURCE - void *operator new(size_t size, std::pmr::monotonic_buffer_resource *mbr) { - return mbr->allocate(size); - } - void operator delete(void *ptr) { - // No-op. memory used by EventMemoryPool increases monotonically until the logReader is destroyed. + bool operator<(const Event &other) const { + return mono_time < other.mono_time || (mono_time == other.mono_time && which < other.which); } -#endif uint64_t mono_time; cereal::Event::Which which; @@ -43,18 +26,11 @@ public: class LogReader { public: - LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); - ~LogReader(); bool load(const std::string &url, std::atomic *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0); - bool load(const std::byte *data, size_t size, std::atomic *abort = nullptr); - std::vector events; + bool load(const char *data, size_t size, std::atomic *abort = nullptr); + std::vector events; private: - Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr &words, int eidx_segnum = -1); - bool parse(std::atomic *abort); std::string raw_; -#ifdef HAS_MEMORY_RESOURCE - std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)}; -#endif }; diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 2e50722551..e7657f3531 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -2,15 +2,20 @@ #include #include - #include +#include #include "cereal/services.h" #include "common/params.h" #include "common/timing.h" #include "tools/replay/util.h" +static void interrupt_sleep_handler(int signal) {} + Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) { + // Register signal handler for SIGUSR1 + std::signal(SIGUSR1, interrupt_sleep_handler); + if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) { block << "uiDebug" << "userFlag"; } @@ -33,28 +38,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s pm = std::make_unique(s); } route_ = std::make_unique(route, data_dir); - events_ = std::make_unique>(); - new_events_ = std::make_unique>(); } Replay::~Replay() { - stop(); -} - -void Replay::stop() { if (!stream_thread_ && segments_.empty()) return; rInfo("shutdown: in progress..."); if (stream_thread_ != nullptr) { - exit_ = updating_events_ = true; + exit_ =true; + paused_ = true; stream_cv_.notify_one(); stream_thread_->quit(); stream_thread_->wait(); - stream_thread_ = nullptr; + delete stream_thread_; } - camera_server_.reset(nullptr); timeline_future.waitForFinished(); - segments_.clear(); rInfo("shutdown: done"); } @@ -84,13 +82,12 @@ void Replay::start(int seconds) { seekTo(route_->identifier().begin_segment * 60 + seconds, false); } -void Replay::updateEvents(const std::function &lambda) { - // set updating_events to true to force stream thread release the lock and wait for events_updated. - updating_events_ = true; +void Replay::updateEvents(const std::function &update_events_function) { + pauseStreamThread(); { std::unique_lock lk(stream_lock_); - events_updated_ = lambda(); - updating_events_ = false; + events_ready_ = update_events_function(); + paused_ = user_paused_; } stream_cv_.notify_one(); } @@ -117,7 +114,7 @@ void Replay::seekTo(double seconds, bool relative) { } return segment_merged; }); - queueSegment(); + updateSegmentsCache(); } void Replay::seekToFlag(FindFlag flag) { @@ -146,34 +143,34 @@ void Replay::buildTimeline() { std::shared_ptr log(new LogReader()); if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; - for (const Event *e : log->events) { - if (e->which == cereal::Event::Which::CONTROLS_STATE) { - capnp::FlatArrayMessageReader reader(e->data); + for (const Event &e : log->events) { + if (e.which == cereal::Event::Which::CONTROLS_STATE) { + capnp::FlatArrayMessageReader reader(e.data); auto event = reader.getRoot(); auto cs = event.getControlsState(); if (engaged != cs.getEnabled()) { if (engaged) { std::lock_guard lk(timeline_lock); - timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged}); + timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged}); } - engaged_begin = e->mono_time; + engaged_begin = e.mono_time; engaged = cs.getEnabled(); } if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) { if (!alert_type.empty() && alert_size != cereal::ControlsState::AlertSize::NONE) { std::lock_guard lk(timeline_lock); - timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), timeline_types[(int)alert_status]}); + timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]}); } - alert_begin = e->mono_time; + alert_begin = e.mono_time; alert_type = cs.getAlertType().cStr(); alert_size = cs.getAlertSize(); alert_status = cs.getAlertStatus(); } - } else if (e->which == cereal::Event::Which::USER_FLAG) { + } else if (e.which == cereal::Event::Which::USER_FLAG) { std::lock_guard lk(timeline_lock); - timeline.push_back({toSeconds(e->mono_time), toSeconds(e->mono_time), TimelineType::UserFlag}); + timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag}); } } std::sort(timeline.begin(), timeline.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); }); @@ -203,16 +200,22 @@ std::optional Replay::find(FindFlag flag) { } void Replay::pause(bool pause) { - updateEvents([=]() { - rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds()); - paused_ = pause; - return true; - }); + if (user_paused_ != pause) { + pauseStreamThread(); + { + std::unique_lock lk(stream_lock_); + rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds()); + paused_ = user_paused_ = pause; + } + stream_cv_.notify_one(); + } } -void Replay::setCurrentSegment(int n) { - if (current_segment_.exchange(n) != n) { - QMetaObject::invokeMethod(this, &Replay::queueSegment, Qt::QueuedConnection); +void Replay::pauseStreamThread() { + paused_ = true; + // Send SIGUSR1 to interrupt clock_nanosleep + if (stream_thread_ && stream_thread_id) { + pthread_kill(stream_thread_id, SIGUSR1); } } @@ -222,27 +225,22 @@ void Replay::segmentLoadFinished(bool success) { rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num); updateEvents([&]() { segments_.erase(seg->seg_num); - return true; + return !segments_.empty(); }); } - queueSegment(); + updateSegmentsCache(); } -void Replay::queueSegment() { +void Replay::updateSegmentsCache() { auto cur = segments_.lower_bound(current_segment_.load()); if (cur == segments_.end()) return; + // Calculate the range of segments to load auto begin = std::prev(cur, std::min(segment_cache_limit / 2, std::distance(segments_.begin(), cur))); auto end = std::next(begin, std::min(segment_cache_limit, std::distance(begin, segments_.end()))); begin = std::prev(end, std::min(segment_cache_limit, std::distance(segments_.begin(), end))); - // load one segment at a time - auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); }); - if (it != end && !it->second) { - rDebug("loading segment %d...", it->first); - it->second = std::make_unique(it->first, route_->at(it->first), flags_); - QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); - } + loadSegmentInRange(begin, cur, end); mergeSegments(begin, end); // free segments out of current semgnt window. @@ -257,69 +255,81 @@ void Replay::queueSegment() { } } +void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) { + auto loadNext = [this](auto begin, auto end) { + auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); + if (it != end && !it->second) { + rDebug("loading segment %d...", it->first); + it->second = std::make_unique(it->first, route_->at(it->first), flags_); + QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); + return true; + } + return false; + }; + + // Load forward segments, then try reverse + if (!loadNext(cur, end)) { + loadNext(std::make_reverse_iterator(cur), segments_.rend()); + } +} + void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { - std::vector segments_need_merge; + std::set segments_to_merge; size_t new_events_size = 0; for (auto it = begin; it != end; ++it) { if (it->second && it->second->isLoaded()) { - segments_need_merge.push_back(it->first); + segments_to_merge.insert(it->first); new_events_size += it->second->log->events.size(); } } - if (segments_need_merge != segments_merged_) { - std::string s; - for (int i = 0; i < segments_need_merge.size(); ++i) { - s += std::to_string(segments_need_merge[i]); - if (i != segments_need_merge.size() - 1) s += ", "; - } - rDebug("merge segments %s", s.c_str()); - new_events_->clear(); - new_events_->reserve(new_events_size); - for (int n : segments_need_merge) { - size_t size = new_events_->size(); - const auto &events = segments_[n]->log->events; - std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_), - [this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; }); - std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan()); - } + if (segments_to_merge == merged_segments_) return; - if (stream_thread_) { - emit segmentsMerged(); + rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{}, + [](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str()); - // Check if seeking is in progress - if (seeking_to_seconds_ >= 0) { - int target_segment = int(seeking_to_seconds_ / 60); - auto segment_found = std::find(segments_need_merge.begin(), segments_need_merge.end(), target_segment); + std::vector new_events; + new_events.reserve(new_events_size); - // If the target segment is found, emit seekedTo signal and reset seeking_to_seconds_ - if (segment_found != segments_need_merge.end()) { - emit seekedTo(seeking_to_seconds_); - seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek - } - } + // Merge events from segments_to_merge into new_events + for (int n : segments_to_merge) { + size_t size = new_events.size(); + const auto &events = segments_.at(n)->log->events; + std::copy_if(events.begin(), events.end(), std::back_inserter(new_events), + [this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; }); + std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end()); + } + + if (stream_thread_) { + emit segmentsMerged(); + + // Check if seeking is in progress + int target_segment = int(seeking_to_seconds_ / 60); + if (seeking_to_seconds_ >= 0 && segments_to_merge.count(target_segment) > 0) { + emit seekedTo(seeking_to_seconds_); + seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek } - updateEvents([&]() { - events_.swap(new_events_); - segments_merged_ = segments_need_merge; - // Do not wake up the stream thread if the current segment has not been merged. - return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0); - }); } + + updateEvents([&]() { + events_.swap(new_events); + merged_segments_ = segments_to_merge; + // Wake up the stream thread if the current segment is loaded or invalid. + return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0); + }); } void Replay::startStream(const Segment *cur_segment) { const auto &events = cur_segment->log->events; - - route_start_ts_ = events.front()->mono_time; + route_start_ts_ = events.front().mono_time; cur_mono_time_ += route_start_ts_ - 1; // get datetime from INIT_DATA, fallback to datetime in the route name route_date_time_ = route()->datetime(); auto it = std::find_if(events.cbegin(), events.cend(), - [](auto e) { return e->which == cereal::Event::Which::INIT_DATA; }); + [](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; }); if (it != events.cend()) { - capnp::FlatArrayMessageReader reader((*it)->data); + capnp::FlatArrayMessageReader reader(it->data); auto event = reader.getRoot(); uint64_t wall_time = event.getInitData().getWallTimeNanos(); if (wall_time > 0) { @@ -328,9 +338,9 @@ void Replay::startStream(const Segment *cur_segment) { } // write CarParams - it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; }); + it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; }); if (it != events.end()) { - capnp::FlatArrayMessageReader reader((*it)->data); + capnp::FlatArrayMessageReader reader(it->data); auto event = reader.getRoot(); car_fingerprint_ = event.getCarParams().getCarFingerprint(); capnp::MallocMessageBuilder builder; @@ -357,8 +367,7 @@ void Replay::startStream(const Segment *cur_segment) { emit segmentsMerged(); // start stream thread stream_thread_ = new QThread(); - QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); }); - QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater); + QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); }); stream_thread_->start(); timeline_future = QtConcurrent::run(this, &Replay::buildTimeline); @@ -382,83 +391,54 @@ void Replay::publishMessage(const Event *e) { } void Replay::publishFrame(const Event *e) { - static const std::map cam_types{ - {cereal::Event::ROAD_ENCODE_IDX, RoadCam}, - {cereal::Event::DRIVER_ENCODE_IDX, DriverCam}, - {cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam}, - }; - if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !hasFlag(REPLAY_FLAG_DCAM)) || - (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) { - return; + CameraType cam; + switch (e->which) { + case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break; + case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break; + case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break; + default: return; // Invalid event type } + if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM))) + return; // Camera isdisabled + if (isSegmentMerged(e->eidx_segnum)) { auto &segment = segments_.at(e->eidx_segnum); - auto cam = cam_types.at(e->which); if (auto &frame = segment->frames[cam]; frame) { camera_server_->pushFrame(cam, frame.get(), e); } } } -void Replay::stream() { +void Replay::streamThread() { + stream_thread_id = pthread_self(); cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; - double prev_replay_speed = speed_; std::unique_lock lk(stream_lock_); while (true) { - stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); }); - events_updated_ = false; + stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); }); if (exit_) break; - Event cur_event{cur_which, cur_mono_time_, {}}; - auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); - if (eit == events_->end()) { + Event event(cur_which, cur_mono_time_, {}); + auto first = std::upper_bound(events_.cbegin(), events_.cend(), event); + if (first == events_.cend()) { rInfo("waiting for events..."); + events_ready_ = false; continue; } - uint64_t evt_start_ts = cur_mono_time_; - uint64_t loop_start_ts = nanos_since_boot(); - - for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { - const Event *evt = (*eit); - cur_which = evt->which; - cur_mono_time_ = evt->mono_time; - setCurrentSegment(toSeconds(cur_mono_time_) / 60); - - if (sockets_[cur_which] != nullptr) { - // keep time - long etime = (cur_mono_time_ - evt_start_ts) / speed_; - long rtime = nanos_since_boot() - loop_start_ts; - long behind_ns = etime - rtime; - // if behind_ns is greater than 1 second, it means that an invalid segment is skipped by seeking/replaying - if (behind_ns >= 1 * 1e9 || speed_ != prev_replay_speed) { - // reset event start times - evt_start_ts = cur_mono_time_; - loop_start_ts = nanos_since_boot(); - prev_replay_speed = speed_; - } else if (behind_ns > 0) { - precise_nano_sleep(behind_ns); - } + auto it = publishEvents(first, events_.cend()); - if (evt->eidx_segnum == -1) { - publishMessage(evt); - } else if (camera_server_) { - if (speed_ > 1.0) { - camera_server_->waitForSent(); - } - publishFrame(evt); - } - } - } - // wait for frame to be sent before unlock.(frameReader may be deleted after unlock) + // Ensure frames are sent before unlocking to prevent race conditions if (camera_server_) { camera_server_->waitForSent(); } - if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) { - int last_segment = segments_.empty() ? 0 : segments_.rbegin()->first; + if (it != events_.cend()) { + cur_which = it->which; + } else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) { + // Check for loop end and restart if necessary + int last_segment = segments_.rbegin()->first; if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) { rInfo("reaches the end of route, restart from beginning"); QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, 0, false), Qt::QueuedConnection); @@ -466,3 +446,48 @@ void Replay::stream() { } } } + +std::vector::const_iterator Replay::publishEvents(std::vector::const_iterator first, + std::vector::const_iterator last) { + uint64_t evt_start_ts = cur_mono_time_; + uint64_t loop_start_ts = nanos_since_boot(); + double prev_replay_speed = speed_; + + for (; !paused_ && first != last; ++first) { + const Event &evt = *first; + int segment = toSeconds(evt.mono_time) / 60; + + if (current_segment_ != segment) { + current_segment_ = segment; + QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection); + } + + // Skip events if socket is not present + if (!sockets_[evt.which]) continue; + + int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (nanos_since_boot() - loop_start_ts); + // if time_diff is greater than 1 second, it means that an invalid segment is skipped + if (time_diff >= 1e9 || speed_ != prev_replay_speed) { + // reset event start times + evt_start_ts = evt.mono_time; + loop_start_ts = nanos_since_boot(); + prev_replay_speed = speed_; + } else if (time_diff > 0) { + precise_nano_sleep(time_diff); + } + + if (paused_) break; + + cur_mono_time_ = evt.mono_time; + if (evt.eidx_segnum == -1) { + publishMessage(&evt); + } else if (camera_server_) { + if (speed_ > 1.0) { + camera_server_->waitForSent(); + } + publishFrame(&evt); + } + } + + return first; +} diff --git a/tools/replay/replay.h b/tools/replay/replay.h index c4140dc806..b8f7852e4f 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -53,11 +54,10 @@ public: ~Replay(); bool load(); void start(int seconds = 0); - void stop(); void pause(bool pause); void seekToFlag(FindFlag flag); void seekTo(double seconds, bool relative); - inline bool isPaused() const { return paused_; } + inline bool isPaused() const { return user_paused_; } // the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming. // the filter function must return true if the event should be filtered. // otherwise it must return false. @@ -79,7 +79,7 @@ public: inline int totalSeconds() const { return (!segments_.empty()) ? (segments_.rbegin()->first + 1) * 60 : 0; } inline void setSpeed(float speed) { speed_ = speed; } inline float getSpeed() const { return speed_; } - inline const std::vector *events() const { return events_.get(); } + inline const std::vector *events() const { return &events_; } inline const std::map> &segments() const { return segments_; } inline const std::string &carFingerprint() const { return car_fingerprint_; } inline const std::vector> getTimeline() { @@ -99,36 +99,37 @@ protected slots: protected: typedef std::map> SegmentMap; std::optional find(FindFlag flag); + void pauseStreamThread(); void startStream(const Segment *cur_segment); - void stream(); - void setCurrentSegment(int n); - void queueSegment(); + void streamThread(); + void updateSegmentsCache(); + void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); - void updateEvents(const std::function& lambda); + void updateEvents(const std::function& update_events_function); + std::vector::const_iterator publishEvents(std::vector::const_iterator first, + std::vector::const_iterator last); void publishMessage(const Event *e); void publishFrame(const Event *e); void buildTimeline(); - inline bool isSegmentMerged(int n) { - return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end(); - } + inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; } + pthread_t stream_thread_id = 0; QThread *stream_thread_ = nullptr; std::mutex stream_lock_; + bool user_paused_ = false; std::condition_variable stream_cv_; - std::atomic updating_events_ = false; std::atomic current_segment_ = 0; double seeking_to_seconds_ = -1; SegmentMap segments_; // the following variables must be protected with stream_lock_ std::atomic exit_ = false; - bool paused_ = false; - bool events_updated_ = false; + std::atomic paused_ = false; + bool events_ready_ = false; QDateTime route_date_time_; uint64_t route_start_ts_ = 0; std::atomic cur_mono_time_ = 0; - std::unique_ptr> events_; - std::unique_ptr> new_events_; - std::vector segments_merged_; + std::vector events_; + std::set merged_segments_; // messaging SubMaster *sm = nullptr; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index db7a959595..f2a0754da1 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -77,7 +77,7 @@ bool Route::loadFromServer(int retries) { return false; } rWarning("Retrying %d/%d", i, retries); - util::sleep_for(500); + util::sleep_for(3000); } return false; } diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index a681f347bb..6c005f1bd4 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -1,7 +1,6 @@ #include #include -#include #include #include "catch2/catch.hpp" @@ -67,7 +66,7 @@ TEST_CASE("LogReader") { corrupt_content.resize(corrupt_content.length() / 2); corrupt_content = decompressBZ2(corrupt_content); LogReader log; - REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size())); + REQUIRE(log.load(corrupt_content.data(), corrupt_content.size())); REQUIRE(log.events.size() > 0); } } @@ -88,7 +87,7 @@ void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { // test LogReader & FrameReader REQUIRE(segment.log->events.size() > 0); - REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan())); + REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end())); for (auto cam : ALL_CAMERAS) { auto &fr = segment.frames[cam]; @@ -158,63 +157,20 @@ TEST_CASE("Remote route") { } } -// helper class for unit tests -class TestReplay : public Replay { - public: - TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {} - void test_seek(); - void testSeekTo(int seek_to); -}; - -void TestReplay::testSeekTo(int seek_to) { - seekTo(seek_to, false); - - while (true) { - std::unique_lock lk(stream_lock_); - stream_cv_.wait(lk, [=]() { return events_updated_ == true; }); - events_updated_ = false; - if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) { - // wake up by the previous merging, skip it. - continue; - } - - Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_, {}); - auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan()); - if (eit == events_->end()) { - qDebug() << "waiting for events..."; - continue; - } - - REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan())); - const int seek_to_segment = seek_to / 60; - const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9; - current_segment_ = event_seconds / 60; - INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]"); - REQUIRE(event_seconds >= seek_to); - if (event_seconds > seek_to) { - auto it = segments_.lower_bound(seek_to_segment); - REQUIRE(it->first == current_segment_); - } - break; - } -} - -void TestReplay::test_seek() { - // create a dummy stream thread - stream_thread_ = new QThread(this); +TEST_CASE("seek_to") { QEventLoop loop; - std::thread thread = std::thread([&]() { - for (int i = 0; i < 10; ++i) { - testSeekTo(util::random_int(0, 2 * 60)); - } + int seek_to = util::random_int(0, 2 * 59); + Replay replay(DEMO_ROUTE, {}, {}, nullptr, REPLAY_FLAG_NO_VIPC); + + QObject::connect(&replay, &Replay::seekedTo, [&](double sec) { + INFO("seek to " << seek_to << "s seeked to" << sec); + REQUIRE(sec >= seek_to); loop.quit(); }); - loop.exec(); - thread.join(); -} -TEST_CASE("Replay") { - TestReplay replay(DEMO_ROUTE); REQUIRE(replay.load()); - replay.test_seek(); + replay.start(); + replay.seekTo(seek_to, false); + + loop.exec(); } diff --git a/tools/replay/util.cc b/tools/replay/util.cc index deb6293745..f95e1e75b1 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -4,10 +4,10 @@ #include #include -#include -#include #include #include +#include +#include #include #include #include @@ -158,7 +158,10 @@ size_t getRemoteFileSize(const std::string &url, std::atomic *abort) { int still_running = 1; while (still_running > 0 && !(abort && *abort)) { CURLMcode mc = curl_multi_perform(cm, &still_running); - if (!mc) curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + if (mc != CURLM_OK) break; + if (still_running > 0) { + curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + } } double content_length = -1; @@ -208,10 +211,20 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont } int still_running = 1; + size_t prev_written = 0; while (still_running > 0 && !(abort && *abort)) { - curl_multi_wait(cm, nullptr, 0, 1000, nullptr); - curl_multi_perform(cm, &still_running); - download_stats.update(url, written); + CURLMcode mc = curl_multi_perform(cm, &still_running); + if (mc != CURLM_OK) { + break; + } + if (still_running > 0) { + curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + } + + if (((written - prev_written) / (double)content_length) >= 0.01) { + download_stats.update(url, written); + prev_written = written; + } } CURLMsg *msg; @@ -304,9 +317,11 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic return {}; } -void precise_nano_sleep(long sleep_ns) { - struct timespec req = {.tv_sec = 0, .tv_nsec = sleep_ns}; - struct timespec rem = {}; +void precise_nano_sleep(int64_t nanoseconds) { + struct timespec req, rem; + + req.tv_sec = nanoseconds / 1e9; + req.tv_nsec = nanoseconds % (int64_t)1e9; while (clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem) && errno == EINTR) { // Retry sleep if interrupted by a signal req = rem; diff --git a/tools/replay/util.h b/tools/replay/util.h index 6c808095e8..fdb1dbf0f8 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -21,7 +21,7 @@ void logMessage(ReplyMsgType type, const char* fmt, ...); #define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__) std::string sha256(const std::string &str); -void precise_nano_sleep(long sleep_ns); +void precise_nano_sleep(int64_t nanoseconds); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr); std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic *abort = nullptr); std::string getUrlWithoutQuery(const std::string &url);