replay: eliminate qt dependency (#34102)

refactor to remove qt dependency and module Replay classes
pull/34106/head
Dean Lee 5 months ago committed by GitHub
parent a58853e70e
commit 3c765a1f45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      tools/cabana/streams/abstractstream.cc
  2. 2
      tools/cabana/streams/abstractstream.h
  3. 19
      tools/cabana/streams/replaystream.cc
  4. 3
      tools/cabana/streams/replaystream.h
  5. 5
      tools/cabana/videowidget.cc
  6. 18
      tools/replay/SConscript
  7. 12
      tools/replay/consoleui.cc
  8. 14
      tools/replay/main.cc
  9. 332
      tools/replay/replay.cc
  10. 103
      tools/replay/replay.h
  11. 14
      tools/replay/route.cc
  12. 8
      tools/replay/route.h
  13. 133
      tools/replay/seg_mgr.cc
  14. 56
      tools/replay/seg_mgr.h
  15. 85
      tools/replay/tests/test_replay.cc
  16. 10
      tools/replay/tests/test_runner.cc
  17. 4
      tools/replay/util.cc
  18. 2
      tools/replay/util.h

@ -126,9 +126,8 @@ const CanData &AbstractStream::lastMessage(const MessageId &id) const {
return it != last_msgs.end() ? it->second : empty_data;
}
// it is thread safe to update data in updateLastMsgsTo.
// updateLastMsgsTo is always called in UI thread.
void AbstractStream::updateLastMsgsTo(double sec) {
std::lock_guard lk(mutex_);
current_sec_ = sec;
uint64_t last_ts = toMonoTime(sec);
std::unordered_map<MessageId, CanData> msgs;
@ -160,7 +159,10 @@ void AbstractStream::updateLastMsgsTo(double sec) {
std::any_of(messages_.cbegin(), messages_.cend(),
[this](const auto &m) { return !last_msgs.count(m.first); });
last_msgs = messages_;
mutex_.unlock();
emit msgsReceived(nullptr, id_changed);
resumeStream();
}
const CanEvent *AbstractStream::newEvent(uint64_t mono_time, const cereal::CanData::Reader &c) {

@ -108,7 +108,7 @@ protected:
void mergeEvents(const std::vector<const CanEvent *> &events);
const CanEvent *newEvent(uint64_t mono_time, const cereal::CanData::Reader &c);
void updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size);
virtual void resumeStream() {}
std::vector<const CanEvent *> all_events_;
double current_sec_ = 0;
std::optional<std::pair<double, double>> time_range_;

@ -23,13 +23,10 @@ ReplayStream::ReplayStream(QObject *parent) : AbstractStream(parent) {
});
}
static bool event_filter(const Event *e, void *opaque) {
return ((ReplayStream *)opaque)->eventFilter(e);
}
void ReplayStream::mergeSegments() {
for (auto &[n, seg] : replay->segments()) {
if (seg && seg->isLoaded() && !processed_segments.count(n)) {
auto event_data = replay->getEventData();
for (const auto &[n, seg] : event_data->segments) {
if (!processed_segments.count(n)) {
processed_segments.insert(n);
std::vector<const CanEvent *> new_events;
@ -50,16 +47,16 @@ void ReplayStream::mergeSegments() {
bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint32_t replay_flags) {
replay.reset(new Replay(route.toStdString(), {"can", "roadEncodeIdx", "driverEncodeIdx", "wideRoadEncodeIdx", "carParams"},
{}, nullptr, replay_flags, data_dir.toStdString(), this));
{}, nullptr, replay_flags, data_dir.toStdString()));
replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this);
replay->installEventFilter([this](const Event *event) { return eventFilter(event); });
// Forward replay callbacks to corresponding Qt signals.
replay->onSeeking = [this](double sec) { emit seeking(sec); };
replay->onSeekedTo = [this](double sec) { emit seekedTo(sec); };
replay->onQLogLoaded = [this](std::shared_ptr<LogReader> qlog) { emit qLogLoaded(qlog); };
replay->onSegmentsMerged = [this]() { QMetaObject::invokeMethod(this, &ReplayStream::mergeSegments, Qt::QueuedConnection); };
QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);
bool success = replay->load();
if (!success) {
if (replay->lastRouteError() == RouteLoadError::Unauthorized) {

@ -22,7 +22,7 @@ public:
bool eventFilter(const Event *event);
void seekTo(double ts) override { replay->seekTo(std::max(double(0), ts), false); }
bool liveStreaming() const override { return false; }
inline QString routeName() const override { return QString::fromStdString(replay->route()->name()); }
inline QString routeName() const override { return QString::fromStdString(replay->route().name()); }
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
double minSeconds() const override { return replay->minSeconds(); }
double maxSeconds() const { return replay->maxSeconds(); }
@ -32,6 +32,7 @@ public:
inline float getSpeed() const { return replay->getSpeed(); }
inline Replay *getReplay() const { return replay.get(); }
inline bool isPaused() const override { return replay->isPaused(); }
void resumeStream() override { return replay->resumeStream(); }
void pause(bool pause) override;
signals:

@ -247,8 +247,9 @@ void Slider::paintEvent(QPaintEvent *ev) {
QColor empty_color = palette().color(QPalette::Window);
empty_color.setAlpha(160);
for (const auto &[n, seg] : replay->segments()) {
if (!(seg && seg->isLoaded()))
const auto event_data = replay->getEventData();
for (const auto &[n, _] : replay->route().segments()) {
if (!event_data->isSegmentLoaded(n))
fillRange(n * 60.0, (n + 1) * 60.0, empty_color);
}
}

@ -1,8 +1,10 @@
Import('env', 'qt_env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')
Import('env', 'arch', 'common', 'messaging', 'visionipc', 'cereal')
base_frameworks = qt_env['FRAMEWORKS']
base_libs = [common, messaging, cereal, visionipc,
'm', 'ssl', 'crypto', 'pthread', 'qt_util'] + qt_env["LIBS"]
replay_env = env.Clone()
replay_env['CCFLAGS'] += ['-Wno-deprecated-declarations']
base_frameworks = []
base_libs = [common, messaging, cereal, visionipc, 'm', 'ssl', 'crypto', 'pthread']
if arch == "Darwin":
base_frameworks.append('OpenCL')
@ -10,11 +12,11 @@ else:
base_libs.append('OpenCL')
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc",
"route.cc", "util.cc", "timeline.cc", "api.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
"route.cc", "util.cc", "seg_mgr.cc", "timeline.cc", "api.cc"]
replay_lib = replay_env.Library("replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs
qt_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)
replay_env.Program("replay", ["main.cc"], LIBS=replay_libs, FRAMEWORKS=base_frameworks)
if GetOption('extras'):
qt_env.Program('tests/test_replay', ['tests/test_runner.cc', 'tests/test_replay.cc'], LIBS=[replay_libs, base_libs])
replay_env.Program('tests/test_replay', ['tests/test_replay.cc'], LIBS=replay_libs)

@ -6,8 +6,6 @@
#include <tuple>
#include <utility>
#include <QApplication>
#include "common/ratekeeper.h"
#include "common/util.h"
#include "common/version.h"
@ -57,6 +55,8 @@ void add_str(WINDOW *w, const char *str, Color color = Color::Default, bool bold
if (color != Color::Default) wattroff(w, COLOR_PAIR(color));
}
ExitHandler do_exit;
} // namespace
ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "liveParameters"}) {
@ -95,6 +95,8 @@ ConsoleUI::ConsoleUI(Replay *replay) : replay(replay), sm({"carState", "livePara
}
ConsoleUI::~ConsoleUI() {
installDownloadProgressHandler(nullptr);
installMessageHandler(nullptr);
endwin();
}
@ -233,7 +235,7 @@ void ConsoleUI::updateProgressBar() {
void ConsoleUI::updateSummary() {
const auto &route = replay->route();
mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route->name().c_str(), route->segments().size());
mvwprintw(w[Win::Stats], 0, 0, "Route: %s, %lu segments", route.name().c_str(), route.segments().size());
mvwprintw(w[Win::Stats], 1, 0, "Car Fingerprint: %s", replay->carFingerprint().c_str());
wrefresh(w[Win::Stats]);
}
@ -349,7 +351,8 @@ void ConsoleUI::handleKey(char c) {
int ConsoleUI::exec() {
RateKeeper rk("Replay", 20);
while (true) {
while (!do_exit) {
int c = getch();
if (c == 'q' || c == 'Q') {
break;
@ -373,7 +376,6 @@ int ConsoleUI::exec() {
logs.clear();
}
qApp->processEvents();
rk.keepTime();
}
return 0;

@ -1,6 +1,5 @@
#include <getopt.h>
#include <QApplication>
#include <iostream>
#include <map>
#include <string>
@ -126,7 +125,6 @@ int main(int argc, char *argv[]) {
util::set_file_descriptor_limit(1024);
#endif
QCoreApplication app(argc, argv);
ReplayConfig config;
if (!parseArgs(argc, argv, config)) {
@ -138,18 +136,18 @@ int main(int argc, char *argv[]) {
op_prefix = std::make_unique<OpenpilotPrefix>(config.prefix);
}
Replay *replay = new Replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir, &app);
Replay replay(config.route, config.allow, config.block, nullptr, config.flags, config.data_dir);
if (config.cache_segments > 0) {
replay->setSegmentCacheLimit(config.cache_segments);
replay.setSegmentCacheLimit(config.cache_segments);
}
if (config.playback_speed > 0) {
replay->setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back()));
replay.setSpeed(std::clamp(config.playback_speed, ConsoleUI::speed_array.front(), ConsoleUI::speed_array.back()));
}
if (!replay->load()) {
if (!replay.load()) {
return 1;
}
ConsoleUI console_ui(replay);
replay->start(config.start_seconds);
ConsoleUI console_ui(&replay);
replay.start(config.start_seconds);
return console_ui.exec();
}

@ -4,7 +4,6 @@
#include <csignal>
#include "cereal/services.h"
#include "common/params.h"
#include "common/timing.h"
#include "tools/replay/util.h"
static void interrupt_sleep_handler(int signal) {}
@ -12,145 +11,124 @@ static void interrupt_sleep_handler(int signal) {}
// Helper function to notify events with safety checks
template <typename Callback, typename... Args>
void notifyEvent(Callback &callback, Args &&...args) {
if (callback) {
callback(std::forward<Args>(args)...);
}
if (callback) callback(std::forward<Args>(args)...);
}
Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block, SubMaster *sm_,
uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
// Register signal handler for SIGUSR1
Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block,
SubMaster *sm, uint32_t flags, const std::string &data_dir)
: sm_(sm), flags_(flags), seg_mgr_(std::make_unique<SegmentManager>(route, flags, data_dir)) {
std::signal(SIGUSR1, interrupt_sleep_handler);
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block.insert(block.end(), {"uiDebug", "userFlag"});
}
setupServices(allow, block);
setupSegmentManager(!allow.empty() || !block.empty());
}
void Replay::setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block) {
auto event_schema = capnp::Schema::from<cereal::Event>().asStruct();
sockets_.resize(event_schema.getUnionFields().size());
std::vector<std::string> active_services;
sockets_.resize(event_schema.getUnionFields().size(), nullptr);
std::vector<const char *> active_services;
for (const auto &[name, _] : services) {
bool in_block = std::find(block.begin(), block.end(), name) != block.end();
bool in_allow = std::find(allow.begin(), allow.end(), name) != allow.end();
if (!in_block && (allow.empty() || in_allow)) {
bool is_blocked = std::find(block.begin(), block.end(), name) != block.end();
bool is_allowed = allow.empty() || std::find(allow.begin(), allow.end(), name) != allow.end();
if (is_allowed && !is_blocked) {
uint16_t which = event_schema.getFieldByName(name).getProto().getDiscriminantValue();
sockets_[which] = name.c_str();
active_services.push_back(name);
active_services.push_back(name.c_str());
}
}
if (!allow.empty()) {
for (int i = 0; i < sockets_.size(); ++i) {
filters_.push_back(i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
rInfo("active services: %s", join(active_services, ", ").c_str());
if (!sm_) {
pm_ = std::make_unique<PubMaster>(active_services);
}
}
rInfo("active services: %s", join(active_services, ", ").c_str());
rInfo("loading route %s", route.c_str());
void Replay::setupSegmentManager(bool has_filters) {
seg_mgr_->setCallback([this]() { handleSegmentMerge(); });
if (sm == nullptr) {
std::vector<const char *> socket_names;
std::copy_if(sockets_.begin(), sockets_.end(), std::back_inserter(socket_names),
[](const char *name) { return name != nullptr; });
pm = std::make_unique<PubMaster>(socket_names);
if (has_filters) {
std::vector<bool> filters(sockets_.size(), false);
for (size_t i = 0; i < sockets_.size(); ++i) {
filters[i] = (i == cereal::Event::Which::INIT_DATA || i == cereal::Event::Which::CAR_PARAMS || sockets_[i]);
}
route_ = std::make_unique<Route>(route, data_dir);
seg_mgr_->setFilters(filters);
}
Replay::~Replay() {
stop();
}
void Replay::stop() {
exit_ = true;
if (stream_thread_ != nullptr) {
Replay::~Replay() {
seg_mgr_.reset();
if (stream_thread_.joinable()) {
rInfo("shutdown: in progress...");
pauseStreamThread();
stream_cv_.notify_one();
stream_thread_->quit();
stream_thread_->wait();
stream_thread_->deleteLater();
stream_thread_ = nullptr;
interruptStream([this]() {
exit_ = true;
return false;
});
stream_thread_.join();
rInfo("shutdown: done");
}
camera_server_.reset(nullptr);
segments_.clear();
camera_server_.reset();
}
bool Replay::load() {
if (!route_->load()) {
rError("failed to load route %s from %s", route_->name().c_str(),
route_->dir().empty() ? "server" : route_->dir().c_str());
return false;
}
rInfo("loading route %s", seg_mgr_->route_.name().c_str());
if (!seg_mgr_->load()) return false;
for (auto &[n, f] : route_->segments()) {
bool has_log = !f.rlog.empty() || !f.qlog.empty();
bool has_video = !f.road_cam.empty() || !f.qcamera.empty();
if (has_log && (has_video || hasFlag(REPLAY_FLAG_NO_VIPC))) {
segments_.insert({n, nullptr});
}
}
if (segments_.empty()) {
rInfo("no valid segments in route: %s", route_->name().c_str());
return false;
}
rInfo("load route %s with %zu valid segments", route_->name().c_str(), segments_.size());
max_seconds_ = (segments_.rbegin()->first + 1) * 60;
min_seconds_ = seg_mgr_->route_.segments().begin()->first * 60;
max_seconds_ = (seg_mgr_->route_.segments().rbegin()->first + 1) * 60;
return true;
}
void Replay::start(int seconds) {
seekTo(route_->identifier().begin_segment * 60 + seconds, false);
void Replay::interruptStream(const std::function<bool()> &update_fn) {
if (stream_thread_.joinable() && stream_thread_id) {
pthread_kill(stream_thread_id, SIGUSR1); // Interrupt sleep in stream thread
}
void Replay::updateEvents(const std::function<bool()> &update_events_function) {
pauseStreamThread();
{
std::unique_lock lk(stream_lock_);
events_ready_ = update_events_function();
paused_ = user_paused_;
interrupt_requested_ = true;
std::unique_lock lock(stream_lock_);
events_ready_ = update_fn();
interrupt_requested_ = user_paused_;
}
stream_cv_.notify_one();
}
void Replay::seekTo(double seconds, bool relative) {
updateEvents([&]() {
double target_time = relative ? seconds + currentSeconds() : seconds;
target_time = std::max(double(0.0), target_time);
int target_segment = (int)target_time / 60;
if (segments_.count(target_segment) == 0) {
rWarning("Can't seek to %.2f s segment %d is invalid", target_time, target_segment);
return true;
}
if (target_time > max_seconds_) {
rWarning("Can't seek to %.2f s, time is invalid", target_time);
return true;
target_time = std::max(0.0, target_time);
int target_segment = target_time / 60;
if (!seg_mgr_->hasSegment(target_segment)) {
rWarning("Invalid seek to %.2f s (segment %d)", target_time, target_segment);
return;
}
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
notifyEvent(onSeeking, target_time);
double seeked_to_sec = -1;
interruptStream([&]() {
current_segment_ = target_segment;
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
seeking_to_ = target_time;
if (event_data_->isSegmentLoaded(target_segment)) {
seeked_to_sec = *seeking_to_;
seeking_to_.reset();
}
return false;
});
checkSeekProgress();
updateSegmentsCache();
checkSeekProgress(seeked_to_sec);
seg_mgr_->setCurrentSegment(target_segment);
}
void Replay::checkSeekProgress() {
if (seeking_to_) {
auto it = segments_.find(int(*seeking_to_ / 60));
if (it != segments_.end() && it->second && it->second->isLoaded()) {
emit seekedTo(*seeking_to_);
seeking_to_ = std::nullopt;
// wake up stream thread
updateEvents([]() { return true; });
void Replay::checkSeekProgress(double seeked_to_sec) {
if (seeked_to_sec >= 0) {
if (onSeekedTo) {
onSeekedTo(seeked_to_sec);
} else {
// Emit signal indicating the ongoing seek operation
emit seeking(*seeking_to_);
interruptStream([]() { return true; });
}
}
}
@ -163,125 +141,45 @@ void Replay::seekToFlag(FindFlag flag) {
void Replay::pause(bool pause) {
if (user_paused_ != pause) {
pauseStreamThread();
{
std::unique_lock lk(stream_lock_);
interruptStream([=]() {
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
paused_ = user_paused_ = pause;
}
stream_cv_.notify_one();
}
}
void Replay::pauseStreamThread() {
paused_ = true;
// Send SIGUSR1 to interrupt clock_nanosleep
if (stream_thread_ && stream_thread_id) {
pthread_kill(stream_thread_id, SIGUSR1);
}
}
void Replay::segmentLoadFinished(int seg_num, bool success) {
if (!success) {
rWarning("failed to load segment %d, removing it from current replay list", seg_num);
updateEvents([&]() {
segments_.erase(seg_num);
return !segments_.empty();
user_paused_ = pause;
return !pause;
});
}
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
}
void Replay::updateSegmentsCache() {
auto cur = segments_.lower_bound(current_segment_.load());
if (cur == segments_.end()) return;
// Calculate the range of segments to load
auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit, std::distance(segments_.begin(), end)));
loadSegmentInRange(begin, cur, end);
mergeSegments(begin, end);
void Replay::handleSegmentMerge() {
if (exit_) return;
// free segments out of current semgnt window.
std::for_each(segments_.begin(), begin, [](auto &e) { e.second.reset(nullptr); });
std::for_each(end, segments_.end(), [](auto &e) { e.second.reset(nullptr); });
double seeked_to_sec = -1;
interruptStream([&]() {
event_data_ = seg_mgr_->getEventData();
notifyEvent(onSegmentsMerged);
// start stream thread
const auto &cur_segment = cur->second;
if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
startStream(cur_segment.get());
}
}
void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto loadNextSegment = [this](auto first, auto last) {
auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != last && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_,
[this](int seg_num, bool success) {
segmentLoadFinished(seg_num, success);
});
return true;
}
bool segment_loaded = event_data_->isSegmentLoaded(current_segment_);
if (seeking_to_ && segment_loaded) {
seeked_to_sec = *seeking_to_;
seeking_to_.reset();
return false;
};
// Try loading forward segments, then reverse segments
if (!loadNextSegment(cur, end)) {
loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
}
}
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
std::set<int> segments_to_merge;
size_t new_events_size = 0;
for (auto it = begin; it != end; ++it) {
if (it->second && it->second->isLoaded()) {
segments_to_merge.insert(it->first);
new_events_size += it->second->log->events.size();
}
}
if (segments_to_merge == merged_segments_) return;
rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{},
[](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str());
std::vector<Event> new_events;
new_events.reserve(new_events_size);
// Merge events from segments_to_merge into new_events
for (int n : segments_to_merge) {
size_t size = new_events.size();
const auto &events = segments_.at(n)->log->events;
std::copy_if(events.begin(), events.end(), std::back_inserter(new_events),
[this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; });
std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end());
}
return segment_loaded;
});
if (stream_thread_) {
emit segmentsMerged();
checkSeekProgress(seeked_to_sec);
if (!stream_thread_.joinable() && !event_data_->events.empty()) {
startStream();
}
updateEvents([&]() {
events_.swap(new_events);
merged_segments_ = segments_to_merge;
// Wake up the stream thread if the current segment is loaded or invalid.
return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0));
});
checkSeekProgress();
}
void Replay::startStream(const Segment *cur_segment) {
void Replay::startStream() {
const auto &cur_segment = event_data_->segments.begin()->second;
const auto &events = cur_segment->log->events;
route_start_ts_ = events.front().mono_time;
cur_mono_time_ += route_start_ts_ - 1;
// get datetime from INIT_DATA, fallback to datetime in the route name
route_date_time_ = route()->datetime();
route_date_time_ = route().datetime();
auto it = std::find_if(events.cbegin(), events.cend(),
[](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
if (it != events.cend()) {
@ -299,6 +197,7 @@ void Replay::startStream(const Segment *cur_segment) {
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
car_fingerprint_ = event.getCarParams().getCarFingerprint();
capnp::MallocMessageBuilder builder;
builder.setRoot(event.getCarParams());
auto words = capnp::messageToFlatArray(builder);
@ -320,26 +219,18 @@ void Replay::startStream(const Segment *cur_segment) {
camera_server_ = std::make_unique<CameraServer>(camera_size);
}
emit segmentsMerged();
timeline_.initialize(seg_mgr_->route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) { notifyEvent(onQLogLoaded, log); });
timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) {
notifyEvent(onQLogLoaded, log);
});
// start stream thread
stream_thread_ = new QThread();
QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
stream_thread_->start();
emit streamStarted();
stream_thread_ = std::thread(&Replay::streamThread, this);
}
void Replay::publishMessage(const Event *e) {
if (event_filter && event_filter(e, filter_opaque)) return;
if (event_filter_ && event_filter_(e)) return;
if (sm == nullptr) {
if (!sm_) {
auto bytes = e->data.asBytes();
int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
int ret = pm_->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
rWarning("stop publishing %s due to multiple publishers error", sockets_[e->which]);
sockets_[e->which] = nullptr;
@ -347,7 +238,7 @@ void Replay::publishMessage(const Event *e) {
} else {
capnp::FlatArrayMessageReader reader(e->data);
auto event = reader.getRoot<cereal::Event>();
sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
sm_->update_msgs(nanos_since_boot(), {{sockets_[e->which], event}});
}
}
@ -363,9 +254,9 @@ void Replay::publishFrame(const Event *e) {
if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
return; // Camera isdisabled
if (isSegmentMerged(e->eidx_segnum)) {
auto &segment = segments_.at(e->eidx_segnum);
if (auto &frame = segment->frames[cam]; frame) {
auto seg_it = event_data_->segments.find(e->eidx_segnum);
if (seg_it != event_data_->segments.end()) {
if (auto &frame = seg_it->second->frames[cam]; frame) {
camera_server_->pushFrame(cam, frame.get(), e);
}
}
@ -377,32 +268,33 @@ void Replay::streamThread() {
std::unique_lock lk(stream_lock_);
while (true) {
stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); });
stream_cv_.wait(lk, [this]() { return exit_ || (events_ready_ && !interrupt_requested_); });
if (exit_) break;
Event event(cur_which, cur_mono_time_, {});
auto first = std::upper_bound(events_.cbegin(), events_.cend(), event);
if (first == events_.cend()) {
const auto &events = event_data_->events;
auto first = std::upper_bound(events.cbegin(), events.cend(), Event(cur_which, cur_mono_time_, {}));
if (first == events.cend()) {
rInfo("waiting for events...");
events_ready_ = false;
continue;
}
auto it = publishEvents(first, events_.cend());
auto it = publishEvents(first, events.cend());
// Ensure frames are sent before unlocking to prevent race conditions
if (camera_server_) {
camera_server_->waitForSent();
}
if (it != events_.cend()) {
if (it != events.cend()) {
cur_which = it->which;
} else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
// Check for loop end and restart if necessary
int last_segment = segments_.rbegin()->first;
if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
int last_segment = seg_mgr_->route_.segments().rbegin()->first;
if (event_data_->isSegmentLoaded(last_segment)) {
rInfo("reaches the end of route, restart from beginning");
QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, minSeconds(), false), Qt::QueuedConnection);
stream_lock_.unlock();
seekTo(minSeconds(), false);
stream_lock_.lock();
}
}
}
@ -414,13 +306,13 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
uint64_t loop_start_ts = nanos_since_boot();
double prev_replay_speed = speed_;
for (; !paused_ && first != last; ++first) {
for (; !interrupt_requested_ && first != last; ++first) {
const Event &evt = *first;
int segment = toSeconds(evt.mono_time) / 60;
if (current_segment_ != segment) {
current_segment_ = segment;
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
seg_mgr_->setCurrentSegment(current_segment_);
}
// Skip events if socket is not present
@ -438,10 +330,10 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
loop_start_ts = current_nanos;
prev_replay_speed = speed_;
} else if (time_diff > 0) {
precise_nano_sleep(time_diff, paused_);
precise_nano_sleep(time_diff, interrupt_requested_);
}
if (paused_) break;
if (interrupt_requested_) break;
if (evt.eidx_segnum == -1) {
publishMessage(&evt);

@ -1,25 +1,18 @@
#pragma once
#include <algorithm>
#include <map>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <vector>
#include <utility>
#include <QThread>
#include "tools/replay/camera.h"
#include "tools/replay/route.h"
#include "tools/replay/seg_mgr.h"
#include "tools/replay/timeline.h"
#define DEMO_ROUTE "a2a0ccea32023010|2023-07-27--13-01-19"
// one segment uses about 100M of memory
constexpr int MIN_SEGMENTS_CACHE = 5;
enum REPLAY_FLAGS {
REPLAY_FLAG_NONE = 0x0000,
REPLAY_FLAG_DCAM = 0x0002,
@ -32,111 +25,85 @@ enum REPLAY_FLAGS {
REPLAY_FLAG_ALL_SERVICES = 0x0800,
};
typedef bool (*replayEventFilter)(const Event *, void *);
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
class Replay : public QObject {
Q_OBJECT
class Replay {
public:
Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block, SubMaster *sm = nullptr,
uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = "", QObject *parent = 0);
uint32_t flags = REPLAY_FLAG_NONE, const std::string &data_dir = "");
~Replay();
bool load();
RouteLoadError lastRouteError() const { return route_->lastError(); }
void start(int seconds = 0);
void stop();
RouteLoadError lastRouteError() const { return route().lastError(); }
void start(int seconds = 0) { seekTo(min_seconds_ + seconds, false); }
void pause(bool pause);
void seekToFlag(FindFlag flag);
void seekTo(double seconds, bool relative);
inline bool isPaused() const { return user_paused_; }
// the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming.
// the filter function must return true if the event should be filtered.
// otherwise it must return false.
inline void installEventFilter(replayEventFilter filter, void *opaque) {
filter_opaque = opaque;
event_filter = filter;
}
inline int segmentCacheLimit() const { return segment_cache_limit; }
inline void setSegmentCacheLimit(int n) { segment_cache_limit = std::max(MIN_SEGMENTS_CACHE, n); }
inline int segmentCacheLimit() const { return seg_mgr_->segment_cache_limit_; }
inline void setSegmentCacheLimit(int n) { seg_mgr_->segment_cache_limit_ = std::max(MIN_SEGMENTS_CACHE, n); }
inline bool hasFlag(REPLAY_FLAGS flag) const { return flags_ & flag; }
void setLoop(bool loop) { loop ? flags_ &= ~REPLAY_FLAG_NO_LOOP : flags_ |= REPLAY_FLAG_NO_LOOP; }
bool loop() const { return !(flags_ & REPLAY_FLAG_NO_LOOP); }
inline const Route* route() const { return route_.get(); }
const Route &route() const { return seg_mgr_->route_; }
inline double currentSeconds() const { return double(cur_mono_time_ - route_start_ts_) / 1e9; }
inline std::time_t routeDateTime() const { return route_date_time_; }
inline uint64_t routeStartNanos() const { return route_start_ts_; }
inline double toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; }
inline double minSeconds() const { return !segments_.empty() ? segments_.begin()->first * 60 : 0; }
inline double minSeconds() const { return min_seconds_; }
inline double maxSeconds() const { return max_seconds_; }
inline void setSpeed(float speed) { speed_ = speed; }
inline float getSpeed() const { return speed_; }
inline const SegmentMap &segments() const { return segments_; }
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::shared_ptr<std::vector<Timeline::Entry>> getTimeline() const { return timeline_.getEntries(); }
inline const std::optional<Timeline::Entry> findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); }
const std::shared_ptr<SegmentManager::EventData> getEventData() const { return event_data_; }
void installEventFilter(std::function<bool(const Event *)> filter) { event_filter_ = filter; }
void resumeStream() { interruptStream([]() { return true; }); }
// Event callback functions
std::function<void()> onSegmentsMerged = nullptr;
std::function<void(double)> onSeeking = nullptr;
std::function<void(double)> onSeekedTo = nullptr;
std::function<void(std::shared_ptr<LogReader>)> onQLogLoaded = nullptr;
signals:
void streamStarted();
void segmentsMerged();
void seeking(double sec);
void seekedTo(double sec);
void minMaxTimeChanged(double min_sec, double max_sec);
protected:
std::optional<uint64_t> find(FindFlag flag);
void pauseStreamThread();
void startStream(const Segment *cur_segment);
private:
void setupServices(const std::vector<std::string> &allow, const std::vector<std::string> &block);
void setupSegmentManager(bool has_filters);
void startStream();
void streamThread();
void updateSegmentsCache();
void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end);
void segmentLoadFinished(int seg_num, bool success);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& update_events_function);
void handleSegmentMerge();
void interruptStream(const std::function<bool()>& update_fn);
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
void checkSeekProgress();
inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; }
void checkSeekProgress(double seeked_to_sec);
std::unique_ptr<SegmentManager> seg_mgr_;
Timeline timeline_;
pthread_t stream_thread_id = 0;
QThread *stream_thread_ = nullptr;
std::thread stream_thread_;
std::mutex stream_lock_;
bool user_paused_ = false;
std::condition_variable stream_cv_;
std::atomic<int> current_segment_ = 0;
int current_segment_ = 0;
std::optional<double> seeking_to_;
SegmentMap segments_;
// the following variables must be protected with stream_lock_
std::atomic<bool> exit_ = false;
std::atomic<bool> paused_ = false;
std::atomic<bool> interrupt_requested_ = false;
bool events_ready_ = false;
std::time_t route_date_time_;
uint64_t route_start_ts_ = 0;
std::atomic<uint64_t> cur_mono_time_ = 0;
std::atomic<double> max_seconds_ = 0;
std::vector<Event> events_;
std::set<int> merged_segments_;
// messaging
SubMaster *sm = nullptr;
std::unique_ptr<PubMaster> pm;
double min_seconds_ = 0;
double max_seconds_ = 0;
SubMaster *sm_ = nullptr;
std::unique_ptr<PubMaster> pm_;
std::vector<const char*> sockets_;
std::vector<bool> filters_;
std::unique_ptr<Route> route_;
std::unique_ptr<CameraServer> camera_server_;
std::atomic<uint32_t> flags_ = REPLAY_FLAG_NONE;
std::string car_fingerprint_;
std::atomic<float> speed_ = 1.0;
replayEventFilter event_filter = nullptr;
void *filter_opaque = nullptr;
int segment_cache_limit = MIN_SEGMENTS_CACHE;
std::function<bool(const Event *)> event_filter_ = nullptr;
std::shared_ptr<SegmentManager::EventData> event_data_ = std::make_shared<SegmentManager::EventData>();
};

@ -159,7 +159,7 @@ void Route::addFileToSegment(int n, const std::string &file) {
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters,
std::function<void(int, bool)> callback)
: seg_num(n), flags(flags), filters_(filters), onLoadFinished_(callback) {
: seg_num(n), flags(flags), filters_(filters), on_load_finished_(callback) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const std::array file_list = {
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.empty() ? files.qcamera : files.road_cam,
@ -178,7 +178,7 @@ Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vec
Segment::~Segment() {
{
std::lock_guard lock(mutex_);
onLoadFinished_ = nullptr; // Prevent callback after destruction
on_load_finished_ = nullptr; // Prevent callback after destruction
}
abort_ = true;
for (auto &thread : threads_) {
@ -204,8 +204,14 @@ void Segment::loadFile(int id, const std::string file) {
if (--loading_ == 0) {
std::lock_guard lock(mutex_);
if (onLoadFinished_) {
onLoadFinished_(seg_num, !abort_);
load_state_ = !abort_ ? LoadState::Loaded : LoadState::Failed;
if (on_load_finished_) {
on_load_finished_(seg_num, !abort_);
}
}
}
Segment::LoadState Segment::getState() {
std::scoped_lock lock(mutex_);
return load_state_;
}

@ -1,5 +1,6 @@
#pragma once
#include <ctime>
#include <map>
#include <memory>
#include <mutex>
@ -64,10 +65,12 @@ protected:
class Segment {
public:
enum class LoadState {Loading, Loaded, Failed};
Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters,
std::function<void(int, bool)> callback);
~Segment();
inline bool isLoaded() const { return !loading_ && !abort_; }
LoadState getState();
const int seg_num = 0;
std::unique_ptr<LogReader> log;
@ -80,7 +83,8 @@ protected:
std::atomic<int> loading_ = 0;
std::mutex mutex_;
std::vector<std::thread> threads_;
std::function<void(int, bool)> onLoadFinished_ = nullptr;
std::function<void(int, bool)> on_load_finished_ = nullptr;
uint32_t flags;
std::vector<bool> filters_;
LoadState load_state_ = LoadState::Loading;
};

@ -0,0 +1,133 @@
#include "tools/replay/seg_mgr.h"
SegmentManager::~SegmentManager() {
{
std::unique_lock lock(mutex_);
exit_ = true;
onSegmentMergedCallback_ = nullptr;
}
cv_.notify_one();
if (thread_.joinable()) thread_.join();
}
bool SegmentManager::load() {
if (!route_.load()) {
rError("failed to load route: %s", route_.name().c_str());
return false;
}
for (const auto &[n, file] : route_.segments()) {
if (!file.rlog.empty() || !file.qlog.empty()) {
segments_.insert({n, nullptr});
}
}
if (segments_.empty()) {
rInfo("no valid segments in route: %s", route_.name().c_str());
return false;
}
rInfo("loaded route %s with %zu valid segments", route_.name().c_str(), segments_.size());
thread_ = std::thread(&SegmentManager::manageSegmentCache, this);
return true;
}
void SegmentManager::setCurrentSegment(int seg_num) {
{
std::unique_lock lock(mutex_);
cur_seg_num_ = seg_num;
needs_update_ = true;
}
cv_.notify_one();
}
void SegmentManager::manageSegmentCache() {
while (true) {
std::unique_lock lock(mutex_);
cv_.wait(lock, [this]() { return exit_ || needs_update_; });
if (exit_) break;
needs_update_ = false;
auto cur = segments_.lower_bound(cur_seg_num_);
if (cur == segments_.end()) continue;
// Calculate the range of segments to load
auto begin = std::prev(cur, std::min<int>(segment_cache_limit_ / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit_, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit_, std::distance(segments_.begin(), end)));
loadSegmentsInRange(begin, cur, end);
bool merged = mergeSegments(begin, end);
// Free segments outside the current range
std::for_each(segments_.begin(), begin, [](auto &segment) { segment.second.reset(); });
std::for_each(end, segments_.end(), [](auto &segment) { segment.second.reset(); });
lock.unlock();
if (merged && onSegmentMergedCallback_) {
onSegmentMergedCallback_(); // Notify listener that segments have been merged
}
}
}
bool SegmentManager::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
std::set<int> segments_to_merge;
size_t total_event_count = 0;
for (auto it = begin; it != end; ++it) {
const auto &segment = it->second;
if (segment && segment->getState() == Segment::LoadState::Loaded) {
segments_to_merge.insert(segment->seg_num);
total_event_count += segment->log->events.size();
}
}
if (segments_to_merge == merged_segments_) return false;
auto merged_event_data = std::make_shared<EventData>();
auto &merged_events = merged_event_data->events;
merged_events.reserve(total_event_count);
rDebug("merging segments: %s", join(segments_to_merge, ", ").c_str());
for (int n : segments_to_merge) {
const auto &events = segments_.at(n)->log->events;
if (events.empty()) continue;
// Skip INIT_DATA if present
auto events_begin = (events.front().which == cereal::Event::Which::INIT_DATA) ? std::next(events.begin()) : events.begin();
size_t previous_size = merged_events.size();
merged_events.insert(merged_events.end(), events_begin, events.end());
std::inplace_merge(merged_events.begin(), merged_events.begin() + previous_size, merged_events.end());
merged_event_data->segments[n] = segments_.at(n);
}
event_data_ = merged_event_data;
merged_segments_ = segments_to_merge;
return true;
}
void SegmentManager::loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto tryLoadSegment = [this](auto first, auto last) {
for (auto it = first; it != last; ++it) {
auto &segment_ptr = it->second;
if (!segment_ptr) {
segment_ptr = std::make_shared<Segment>(
it->first, route_.at(it->first), flags_, filters_,
[this](int seg_num, bool success) { setCurrentSegment(cur_seg_num_); });
}
if (segment_ptr->getState() == Segment::LoadState::Loading) {
return true; // Segment is still loading
}
}
return false; // No segments need loading
};
// Try forward loading, then reverse if necessary
if (!tryLoadSegment(cur, end)) {
tryLoadSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
}
}

@ -0,0 +1,56 @@
#pragma once
#include <condition_variable>
#include <map>
#include <mutex>
#include <set>
#include <vector>
#include "tools/replay/route.h"
constexpr int MIN_SEGMENTS_CACHE = 5;
using SegmentMap = std::map<int, std::shared_ptr<Segment>>;
class SegmentManager {
public:
struct EventData {
std::vector<Event> events; // Events extracted from the segments
SegmentMap segments; // Associated segments that contributed to these events
bool isSegmentLoaded(int n) const { return segments.find(n) != segments.end(); }
};
SegmentManager(const std::string &route_name, uint32_t flags, const std::string &data_dir = "")
: flags_(flags), route_(route_name, data_dir) {};
~SegmentManager();
bool load();
void setCurrentSegment(int seg_num);
void setCallback(const std::function<void()> &callback) { onSegmentMergedCallback_ = callback; }
void setFilters(const std::vector<bool> &filters) { filters_ = filters; }
const std::shared_ptr<EventData> getEventData() const { return event_data_; }
bool hasSegment(int n) const { return segments_.find(n) != segments_.end(); }
Route route_;
int segment_cache_limit_ = MIN_SEGMENTS_CACHE;
private:
void manageSegmentCache();
void loadSegmentsInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end);
bool mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
std::vector<bool> filters_;
uint32_t flags_;
std::mutex mutex_;
std::condition_variable cv_;
std::thread thread_;
std::atomic<int> cur_seg_num_ = -1;
bool needs_update_ = false;
bool exit_ = false;
SegmentMap segments_;
std::shared_ptr<EventData> event_data_;
std::function<void()> onSegmentMergedCallback_ = nullptr;
std::set<int> merged_segments_;
};

@ -1,27 +1,8 @@
#include <chrono>
#include <thread>
#include <QEventLoop>
#define CATCH_CONFIG_MAIN
#include "catch2/catch.hpp"
#include "common/util.h"
#include "tools/replay/replay.h"
#include "tools/replay/util.h"
const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3";
const int TEST_REPLAY_SEGMENTS = std::getenv("TEST_REPLAY_SEGMENTS") ? atoi(std::getenv("TEST_REPLAY_SEGMENTS")) : 1;
bool download_to_file(const std::string &url, const std::string &local_file, int chunk_size = 5 * 1024 * 1024, int retries = 3) {
do {
if (httpDownload(url, local_file, chunk_size)) {
return true;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
} while (--retries >= 0);
return false;
}
TEST_CASE("LogReader") {
SECTION("corrupt log") {
@ -34,67 +15,3 @@ TEST_CASE("LogReader") {
REQUIRE(log.events.size() > 0);
}
}
void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
std::mutex mutex;
std::condition_variable cv;
Segment segment(n, segment_file, flags, {}, [&](int, bool) {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
REQUIRE(segment.frames[RoadCam] != nullptr);
if (flags & REPLAY_FLAG_DCAM) {
REQUIRE(segment.frames[DriverCam] != nullptr);
}
if (flags & REPLAY_FLAG_ECAM) {
REQUIRE(segment.frames[WideRoadCam] != nullptr);
}
// test LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0);
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end()));
for (auto cam : ALL_CAMERAS) {
auto &fr = segment.frames[cam];
if (!fr) continue;
if (cam == RoadCam || cam == WideRoadCam) {
REQUIRE(fr->getFrameCount() == 1200);
}
auto [nv12_width, nv12_height, nv12_buffer_size] = get_nv12_info(fr->width, fr->height);
VisionBuf buf;
buf.allocate(nv12_buffer_size);
buf.init_yuv(fr->width, fr->height, nv12_width, nv12_width * nv12_height);
// sequence get 100 frames
for (int i = 0; i < 100; ++i) {
REQUIRE(fr->get(i, &buf));
}
}
cv.notify_one();
});
std::unique_lock lock(mutex);
cv.wait(lock);
}
std::string download_demo_route() {
static std::string data_dir;
if (data_dir == "") {
char tmp_path[] = "/tmp/root_XXXXXX";
data_dir = mkdtemp(tmp_path);
Route remote_route(DEMO_ROUTE);
assert(remote_route.load());
// Create a local route from remote for testing
const std::string route_name = std::string(DEMO_ROUTE).substr(17);
for (int i = 0; i < 2; ++i) {
std::string log_path = util::string_format("%s/%s--%d/", data_dir.c_str(), route_name.c_str(), i);
util::create_directories(log_path, 0755);
REQUIRE(download_to_file(remote_route.at(i).rlog, log_path + "rlog.bz2"));
REQUIRE(download_to_file(remote_route.at(i).qcamera, log_path + "qcamera.ts"));
}
}
return data_dir;
}

@ -1,10 +0,0 @@
#define CATCH_CONFIG_RUNNER
#include "catch2/catch.hpp"
#include <QCoreApplication>
int main(int argc, char **argv) {
// unit tests for Qt
QCoreApplication app(argc, argv);
const int res = Catch::Session().run(argc, argv);
return (res < 0xff ? res : 0xff);
}

@ -362,11 +362,11 @@ std::string decompressZST(const std::byte *in, size_t in_size, std::atomic<bool>
return {};
}
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit) {
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &interrupt_requested) {
struct timespec req, rem;
req.tv_sec = nanoseconds / 1000000000;
req.tv_nsec = nanoseconds % 1000000000;
while (!should_exit) {
while (!interrupt_requested) {
#ifdef __APPLE__
int ret = nanosleep(&req, &rem);
if (ret == 0 || errno != EINTR)

@ -47,7 +47,7 @@ private:
};
std::string sha256(const std::string &str);
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit);
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &interrupt_requested);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
std::string decompressZST(const std::string &in, std::atomic<bool> *abort = nullptr);

Loading…
Cancel
Save