diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index 9bf80deb98..08acba9dd8 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -126,9 +126,8 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) const { return it != last_msgs.end() ? it->second : empty_data; } -// it is thread safe to update data in updateLastMsgsTo. -// updateLastMsgsTo is always called in UI thread. void AbstractStream::updateLastMsgsTo(double sec) { + std::lock_guard lk(mutex_); current_sec_ = sec; uint64_t last_ts = toMonoTime(sec); std::unordered_map msgs; @@ -160,7 +159,10 @@ void AbstractStream::updateLastMsgsTo(double sec) { std::any_of(messages_.cbegin(), messages_.cend(), [this](const auto &m) { return !last_msgs.count(m.first); }); last_msgs = messages_; + mutex_.unlock(); + emit msgsReceived(nullptr, id_changed); + resumeStream(); } const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) { diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index 7ae119bcf0..5ecf086c95 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -108,7 +108,7 @@ protected: void mergeEvents(const std::vector &events); const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c); void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size); - + virtual void resumeStream() {} std::vector all_events_; double current_sec_ = 0; std::optional> time_range_; diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index bd48d97d52..bfe5ca74da 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -23,13 +23,10 @@ ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent) { }); } -static bool event_filter(const Event *e, void *opaque) { - return ((ReplayStream *)opaque)->eventFilter(e); -} - void ReplayStream::mergeSegments() { - for (auto &[n, seg] : replay->segments()) { - if (seg && seg->isLoaded() && !processed_segments.count(n)) { + auto event_data = replay->getEventData(); + for (const auto &[n, seg] : event_data->segments) { + if (!processed_segments.count(n)) { processed_segments.insert(n); std::vector new_events; @@ -50,16 +47,16 @@ void ReplayStream::mergeSegments() { bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) { replay.reset(new Replay(route.toStdString(), {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"}, - {}, nullptr, replay_flags, data_dir.toStdString(), this)); + {}, nullptr, replay_flags, data_dir.toStdString())); replay->setSegmentCacheLimit(settings.max_cached_minutes); - replay->installEventFilter(event_filter, this); + replay->installEventFilter([this](const Event *event) { return eventFilter(event); }); // Forward replay callbacks to corresponding Qt signals. + replay->onSeeking = [this](double sec) { emit seeking(sec); }; + replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); }; replay->onQLogLoaded = [this](std::shared_ptr qlog) { emit qLogLoaded(qlog); }; + replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); }; - QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking); - QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); - QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments); bool success = replay->load(); if (!success) { if (replay->lastRouteError() == RouteLoadError::Unauthorized) { diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h index 1d1cdaec9e..2d7f335193 100644 --- a/tools/cabana/streams/replaystream.h +++ b/tools/cabana/streams/replaystream.h @@ -22,7 +22,7 @@ public: bool eventFilter(const Event *event); void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); } bool liveStreaming() const override { return false; } - inline QString routeName() const override { return QString::fromStdString(replay->route()->name()); } + inline QString routeName() const override { return QString::fromStdString(replay->route().name()); } inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } double minSeconds() const override { return replay->minSeconds(); } double maxSeconds() const { return replay->maxSeconds(); } @@ -32,6 +32,7 @@ public: inline float getSpeed() const { return replay->getSpeed(); } inline Replay *getReplay() const { return replay.get(); } inline bool isPaused() const override { return replay->isPaused(); } + void resumeStream() override { return replay->resumeStream(); } void pause(bool pause) override; signals: diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index 552d86b5d7..0b2beb1dd6 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -247,8 +247,9 @@ void Slider::paintEvent(QPaintEvent *ev) { QColor empty_color = palette().color(QPalette::Window); empty_color.setAlpha(160); - for (const auto &[n, seg] : replay->segments()) { - if (!(seg && seg->isLoaded())) + const auto event_data = replay->getEventData(); + for (const auto &[n, _] : replay->route().segments()) { + if (!event_data->isSegmentLoaded(n)) fillRange(n * 60.0, (n + 1) * 60.0, empty_color); } } diff --git a/tools/replay/SConscript b/tools/replay/SConscript index 4a907849cb..18849407cf 100644 --- a/tools/replay/SConscript +++ b/tools/replay/SConscript @@ -1,8 +1,10 @@ -Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal') +Import('env', 'arch', 'common', 'messaging', 'visionipc', 'cereal') -base_frameworks = qt_env['FRAMEWORKS'] -base_libs = [common, messaging, cereal, visionipc, - 'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"] +replay_env = env.Clone() +replay_env['CCFLAGS'] += ['-Wno-deprecated-declarations'] + +base_frameworks = [] +base_libs = [common, messaging, cereal, visionipc, 'm', 'ssl', 'crypto', 'pthread'] if arch == "Darwin": base_frameworks.append('OpenCL') @@ -10,11 +12,11 @@ else: base_libs.append('OpenCL') replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", - "route.cc", "util.cc", "timeline.cc", "api.cc"] -replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) + "route.cc", "util.cc", "seg_mgr.cc", "timeline.cc", "api.cc"] +replay_lib = replay_env.Library("replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) Export('replay_lib') replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs -qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks) +replay_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks) if GetOption('extras'): - qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs]) + replay_env.Program('tests/test_replay', ['tests/test_replay.cc'], LIBS=replay_libs) diff --git a/tools/replay/consoleui.cc b/tools/replay/consoleui.cc index b5415ac808..503902622c 100644 --- a/tools/replay/consoleui.cc +++ b/tools/replay/consoleui.cc @@ -6,8 +6,6 @@ #include #include -#include - #include "common/ratekeeper.h" #include "common/util.h" #include "common/version.h" @@ -57,6 +55,8 @@ void add_str(WINDOW *w, const char *str, Color color = Color::Default, bool bold if (color != Color::Default) wattroff(w, COLOR_PAIR(color)); } +ExitHandler do_exit; + } // namespace ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "liveParameters"}) { @@ -95,6 +95,8 @@ ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "livePara } ConsoleUI::~ConsoleUI() { + installDownloadProgressHandler(nullptr); + installMessageHandler(nullptr); endwin(); } @@ -233,7 +235,7 @@ void ConsoleUI::updateProgressBar() { void ConsoleUI::updateSummary() { const auto &route = replay->route(); - mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route->name().c_str(), route->segments().size()); + mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route.name().c_str(), route.segments().size()); mvwprintw(w[Win::Stats], 1, 0, "Car Fingerprint: %s", replay->carFingerprint().c_str()); wrefresh(w[Win::Stats]); } @@ -349,7 +351,8 @@ void ConsoleUI::handleKey(char c) { int ConsoleUI::exec() { RateKeeper rk("Replay", 20); - while (true) { + + while (!do_exit) { int c = getch(); if (c == 'q' || c == 'Q') { break; @@ -373,7 +376,6 @@ int ConsoleUI::exec() { logs.clear(); } - qApp->processEvents(); rk.keepTime(); } return 0; diff --git a/tools/replay/main.cc b/tools/replay/main.cc index b880e99e23..31493d1486 100644 --- a/tools/replay/main.cc +++ b/tools/replay/main.cc @@ -1,6 +1,5 @@ #include -#include #include #include #include @@ -126,7 +125,6 @@ int main(int argc, char *argv[]) { util::set_file_descriptor_limit(1024); #endif - QCoreApplication app(argc, argv); ReplayConfig config; if (!parseArgs(argc, argv, config)) { @@ -138,18 +136,18 @@ int main(int argc, char *argv[]) { op_prefix = std::make_unique(config.prefix); } - Replay *replay = new Replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir, &app); + Replay replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir); if (config.cache_segments > 0) { - replay->setSegmentCacheLimit(config.cache_segments); + replay.setSegmentCacheLimit(config.cache_segments); } if (config.playback_speed > 0) { - replay->setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back())); + replay.setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back())); } - if (!replay->load()) { + if (!replay.load()) { return 1; } - ConsoleUI console_ui(replay); - replay->start(config.start_seconds); + ConsoleUI console_ui(&replay); + replay.start(config.start_seconds); return console_ui.exec(); } diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 82e231937c..2bd3614530 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -4,7 +4,6 @@ #include #include "cereal/services.h" #include "common/params.h" -#include "common/timing.h" #include "tools/replay/util.h" static void interrupt_sleep_handler(int signal) {} @@ -12,145 +11,124 @@ static void interrupt_sleep_handler(int signal) {} // Helper function to notify events with safety checks template void notifyEvent(Callback &callback, Args &&...args) { - if (callback) { - callback(std::forward(args)...); - } + if (callback) callback(std::forward(args)...); } -Replay::Replay(const std::string &route, std::vector allow, std::vector block, SubMaster *sm_, - uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) { - // Register signal handler for SIGUSR1 +Replay::Replay(const std::string &route, std::vector allow, std::vector block, + SubMaster *sm, uint32_t flags, const std::string &data_dir) + : sm_(sm), flags_(flags), seg_mgr_(std::make_unique(route, flags, data_dir)) { std::signal(SIGUSR1, interrupt_sleep_handler); if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) { block.insert(block.end(), {"uiDebug", "userFlag"}); } + setupServices(allow, block); + setupSegmentManager(!allow.empty() || !block.empty()); +} +void Replay::setupServices(const std::vector &allow, const std::vector &block) { auto event_schema = capnp::Schema::from().asStruct(); - sockets_.resize(event_schema.getUnionFields().size()); - std::vector active_services; + sockets_.resize(event_schema.getUnionFields().size(), nullptr); + std::vector active_services; for (const auto &[name, _] : services) { - bool in_block = std::find(block.begin(), block.end(), name) != block.end(); - bool in_allow = std::find(allow.begin(), allow.end(), name) != allow.end(); - if (!in_block && (allow.empty() || in_allow)) { + bool is_blocked = std::find(block.begin(), block.end(), name) != block.end(); + bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end(); + if (is_allowed && !is_blocked) { uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue(); sockets_[which] = name.c_str(); - active_services.push_back(name); + active_services.push_back(name.c_str()); } } - - if (!allow.empty()) { - for (int i = 0; i < sockets_.size(); ++i) { - filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]); - } + rInfo("active services: %s", join(active_services, ", ").c_str()); + if (!sm_) { + pm_ = std::make_unique(active_services); } +} - rInfo("active services: %s", join(active_services, ", ").c_str()); - rInfo("loading route %s", route.c_str()); +void Replay::setupSegmentManager(bool has_filters) { + seg_mgr_->setCallback([this]() { handleSegmentMerge(); }); - if (sm == nullptr) { - std::vector socket_names; - std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(socket_names), - [](const char *name) { return name != nullptr; }); - pm = std::make_unique(socket_names); + if (has_filters) { + std::vector filters(sockets_.size(), false); + for (size_t i = 0; i < sockets_.size(); ++i) { + filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]); + } + seg_mgr_->setFilters(filters); } - route_ = std::make_unique(route, data_dir); } Replay::~Replay() { - stop(); -} - -void Replay::stop() { - exit_ = true; - if (stream_thread_ != nullptr) { + seg_mgr_.reset(); + if (stream_thread_.joinable()) { rInfo("shutdown: in progress..."); - pauseStreamThread(); - stream_cv_.notify_one(); - stream_thread_->quit(); - stream_thread_->wait(); - stream_thread_->deleteLater(); - stream_thread_ = nullptr; + interruptStream([this]() { + exit_ = true; + return false; + }); + stream_thread_.join(); rInfo("shutdown: done"); } - camera_server_.reset(nullptr); - segments_.clear(); + camera_server_.reset(); } bool Replay::load() { - if (!route_->load()) { - rError("failed to load route %s from %s", route_->name().c_str(), - route_->dir().empty() ? "server" : route_->dir().c_str()); - return false; - } + rInfo("loading route %s", seg_mgr_->route_.name().c_str()); + if (!seg_mgr_->load()) return false; - for (auto &[n, f] : route_->segments()) { - bool has_log = !f.rlog.empty() || !f.qlog.empty(); - bool has_video = !f.road_cam.empty() || !f.qcamera.empty(); - if (has_log && (has_video || hasFlag(REPLAY_FLAG_NO_VIPC))) { - segments_.insert({n, nullptr}); - } - } - if (segments_.empty()) { - rInfo("no valid segments in route: %s", route_->name().c_str()); - return false; - } - rInfo("load route %s with %zu valid segments", route_->name().c_str(), segments_.size()); - max_seconds_ = (segments_.rbegin()->first + 1) * 60; + min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60; + max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60; return true; } -void Replay::start(int seconds) { - seekTo(route_->identifier().begin_segment * 60 + seconds, false); -} - -void Replay::updateEvents(const std::function &update_events_function) { - pauseStreamThread(); +void Replay::interruptStream(const std::function &update_fn) { + if (stream_thread_.joinable() && stream_thread_id) { + pthread_kill(stream_thread_id, SIGUSR1); // Interrupt sleep in stream thread + } { - std::unique_lock lk(stream_lock_); - events_ready_ = update_events_function(); - paused_ = user_paused_; + interrupt_requested_ = true; + std::unique_lock lock(stream_lock_); + events_ready_ = update_fn(); + interrupt_requested_ = user_paused_; } stream_cv_.notify_one(); } void Replay::seekTo(double seconds, bool relative) { - updateEvents([&]() { - double target_time = relative ? seconds + currentSeconds() : seconds; - target_time = std::max(double(0.0), target_time); - int target_segment = (int)target_time / 60; - if (segments_.count(target_segment) == 0) { - rWarning("Can't seek to %.2f s segment %d is invalid", target_time, target_segment); - return true; - } - if (target_time > max_seconds_) { - rWarning("Can't seek to %.2f s, time is invalid", target_time); - return true; - } + double target_time = relative ? seconds + currentSeconds() : seconds; + target_time = std::max(0.0, target_time); + int target_segment = target_time / 60; + if (!seg_mgr_->hasSegment(target_segment)) { + rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment); + return; + } - rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment); + rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment); + notifyEvent(onSeeking, target_time); + + double seeked_to_sec = -1; + interruptStream([&]() { current_segment_ = target_segment; cur_mono_time_ = route_start_ts_ + target_time * 1e9; seeking_to_ = target_time; + + if (event_data_->isSegmentLoaded(target_segment)) { + seeked_to_sec = *seeking_to_; + seeking_to_.reset(); + } return false; }); - checkSeekProgress(); - updateSegmentsCache(); + checkSeekProgress(seeked_to_sec); + seg_mgr_->setCurrentSegment(target_segment); } -void Replay::checkSeekProgress() { - if (seeking_to_) { - auto it = segments_.find(int(*seeking_to_ / 60)); - if (it != segments_.end() && it->second && it->second->isLoaded()) { - emit seekedTo(*seeking_to_); - seeking_to_ = std::nullopt; - // wake up stream thread - updateEvents([]() { return true; }); +void Replay::checkSeekProgress(double seeked_to_sec) { + if (seeked_to_sec >= 0) { + if (onSeekedTo) { + onSeekedTo(seeked_to_sec); } else { - // Emit signal indicating the ongoing seek operation - emit seeking(*seeking_to_); + interruptStream([]() { return true; }); } } } @@ -163,125 +141,45 @@ void Replay::seekToFlag(FindFlag flag) { void Replay::pause(bool pause) { if (user_paused_ != pause) { - pauseStreamThread(); - { - std::unique_lock lk(stream_lock_); + interruptStream([=]() { rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds()); - paused_ = user_paused_ = pause; - } - stream_cv_.notify_one(); - } -} - -void Replay::pauseStreamThread() { - paused_ = true; - // Send SIGUSR1 to interrupt clock_nanosleep - if (stream_thread_ && stream_thread_id) { - pthread_kill(stream_thread_id, SIGUSR1); - } -} - -void Replay::segmentLoadFinished(int seg_num, bool success) { - if (!success) { - rWarning("failed to load segment %d, removing it from current replay list", seg_num); - updateEvents([&]() { - segments_.erase(seg_num); - return !segments_.empty(); + user_paused_ = pause; + return !pause; }); } - QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection); } -void Replay::updateSegmentsCache() { - auto cur = segments_.lower_bound(current_segment_.load()); - if (cur == segments_.end()) return; - - // Calculate the range of segments to load - auto begin = std::prev(cur, std::min(segment_cache_limit / 2, std::distance(segments_.begin(), cur))); - auto end = std::next(begin, std::min(segment_cache_limit, std::distance(begin, segments_.end()))); - begin = std::prev(end, std::min(segment_cache_limit, std::distance(segments_.begin(), end))); - - loadSegmentInRange(begin, cur, end); - mergeSegments(begin, end); +void Replay::handleSegmentMerge() { + if (exit_) return; - // free segments out of current semgnt window. - std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); }); - std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); }); + double seeked_to_sec = -1; + interruptStream([&]() { + event_data_ = seg_mgr_->getEventData(); + notifyEvent(onSegmentsMerged); - // start stream thread - const auto &cur_segment = cur->second; - if (stream_thread_ == nullptr && cur_segment->isLoaded()) { - startStream(cur_segment.get()); - } -} - -void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) { - auto loadNextSegment = [this](auto first, auto last) { - auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); - if (it != last && !it->second) { - rDebug("loading segment %d...", it->first); - it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_, - [this](int seg_num, bool success) { - segmentLoadFinished(seg_num, success); - }); - return true; + bool segment_loaded = event_data_->isSegmentLoaded(current_segment_); + if (seeking_to_ && segment_loaded) { + seeked_to_sec = *seeking_to_; + seeking_to_.reset(); + return false; } - return false; - }; - - // Try loading forward segments, then reverse segments - if (!loadNextSegment(cur, end)) { - loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin)); - } -} - -void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { - std::set segments_to_merge; - size_t new_events_size = 0; - for (auto it = begin; it != end; ++it) { - if (it->second && it->second->isLoaded()) { - segments_to_merge.insert(it->first); - new_events_size += it->second->log->events.size(); - } - } - - if (segments_to_merge == merged_segments_) return; - - rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{}, - [](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str()); - - std::vector new_events; - new_events.reserve(new_events_size); - - // Merge events from segments_to_merge into new_events - for (int n : segments_to_merge) { - size_t size = new_events.size(); - const auto &events = segments_.at(n)->log->events; - std::copy_if(events.begin(), events.end(), std::back_inserter(new_events), - [this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; }); - std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end()); - } + return segment_loaded; + }); - if (stream_thread_) { - emit segmentsMerged(); + checkSeekProgress(seeked_to_sec); + if (!stream_thread_.joinable() && !event_data_->events.empty()) { + startStream(); } - - updateEvents([&]() { - events_.swap(new_events); - merged_segments_ = segments_to_merge; - // Wake up the stream thread if the current segment is loaded or invalid. - return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0)); - }); - checkSeekProgress(); } -void Replay::startStream(const Segment *cur_segment) { +void Replay::startStream() { + const auto &cur_segment = event_data_->segments.begin()->second; const auto &events = cur_segment->log->events; route_start_ts_ = events.front().mono_time; cur_mono_time_ += route_start_ts_ - 1; // get datetime from INIT_DATA, fallback to datetime in the route name - route_date_time_ = route()->datetime(); + route_date_time_ = route().datetime(); auto it = std::find_if(events.cbegin(), events.cend(), [](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; }); if (it != events.cend()) { @@ -299,6 +197,7 @@ void Replay::startStream(const Segment *cur_segment) { capnp::FlatArrayMessageReader reader(it->data); auto event = reader.getRoot(); car_fingerprint_ = event.getCarParams().getCarFingerprint(); + capnp::MallocMessageBuilder builder; builder.setRoot(event.getCarParams()); auto words = capnp::messageToFlatArray(builder); @@ -320,26 +219,18 @@ void Replay::startStream(const Segment *cur_segment) { camera_server_ = std::make_unique(camera_size); } - emit segmentsMerged(); - - timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE), - [this](std::shared_ptr log) { - notifyEvent(onQLogLoaded, log); - }); - // start stream thread - stream_thread_ = new QThread(); - QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); }); - stream_thread_->start(); + timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE), + [this](std::shared_ptr log) { notifyEvent(onQLogLoaded, log); }); - emit streamStarted(); + stream_thread_ = std::thread(&Replay::streamThread, this); } void Replay::publishMessage(const Event *e) { - if (event_filter && event_filter(e, filter_opaque)) return; + if (event_filter_ && event_filter_(e)) return; - if (sm == nullptr) { + if (!sm_) { auto bytes = e->data.asBytes(); - int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); + int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); if (ret == -1) { rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]); sockets_[e->which] = nullptr; @@ -347,7 +238,7 @@ void Replay::publishMessage(const Event *e) { } else { capnp::FlatArrayMessageReader reader(e->data); auto event = reader.getRoot(); - sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}}); + sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}}); } } @@ -363,9 +254,9 @@ void Replay::publishFrame(const Event *e) { if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM))) return; // Camera isdisabled - if (isSegmentMerged(e->eidx_segnum)) { - auto &segment = segments_.at(e->eidx_segnum); - if (auto &frame = segment->frames[cam]; frame) { + auto seg_it = event_data_->segments.find(e->eidx_segnum); + if (seg_it != event_data_->segments.end()) { + if (auto &frame = seg_it->second->frames[cam]; frame) { camera_server_->pushFrame(cam, frame.get(), e); } } @@ -377,32 +268,33 @@ void Replay::streamThread() { std::unique_lock lk(stream_lock_); while (true) { - stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); }); + stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); }); if (exit_) break; - Event event(cur_which, cur_mono_time_, {}); - auto first = std::upper_bound(events_.cbegin(), events_.cend(), event); - if (first == events_.cend()) { + const auto &events = event_data_->events; + auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {})); + if (first == events.cend()) { rInfo("waiting for events..."); events_ready_ = false; continue; } - auto it = publishEvents(first, events_.cend()); + auto it = publishEvents(first, events.cend()); // Ensure frames are sent before unlocking to prevent race conditions if (camera_server_) { camera_server_->waitForSent(); } - if (it != events_.cend()) { + if (it != events.cend()) { cur_which = it->which; } else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) { - // Check for loop end and restart if necessary - int last_segment = segments_.rbegin()->first; - if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) { + int last_segment = seg_mgr_->route_.segments().rbegin()->first; + if (event_data_->isSegmentLoaded(last_segment)) { rInfo("reaches the end of route, restart from beginning"); - QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, minSeconds(), false), Qt::QueuedConnection); + stream_lock_.unlock(); + seekTo(minSeconds(), false); + stream_lock_.lock(); } } } @@ -414,16 +306,16 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con uint64_t loop_start_ts = nanos_since_boot(); double prev_replay_speed = speed_; - for (; !paused_ && first != last; ++first) { + for (; !interrupt_requested_ && first != last; ++first) { const Event &evt = *first; int segment = toSeconds(evt.mono_time) / 60; if (current_segment_ != segment) { current_segment_ = segment; - QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection); + seg_mgr_->setCurrentSegment(current_segment_); } - // Skip events if socket is not present + // Skip events if socket is not present if (!sockets_[evt.which]) continue; cur_mono_time_ = evt.mono_time; @@ -438,10 +330,10 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con loop_start_ts = current_nanos; prev_replay_speed = speed_; } else if (time_diff > 0) { - precise_nano_sleep(time_diff, paused_); + precise_nano_sleep(time_diff, interrupt_requested_); } - if (paused_) break; + if (interrupt_requested_) break; if (evt.eidx_segnum == -1) { publishMessage(&evt); diff --git a/tools/replay/replay.h b/tools/replay/replay.h index 61c538c0a7..5169e88629 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -1,25 +1,18 @@ #pragma once -#include -#include +#include #include +#include #include -#include #include #include -#include - -#include #include "tools/replay/camera.h" -#include "tools/replay/route.h" +#include "tools/replay/seg_mgr.h" #include "tools/replay/timeline.h" #define DEMO_ROUTE "a2a0ccea32023010|2023-07-27--13-01-19" -// one segment uses about 100M of memory -constexpr int MIN_SEGMENTS_CACHE = 5; - enum REPLAY_FLAGS { REPLAY_FLAG_NONE = 0x0000, REPLAY_FLAG_DCAM = 0x0002, @@ -32,111 +25,85 @@ enum REPLAY_FLAGS { REPLAY_FLAG_ALL_SERVICES = 0x0800, }; -typedef bool (*replayEventFilter)(const Event *, void *); -typedef std::map> SegmentMap; - -class Replay : public QObject { - Q_OBJECT - +class Replay { public: Replay(const std::string &route, std::vector allow, std::vector block, SubMaster *sm = nullptr, - uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = "", QObject *parent = 0); + uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = ""); ~Replay(); bool load(); - RouteLoadError lastRouteError() const { return route_->lastError(); } - void start(int seconds = 0); - void stop(); + RouteLoadError lastRouteError() const { return route().lastError(); } + void start(int seconds = 0) { seekTo(min_seconds_ + seconds, false); } void pause(bool pause); void seekToFlag(FindFlag flag); void seekTo(double seconds, bool relative); inline bool isPaused() const { return user_paused_; } - // the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming. - // the filter function must return true if the event should be filtered. - // otherwise it must return false. - inline void installEventFilter(replayEventFilter filter, void *opaque) { - filter_opaque = opaque; - event_filter = filter; - } - inline int segmentCacheLimit() const { return segment_cache_limit; } - inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(MIN_SEGMENTS_CACHE, n); } + inline int segmentCacheLimit() const { return seg_mgr_->segment_cache_limit_; } + inline void setSegmentCacheLimit(int n) { seg_mgr_->segment_cache_limit_ = std::max(MIN_SEGMENTS_CACHE, n); } inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; } void setLoop(bool loop) { loop ? flags_ &= ~REPLAY_FLAG_NO_LOOP : flags_ |= REPLAY_FLAG_NO_LOOP; } bool loop() const { return !(flags_ & REPLAY_FLAG_NO_LOOP); } - inline const Route* route() const { return route_.get(); } + const Route &route() const { return seg_mgr_->route_; } inline double currentSeconds() const { return double(cur_mono_time_ - route_start_ts_) / 1e9; } inline std::time_t routeDateTime() const { return route_date_time_; } inline uint64_t routeStartNanos() const { return route_start_ts_; } inline double toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; } - inline double minSeconds() const { return !segments_.empty() ? segments_.begin()->first * 60 : 0; } + inline double minSeconds() const { return min_seconds_; } inline double maxSeconds() const { return max_seconds_; } inline void setSpeed(float speed) { speed_ = speed; } inline float getSpeed() const { return speed_; } - inline const SegmentMap &segments() const { return segments_; } inline const std::string &carFingerprint() const { return car_fingerprint_; } inline const std::shared_ptr> getTimeline() const { return timeline_.getEntries(); } inline const std::optional findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); } + const std::shared_ptr getEventData() const { return event_data_; } + void installEventFilter(std::function filter) { event_filter_ = filter; } + void resumeStream() { interruptStream([]() { return true; }); } // Event callback functions + std::function onSegmentsMerged = nullptr; + std::function onSeeking = nullptr; + std::function onSeekedTo = nullptr; std::function)> onQLogLoaded = nullptr; - -signals: - void streamStarted(); - void segmentsMerged(); - void seeking(double sec); - void seekedTo(double sec); - void minMaxTimeChanged(double min_sec, double max_sec); - -protected: - std::optional find(FindFlag flag); - void pauseStreamThread(); - void startStream(const Segment *cur_segment); +private: + void setupServices(const std::vector &allow, const std::vector &block); + void setupSegmentManager(bool has_filters); + void startStream(); void streamThread(); - void updateSegmentsCache(); - void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); - void segmentLoadFinished(int seg_num, bool success); - void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); - void updateEvents(const std::function& update_events_function); + void handleSegmentMerge(); + void interruptStream(const std::function& update_fn); std::vector::const_iterator publishEvents(std::vector::const_iterator first, std::vector::const_iterator last); void publishMessage(const Event *e); void publishFrame(const Event *e); - void checkSeekProgress(); - inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; } + void checkSeekProgress(double seeked_to_sec); + std::unique_ptr seg_mgr_; Timeline timeline_; pthread_t stream_thread_id = 0; - QThread *stream_thread_ = nullptr; + std::thread stream_thread_; std::mutex stream_lock_; bool user_paused_ = false; std::condition_variable stream_cv_; - std::atomic current_segment_ = 0; + int current_segment_ = 0; std::optional seeking_to_; - SegmentMap segments_; - // the following variables must be protected with stream_lock_ std::atomic exit_ = false; - std::atomic paused_ = false; + std::atomic interrupt_requested_ = false; bool events_ready_ = false; std::time_t route_date_time_; uint64_t route_start_ts_ = 0; std::atomic cur_mono_time_ = 0; - std::atomic max_seconds_ = 0; - std::vector events_; - std::set merged_segments_; - - // messaging - SubMaster *sm = nullptr; - std::unique_ptr pm; + double min_seconds_ = 0; + double max_seconds_ = 0; + SubMaster *sm_ = nullptr; + std::unique_ptr pm_; std::vector sockets_; - std::vector filters_; - std::unique_ptr route_; std::unique_ptr camera_server_; std::atomic flags_ = REPLAY_FLAG_NONE; std::string car_fingerprint_; std::atomic speed_ = 1.0; - replayEventFilter event_filter = nullptr; - void *filter_opaque = nullptr; - int segment_cache_limit = MIN_SEGMENTS_CACHE; + std::function event_filter_ = nullptr; + + std::shared_ptr event_data_ = std::make_shared(); }; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index 9306b9fb07..7731d0daf4 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -159,7 +159,7 @@ void Route::addFileToSegment(int n, const std::string &file) { Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, std::function callback) - : seg_num(n), flags(flags), filters_(filters), onLoadFinished_(callback) { + : seg_num(n), flags(flags), filters_(filters), on_load_finished_(callback) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const std::array file_list = { (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.empty() ? files.qcamera : files.road_cam, @@ -178,7 +178,7 @@ Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vec Segment::~Segment() { { std::lock_guard lock(mutex_); - onLoadFinished_ = nullptr; // Prevent callback after destruction + on_load_finished_ = nullptr; // Prevent callback after destruction } abort_ = true; for (auto &thread : threads_) { @@ -204,8 +204,14 @@ void Segment::loadFile(int id, const std::string file) { if (--loading_ == 0) { std::lock_guard lock(mutex_); - if (onLoadFinished_) { - onLoadFinished_(seg_num, !abort_); + load_state_ = !abort_ ? LoadState::Loaded : LoadState::Failed; + if (on_load_finished_) { + on_load_finished_(seg_num, !abort_); } } } + +Segment::LoadState Segment::getState() { + std::scoped_lock lock(mutex_); + return load_state_; +} diff --git a/tools/replay/route.h b/tools/replay/route.h index c2c7af6bc7..1806be5afa 100644 --- a/tools/replay/route.h +++ b/tools/replay/route.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -64,10 +65,12 @@ protected: class Segment { public: + enum class LoadState {Loading, Loaded, Failed}; + Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, std::function callback); ~Segment(); - inline bool isLoaded() const { return !loading_ && !abort_; } + LoadState getState(); const int seg_num = 0; std::unique_ptr log; @@ -80,7 +83,8 @@ protected: std::atomic loading_ = 0; std::mutex mutex_; std::vector threads_; - std::function onLoadFinished_ = nullptr; + std::function on_load_finished_ = nullptr; uint32_t flags; std::vector filters_; + LoadState load_state_ = LoadState::Loading; }; diff --git a/tools/replay/seg_mgr.cc b/tools/replay/seg_mgr.cc new file mode 100644 index 0000000000..315356b833 --- /dev/null +++ b/tools/replay/seg_mgr.cc @@ -0,0 +1,133 @@ +#include "tools/replay/seg_mgr.h" + +SegmentManager::~SegmentManager() { + { + std::unique_lock lock(mutex_); + exit_ = true; + onSegmentMergedCallback_ = nullptr; + } + cv_.notify_one(); + if (thread_.joinable()) thread_.join(); +} + +bool SegmentManager::load() { + if (!route_.load()) { + rError("failed to load route: %s", route_.name().c_str()); + return false; + } + + for (const auto &[n, file] : route_.segments()) { + if (!file.rlog.empty() || !file.qlog.empty()) { + segments_.insert({n, nullptr}); + } + } + + if (segments_.empty()) { + rInfo("no valid segments in route: %s", route_.name().c_str()); + return false; + } + + rInfo("loaded route %s with %zu valid segments", route_.name().c_str(), segments_.size()); + thread_ = std::thread(&SegmentManager::manageSegmentCache, this); + return true; +} + +void SegmentManager::setCurrentSegment(int seg_num) { + { + std::unique_lock lock(mutex_); + cur_seg_num_ = seg_num; + needs_update_ = true; + } + cv_.notify_one(); +} + +void SegmentManager::manageSegmentCache() { + while (true) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() { return exit_ || needs_update_; }); + if (exit_) break; + + needs_update_ = false; + auto cur = segments_.lower_bound(cur_seg_num_); + if (cur == segments_.end()) continue; + + // Calculate the range of segments to load + auto begin = std::prev(cur, std::min(segment_cache_limit_ / 2, std::distance(segments_.begin(), cur))); + auto end = std::next(begin, std::min(segment_cache_limit_, std::distance(begin, segments_.end()))); + begin = std::prev(end, std::min(segment_cache_limit_, std::distance(segments_.begin(), end))); + + loadSegmentsInRange(begin, cur, end); + bool merged = mergeSegments(begin, end); + + // Free segments outside the current range + std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); }); + std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); }); + + lock.unlock(); + + if (merged && onSegmentMergedCallback_) { + onSegmentMergedCallback_(); // Notify listener that segments have been merged + } + } +} + +bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { + std::set segments_to_merge; + size_t total_event_count = 0; + for (auto it = begin; it != end; ++it) { + const auto &segment = it->second; + if (segment && segment->getState() == Segment::LoadState::Loaded) { + segments_to_merge.insert(segment->seg_num); + total_event_count += segment->log->events.size(); + } + } + + if (segments_to_merge == merged_segments_) return false; + + auto merged_event_data = std::make_shared(); + auto &merged_events = merged_event_data->events; + merged_events.reserve(total_event_count); + + rDebug("merging segments: %s", join(segments_to_merge, ", ").c_str()); + for (int n : segments_to_merge) { + const auto &events = segments_.at(n)->log->events; + if (events.empty()) continue; + + // Skip INIT_DATA if present + auto events_begin = (events.front().which == cereal::Event::Which::INIT_DATA) ? std::next(events.begin()) : events.begin(); + + size_t previous_size = merged_events.size(); + merged_events.insert(merged_events.end(), events_begin, events.end()); + std::inplace_merge(merged_events.begin(), merged_events.begin() + previous_size, merged_events.end()); + + merged_event_data->segments[n] = segments_.at(n); + } + + event_data_ = merged_event_data; + merged_segments_ = segments_to_merge; + + return true; +} + +void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) { + auto tryLoadSegment = [this](auto first, auto last) { + for (auto it = first; it != last; ++it) { + auto &segment_ptr = it->second; + if (!segment_ptr) { + segment_ptr = std::make_shared( + it->first, route_.at(it->first), flags_, filters_, + [this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); }); + } + + if (segment_ptr->getState() == Segment::LoadState::Loading) { + return true; // Segment is still loading + } + } + return false; // No segments need loading + }; + + // Try forward loading, then reverse if necessary + if (!tryLoadSegment(cur, end)) { + tryLoadSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin)); + } +} diff --git a/tools/replay/seg_mgr.h b/tools/replay/seg_mgr.h new file mode 100644 index 0000000000..efb3d7f0ea --- /dev/null +++ b/tools/replay/seg_mgr.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "tools/replay/route.h" + +constexpr int MIN_SEGMENTS_CACHE = 5; + +using SegmentMap = std::map>; + +class SegmentManager { +public: + struct EventData { + std::vector events; // Events extracted from the segments + SegmentMap segments; // Associated segments that contributed to these events + bool isSegmentLoaded(int n) const { return segments.find(n) != segments.end(); } + }; + + SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "") + : flags_(flags), route_(route_name, data_dir) {}; + ~SegmentManager(); + + bool load(); + void setCurrentSegment(int seg_num); + void setCallback(const std::function &callback) { onSegmentMergedCallback_ = callback; } + void setFilters(const std::vector &filters) { filters_ = filters; } + const std::shared_ptr getEventData() const { return event_data_; } + bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); } + + Route route_; + int segment_cache_limit_ = MIN_SEGMENTS_CACHE; + +private: + void manageSegmentCache(); + void loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); + bool mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); + + std::vector filters_; + uint32_t flags_; + + std::mutex mutex_; + std::condition_variable cv_; + std::thread thread_; + std::atomic cur_seg_num_ = -1; + bool needs_update_ = false; + bool exit_ = false; + + SegmentMap segments_; + std::shared_ptr event_data_; + std::function onSegmentMergedCallback_ = nullptr; + std::set merged_segments_; +}; diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index 6b366169bb..aed3de59a8 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -1,27 +1,8 @@ -#include -#include - -#include - +#define CATCH_CONFIG_MAIN #include "catch2/catch.hpp" -#include "common/util.h" #include "tools/replay/replay.h" -#include "tools/replay/util.h" const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; -const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3"; - -const int TEST_REPLAY_SEGMENTS = std::getenv("TEST_REPLAY_SEGMENTS") ? atoi(std::getenv("TEST_REPLAY_SEGMENTS")) : 1; - -bool download_to_file(const std::string &url, const std::string &local_file, int chunk_size = 5 * 1024 * 1024, int retries = 3) { - do { - if (httpDownload(url, local_file, chunk_size)) { - return true; - } - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - } while (--retries >= 0); - return false; -} TEST_CASE("LogReader") { SECTION("corrupt log") { @@ -34,67 +15,3 @@ TEST_CASE("LogReader") { REQUIRE(log.events.size() > 0); } } - -void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { - std::mutex mutex; - std::condition_variable cv; - Segment segment(n, segment_file, flags, {}, [&](int, bool) { - REQUIRE(segment.isLoaded() == true); - REQUIRE(segment.log != nullptr); - REQUIRE(segment.frames[RoadCam] != nullptr); - if (flags & REPLAY_FLAG_DCAM) { - REQUIRE(segment.frames[DriverCam] != nullptr); - } - if (flags & REPLAY_FLAG_ECAM) { - REQUIRE(segment.frames[WideRoadCam] != nullptr); - } - - // test LogReader & FrameReader - REQUIRE(segment.log->events.size() > 0); - REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end())); - - for (auto cam : ALL_CAMERAS) { - auto &fr = segment.frames[cam]; - if (!fr) continue; - - if (cam == RoadCam || cam == WideRoadCam) { - REQUIRE(fr->getFrameCount() == 1200); - } - auto [nv12_width, nv12_height, nv12_buffer_size] = get_nv12_info(fr->width, fr->height); - VisionBuf buf; - buf.allocate(nv12_buffer_size); - buf.init_yuv(fr->width, fr->height, nv12_width, nv12_width * nv12_height); - // sequence get 100 frames - for (int i = 0; i < 100; ++i) { - REQUIRE(fr->get(i, &buf)); - } - } - cv.notify_one(); - }); - - std::unique_lock lock(mutex); - cv.wait(lock); -} - -std::string download_demo_route() { - static std::string data_dir; - - if (data_dir == "") { - char tmp_path[] = "/tmp/root_XXXXXX"; - data_dir = mkdtemp(tmp_path); - - Route remote_route(DEMO_ROUTE); - assert(remote_route.load()); - - // Create a local route from remote for testing - const std::string route_name = std::string(DEMO_ROUTE).substr(17); - for (int i = 0; i < 2; ++i) { - std::string log_path = util::string_format("%s/%s--%d/", data_dir.c_str(), route_name.c_str(), i); - util::create_directories(log_path, 0755); - REQUIRE(download_to_file(remote_route.at(i).rlog, log_path + "rlog.bz2")); - REQUIRE(download_to_file(remote_route.at(i).qcamera, log_path + "qcamera.ts")); - } - } - - return data_dir; -} diff --git a/tools/replay/tests/test_runner.cc b/tools/replay/tests/test_runner.cc deleted file mode 100644 index b20ac86c64..0000000000 --- a/tools/replay/tests/test_runner.cc +++ /dev/null @@ -1,10 +0,0 @@ -#define CATCH_CONFIG_RUNNER -#include "catch2/catch.hpp" -#include - -int main(int argc, char **argv) { - // unit tests for Qt - QCoreApplication app(argc, argv); - const int res = Catch::Session().run(argc, argv); - return (res < 0xff ? res : 0xff); -} diff --git a/tools/replay/util.cc b/tools/replay/util.cc index fac1e11c47..94cea961ff 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -362,11 +362,11 @@ std::string decompressZST(const std::byte *in, size_t in_size, std::atomic return {}; } -void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit) { +void precise_nano_sleep(int64_t nanoseconds, std::atomic &interrupt_requested) { struct timespec req, rem; req.tv_sec = nanoseconds / 1000000000; req.tv_nsec = nanoseconds % 1000000000; - while (!should_exit) { + while (!interrupt_requested) { #ifdef __APPLE__ int ret = nanosleep(&req, &rem); if (ret == 0 || errno != EINTR) diff --git a/tools/replay/util.h b/tools/replay/util.h index 46df2bc191..1f61951d21 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -47,7 +47,7 @@ private: }; std::string sha256(const std::string &str); -void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit); +void precise_nano_sleep(int64_t nanoseconds, std::atomic &interrupt_requested); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr); std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic *abort = nullptr); std::string decompressZST(const std::string &in, std::atomic *abort = nullptr);