diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index 18b0317d46..ac137434d3 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -53,6 +53,10 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint {}, nullptr, replay_flags, data_dir.toStdString(), this)); replay->setSegmentCacheLimit(settings.max_cached_minutes); replay->installEventFilter(event_filter, this); + + // Forward replay callbacks to corresponding Qt signals. + replay->onQLogLoaded = [this](std::shared_ptr qlog) { emit qLogLoaded(qlog); }; + QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking); QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments); diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h index 2a9f343645..1d1cdaec9e 100644 --- a/tools/cabana/streams/replaystream.h +++ b/tools/cabana/streams/replaystream.h @@ -10,6 +10,8 @@ #include "tools/cabana/streams/abstractstream.h" #include "tools/replay/replay.h" +Q_DECLARE_METATYPE(std::shared_ptr); + class ReplayStream : public AbstractStream { Q_OBJECT @@ -24,7 +26,7 @@ public: inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); } double minSeconds() const override { return replay->minSeconds(); } double maxSeconds() const { return replay->maxSeconds(); } - inline QDateTime beginDateTime() const { return replay->routeDateTime(); } + inline QDateTime beginDateTime() const { return QDateTime::fromSecsSinceEpoch(replay->routeDateTime()); } inline uint64_t beginMonoTime() const override { return replay->routeStartNanos(); } inline void setSpeed(float speed) override { replay->setSpeed(speed); } inline float getSpeed() const { return replay->getSpeed(); } @@ -32,6 +34,9 @@ public: inline bool isPaused() const override { return replay->isPaused(); } void pause(bool pause) override; +signals: + void qLogLoaded(std::shared_ptr qlog); + private: void mergeSegments(); std::unique_ptr replay = nullptr; diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index 66a6efe569..7f9e3d6960 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -13,8 +13,6 @@ #include #include -#include "tools/cabana/streams/replaystream.h" - const int MIN_VIDEO_HEIGHT = 100; const int THUMBNAIL_MARGIN = 3; @@ -167,10 +165,7 @@ QWidget *VideoWidget::createCameraWidget() { QObject::connect(camera_tab, &QTabBar::currentChanged, [this](int index) { if (index != -1) cam_widget->setStreamType((VisionStreamType)camera_tab->tabData(index).toInt()); }); - - auto replay = static_cast(can)->getReplay(); - QObject::connect(replay, &Replay::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection); - QObject::connect(replay, &Replay::minMaxTimeChanged, this, &VideoWidget::timeRangeChanged, Qt::QueuedConnection); + QObject::connect(static_cast(can), &ReplayStream::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection); return w; } @@ -248,11 +243,8 @@ Slider::Slider(QWidget *parent) : QSlider(Qt::Horizontal, parent) { setMouseTracking(true); } -AlertInfo Slider::alertInfo(double seconds) { - uint64_t mono_time = can->toMonoTime(seconds); - auto alert_it = alerts.lower_bound(mono_time); - bool has_alert = (alert_it != alerts.end()) && ((alert_it->first - mono_time) <= 1e8); - return has_alert ? alert_it->second : AlertInfo{}; +std::optional Slider::alertInfo(double seconds) { + return getReplay()->findAlertAtTime(seconds); } QPixmap Slider::thumbnail(double seconds) { @@ -277,14 +269,6 @@ void Slider::parseQLog(std::shared_ptr qlog) { std::lock_guard lk(mutex); thumbnails[thumb.getTimestampEof()] = scaled; } - } else if (e.which == cereal::Event::Which::SELFDRIVE_STATE) { - capnp::FlatArrayMessageReader reader(e.data); - auto cs = reader.getRoot().getSelfdriveState(); - if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 && - cs.getAlertSize() != cereal::SelfdriveState::AlertSize::NONE) { - std::lock_guard lk(mutex); - alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()}); - } } }); update(); @@ -306,8 +290,8 @@ void Slider::paintEvent(QPaintEvent *ev) { auto replay = getReplay(); if (replay) { - for (auto [begin, end, type] : replay->getTimeline()) { - fillRange(begin, end, timeline_colors[(int)type]); + for (const auto &entry: *replay->getTimeline()) { + fillRange(entry.start_time, entry.end_time, timeline_colors[(int)entry.type]); } QColor empty_color = palette().color(QPalette::Window); @@ -372,7 +356,7 @@ InfoLabel::InfoLabel(QWidget *parent) : QWidget(parent, Qt::WindowStaysOnTopHint setVisible(false); } -void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const AlertInfo &alert) { +void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const std::optional &alert) { second = sec; pixmap = pm; alert_info = alert; @@ -381,10 +365,10 @@ void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap & update(); } -void InfoLabel::showAlert(const AlertInfo &alert) { +void InfoLabel::showAlert(const std::optional &alert) { alert_info = alert; pixmap = {}; - setVisible(!alert_info.text1.isEmpty()); + setVisible(alert_info.has_value()); update(); } @@ -396,18 +380,11 @@ void InfoLabel::paintEvent(QPaintEvent *event) { p.drawRect(rect()); p.drawText(rect().adjusted(0, 0, 0, -THUMBNAIL_MARGIN), second, Qt::AlignHCenter | Qt::AlignBottom); } - if (alert_info.text1.size() > 0) { - QColor color = timeline_colors[(int)TimelineType::AlertInfo]; - if (alert_info.status == cereal::SelfdriveState::AlertStatus::USER_PROMPT) { - color = timeline_colors[(int)TimelineType::AlertWarning]; - } else if (alert_info.status == cereal::SelfdriveState::AlertStatus::CRITICAL) { - color = timeline_colors[(int)TimelineType::AlertCritical]; - } + if (alert_info) { + QColor color = timeline_colors[int(alert_info->type)]; color.setAlphaF(0.5); - QString text = alert_info.text1; - if (!alert_info.text2.isEmpty()) { - text += "\n" + alert_info.text2; - } + QString text = QString::fromStdString(alert_info->text1); + if (!alert_info->text2.empty()) text += "\n" + QString::fromStdString(alert_info->text2); if (!pixmap.isNull()) { QFont font; diff --git a/tools/cabana/videowidget.h b/tools/cabana/videowidget.h index 17bec78545..d3342c34d7 100644 --- a/tools/cabana/videowidget.h +++ b/tools/cabana/videowidget.h @@ -16,22 +16,17 @@ #include "selfdrive/ui/qt/widgets/cameraview.h" #include "tools/cabana/utils/util.h" #include "tools/replay/logreader.h" - -struct AlertInfo { - cereal::SelfdriveState::AlertStatus status; - QString text1; - QString text2; -}; +#include "tools/cabana/streams/replaystream.h" class InfoLabel : public QWidget { public: InfoLabel(QWidget *parent); - void showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const AlertInfo &alert); - void showAlert(const AlertInfo &alert); + void showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const std::optional &alert); + void showAlert(const std::optional &alert); void paintEvent(QPaintEvent *event) override; QPixmap pixmap; QString second; - AlertInfo alert_info; + std::optional alert_info; }; class Slider : public QSlider { @@ -42,7 +37,7 @@ public: double currentSecond() const { return value() / factor; } void setCurrentSecond(double sec) { setValue(sec * factor); } void setTimeRange(double min, double max); - AlertInfo alertInfo(double sec); + std::optional alertInfo(double sec); QPixmap thumbnail(double sec); void parseQLog(std::shared_ptr qlog); @@ -55,7 +50,6 @@ private: void paintEvent(QPaintEvent *ev) override; QMap thumbnails; - std::map alerts; InfoLabel *thumbnail_label; }; diff --git a/tools/replay/SConscript b/tools/replay/SConscript index 1f966d4372..179af69d42 100644 --- a/tools/replay/SConscript +++ b/tools/replay/SConscript @@ -9,7 +9,8 @@ if arch == "Darwin": else: base_libs.append('OpenCL') -replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"] +replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", + "route.cc", "util.cc", "timeline.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks) Export('replay_lib') replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs diff --git a/tools/replay/consoleui.cc b/tools/replay/consoleui.cc index 42c7238f7a..b5415ac808 100644 --- a/tools/replay/consoleui.cc +++ b/tools/replay/consoleui.cc @@ -1,5 +1,6 @@ #include "tools/replay/consoleui.h" +#include #include #include #include @@ -152,7 +153,6 @@ void ConsoleUI::updateStatus() { add_str(win, unit.c_str()); }; static const std::pair status_text[] = { - {"loading...", Color::Red}, {"playing", Color::Green}, {"paused...", Color::Yellow}, }; @@ -161,8 +161,10 @@ void ConsoleUI::updateStatus() { auto [status_str, status_color] = status_text[status]; write_item(0, 0, "STATUS: ", status_str, " ", false, status_color); + auto cur_ts = replay->routeDateTime() + (int)replay->currentSeconds(); + char *time_string = ctime(&cur_ts); std::string current_segment = " - " + std::to_string((int)(replay->currentSeconds() / 60)); - write_item(0, 25, "TIME: ", replay->currentDateTime().toString("ddd MMMM dd hh:mm:ss").toStdString(), current_segment, true); + write_item(0, 25, "TIME: ", time_string, current_segment, true); auto p = sm["liveParameters"].getLiveParameters(); write_item(1, 0, "STIFFNESS: ", util::string_format("%.2f %%", p.getStiffnessFactor() * 100), " "); @@ -247,18 +249,18 @@ void ConsoleUI::updateTimeline() { wattroff(win, COLOR_PAIR(Color::Disengaged)); const int total_sec = replay->maxSeconds() - replay->minSeconds(); - for (auto [begin, end, type] : replay->getTimeline()) { - int start_pos = ((begin - replay->minSeconds()) / total_sec) * width; - int end_pos = ((end - replay->minSeconds()) / total_sec) * width; - if (type == TimelineType::Engaged) { + for (const auto &entry : *replay->getTimeline()) { + int start_pos = ((entry.start_time - replay->minSeconds()) / total_sec) * width; + int end_pos = ((entry.end_time - replay->minSeconds()) / total_sec) * width; + if (entry.type == TimelineType::Engaged) { mvwchgat(win, 1, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL); mvwchgat(win, 2, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL); - } else if (type == TimelineType::UserFlag) { + } else if (entry.type == TimelineType::UserFlag) { mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, Color::Cyan, NULL); } else { auto color_id = Color::Green; - if (type != TimelineType::AlertInfo) { - color_id = type == TimelineType::AlertWarning ? Color::Yellow : Color::Red; + if (entry.type != TimelineType::AlertInfo) { + color_id = entry.type == TimelineType::AlertWarning ? Color::Yellow : Color::Red; } mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, color_id, NULL); } diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index a0ae90c079..82e231937c 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -1,7 +1,5 @@ #include "tools/replay/replay.h" -#include -#include #include #include #include "cereal/services.h" @@ -11,6 +9,14 @@ static void interrupt_sleep_handler(int signal) {} +// Helper function to notify events with safety checks +template +void notifyEvent(Callback &callback, Args &&...args) { + if (callback) { + callback(std::forward(args)...); + } +} + Replay::Replay(const std::string &route, std::vector allow, std::vector block, SubMaster *sm_, uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) { // Register signal handler for SIGUSR1 @@ -40,7 +46,7 @@ Replay::Replay(const std::string &route, std::vector allow, std::ve } } - rInfo("active services: %s", join(active_services, ',').c_str()); + rInfo("active services: %s", join(active_services, ", ").c_str()); rInfo("loading route %s", route.c_str()); if (sm == nullptr) { @@ -68,7 +74,6 @@ void Replay::stop() { stream_thread_ = nullptr; rInfo("shutdown: done"); } - timeline_future.waitForFinished(); camera_server_.reset(nullptr); segments_.clear(); } @@ -151,101 +156,11 @@ void Replay::checkSeekProgress() { } void Replay::seekToFlag(FindFlag flag) { - if (auto next = find(flag)) { + if (auto next = timeline_.find(currentSeconds(), flag)) { seekTo(*next - 2, false); // seek to 2 seconds before next } } -void Replay::buildTimeline() { - uint64_t engaged_begin = 0; - bool engaged = false; - - auto alert_status = cereal::SelfdriveState::AlertStatus::NORMAL; - auto alert_size = cereal::SelfdriveState::AlertSize::NONE; - uint64_t alert_begin = 0; - std::string alert_type; - - const TimelineType timeline_types[] = { - [(int)cereal::SelfdriveState::AlertStatus::NORMAL] = TimelineType::AlertInfo, - [(int)cereal::SelfdriveState::AlertStatus::USER_PROMPT] = TimelineType::AlertWarning, - [(int)cereal::SelfdriveState::AlertStatus::CRITICAL] = TimelineType::AlertCritical, - }; - - const auto &route_segments = route_->segments(); - for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) { - std::shared_ptr log(new LogReader()); - if (!log->load(it->second.qlog, &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue; - - std::vector> timeline; - for (const Event &e : log->events) { - if (e.which == cereal::Event::Which::SELFDRIVE_STATE) { - capnp::FlatArrayMessageReader reader(e.data); - auto event = reader.getRoot(); - auto cs = event.getSelfdriveState(); - - if (engaged != cs.getEnabled()) { - if (engaged) { - timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged}); - } - engaged_begin = e.mono_time; - engaged = cs.getEnabled(); - } - - if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) { - if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) { - timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]}); - } - alert_begin = e.mono_time; - alert_type = cs.getAlertType().cStr(); - alert_size = cs.getAlertSize(); - alert_status = cs.getAlertStatus(); - } - } else if (e.which == cereal::Event::Which::USER_FLAG) { - timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag}); - } - } - - if (it->first == route_segments.rbegin()->first) { - if (engaged) { - timeline.push_back({toSeconds(engaged_begin), toSeconds(log->events.back().mono_time), TimelineType::Engaged}); - } - if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) { - timeline.push_back({toSeconds(alert_begin), toSeconds(log->events.back().mono_time), timeline_types[(int)alert_status]}); - } - - max_seconds_ = std::ceil(toSeconds(log->events.back().mono_time)); - emit minMaxTimeChanged(route_segments.cbegin()->first * 60.0, max_seconds_); - } - { - std::lock_guard lk(timeline_lock); - timeline_.insert(timeline_.end(), timeline.begin(), timeline.end()); - std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); }); - } - emit qLogLoaded(log); - } -} - -std::optional Replay::find(FindFlag flag) { - int cur_ts = currentSeconds(); - for (auto [start_ts, end_ts, type] : getTimeline()) { - if (type == TimelineType::Engaged) { - if (flag == FindFlag::nextEngagement && start_ts > cur_ts) { - return start_ts; - } else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) { - return end_ts; - } - } else if (start_ts > cur_ts) { - if ((flag == FindFlag::nextUserFlag && type == TimelineType::UserFlag) || - (flag == FindFlag::nextInfo && type == TimelineType::AlertInfo) || - (flag == FindFlag::nextWarning && type == TimelineType::AlertWarning) || - (flag == FindFlag::nextCritical && type == TimelineType::AlertCritical)) { - return start_ts; - } - } - } - return std::nullopt; -} - void Replay::pause(bool pause) { if (user_paused_ != pause) { pauseStreamThread(); @@ -266,16 +181,15 @@ void Replay::pauseStreamThread() { } } -void Replay::segmentLoadFinished(bool success) { +void Replay::segmentLoadFinished(int seg_num, bool success) { if (!success) { - Segment *seg = qobject_cast(sender()); - rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num); + rWarning("failed to load segment %d, removing it from current replay list", seg_num); updateEvents([&]() { - segments_.erase(seg->seg_num); + segments_.erase(seg_num); return !segments_.empty(); }); } - updateSegmentsCache(); + QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection); } void Replay::updateSegmentsCache() { @@ -306,8 +220,10 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); if (it != last && !it->second) { rDebug("loading segment %d...", it->first); - it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_); - QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); + it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_, + [this](int seg_num, bool success) { + segmentLoadFinished(seg_num, success); + }); return true; } return false; @@ -373,7 +289,7 @@ void Replay::startStream(const Segment *cur_segment) { auto event = reader.getRoot(); uint64_t wall_time = event.getInitData().getWallTimeNanos(); if (wall_time > 0) { - route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6); + route_date_time_ = wall_time / 1e6; } } @@ -405,12 +321,16 @@ void Replay::startStream(const Segment *cur_segment) { } emit segmentsMerged(); + + timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE), + [this](std::shared_ptr log) { + notifyEvent(onQLogLoaded, log); + }); // start stream thread stream_thread_ = new QThread(); QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); }); stream_thread_->start(); - timeline_future = QtConcurrent::run(this, &Replay::buildTimeline); emit streamStarted(); } diff --git a/tools/replay/replay.h b/tools/replay/replay.h index e828b369aa..d3f5425308 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include @@ -14,6 +13,7 @@ #include "tools/replay/camera.h" #include "tools/replay/route.h" +#include "tools/replay/timeline.h" #define DEMO_ROUTE "a2a0ccea32023010|2023-07-27--13-01-19" @@ -32,19 +32,8 @@ enum REPLAY_FLAGS { REPLAY_FLAG_ALL_SERVICES = 0x0800, }; -enum class FindFlag { - nextEngagement, - nextDisEngagement, - nextUserFlag, - nextInfo, - nextWarning, - nextCritical -}; - -enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical, UserFlag }; typedef bool (*replayEventFilter)(const Event *, void *); typedef std::map> SegmentMap; -Q_DECLARE_METATYPE(std::shared_ptr); class Replay : public QObject { Q_OBJECT @@ -75,8 +64,7 @@ public: inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; } inline const Route* route() const { return route_.get(); } inline double currentSeconds() const { return double(cur_mono_time_ - route_start_ts_) / 1e9; } - inline QDateTime routeDateTime() const { return route_date_time_; } - inline QDateTime currentDateTime() const { return route_date_time_.addSecs(currentSeconds()); } + inline std::time_t routeDateTime() const { return route_date_time_; } inline uint64_t routeStartNanos() const { return route_start_ts_; } inline double toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; } inline double minSeconds() const { return !segments_.empty() ? segments_.begin()->first * 60 : 0; } @@ -85,22 +73,20 @@ public: inline float getSpeed() const { return speed_; } inline const SegmentMap &segments() const { return segments_; } inline const std::string &carFingerprint() const { return car_fingerprint_; } - inline const std::vector> getTimeline() { - std::lock_guard lk(timeline_lock); - return timeline_; - } + inline const std::shared_ptr> getTimeline() const { return timeline_.get(); } + inline const std::optional findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); } + + // Event callback functions + std::function)> onQLogLoaded = nullptr; + signals: void streamStarted(); void segmentsMerged(); void seeking(double sec); void seekedTo(double sec); - void qLogLoaded(std::shared_ptr qlog); void minMaxTimeChanged(double min_sec, double max_sec); -protected slots: - void segmentLoadFinished(bool success); - protected: std::optional find(FindFlag flag); void pauseStreamThread(); @@ -108,16 +94,18 @@ protected: void streamThread(); void updateSegmentsCache(); void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end); + void segmentLoadFinished(int seg_num, bool success); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); void updateEvents(const std::function& update_events_function); std::vector::const_iterator publishEvents(std::vector::const_iterator first, std::vector::const_iterator last); void publishMessage(const Event *e); void publishFrame(const Event *e); - void buildTimeline(); void checkSeekProgress(); inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; } + Timeline timeline_; + pthread_t stream_thread_id = 0; QThread *stream_thread_ = nullptr; std::mutex stream_lock_; @@ -130,7 +118,7 @@ protected: std::atomic exit_ = false; std::atomic paused_ = false; bool events_ready_ = false; - QDateTime route_date_time_; + std::time_t route_date_time_; uint64_t route_start_ts_ = 0; std::atomic cur_mono_time_ = 0; std::atomic max_seconds_ = 0; @@ -146,9 +134,6 @@ protected: std::unique_ptr camera_server_; std::atomic flags_ = REPLAY_FLAG_NONE; - std::mutex timeline_lock; - QFuture timeline_future; - std::vector> timeline_; std::string car_fingerprint_; std::atomic speed_ = 1.0; replayEventFilter event_filter = nullptr; diff --git a/tools/replay/route.cc b/tools/replay/route.cc index e298b88b79..0d8d6d8fb7 100644 --- a/tools/replay/route.cc +++ b/tools/replay/route.cc @@ -1,10 +1,8 @@ #include "tools/replay/route.h" -#include #include #include #include -#include #include #include #include @@ -53,7 +51,11 @@ bool Route::load() { rInfo("invalid route format"); return false; } - date_time_ = QDateTime::fromString(route_.timestamp.c_str(), "yyyy-MM-dd--HH-mm-ss"); + + struct tm tm_time = {0}; + strptime(route_.timestamp.c_str(), "%Y-%m-%d--%H-%M-%S", &tm_time); + date_time_ = mktime(&tm_time); + bool ret = data_dir_.empty() ? loadFromServer() : loadFromLocal(); if (ret) { if (route_.begin_segment == -1) route_.begin_segment = segments_.rbegin()->first; @@ -151,8 +153,9 @@ void Route::addFileToSegment(int n, const std::string &file) { // class Segment -Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters) - : seg_num(n), flags(flags), filters_(filters) { +Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, + std::function callback) + : seg_num(n), flags(flags), filters_(filters), onLoadFinished_(callback) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const std::array file_list = { (flags & REPLAY_FLAG_QCAMERA) || files.road_cam.empty() ? files.qcamera : files.road_cam, @@ -163,16 +166,20 @@ Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vec for (int i = 0; i < file_list.size(); ++i) { if (!file_list[i].empty() && (!(flags & REPLAY_FLAG_NO_VIPC) || i >= MAX_CAMERAS)) { ++loading_; - synchronizer_.addFuture(QtConcurrent::run(this, &Segment::loadFile, i, file_list[i])); + threads_.emplace_back(&Segment::loadFile, this, i, file_list[i]); } } } Segment::~Segment() { - disconnect(); + { + std::lock_guard lock(mutex_); + onLoadFinished_ = nullptr; // Prevent callback after destruction + } abort_ = true; - synchronizer_.setCancelOnWait(true); - synchronizer_.waitForFinished(); + for (auto &thread : threads_) { + if (thread.joinable()) thread.join(); + } } void Segment::loadFile(int id, const std::string file) { @@ -192,6 +199,9 @@ void Segment::loadFile(int id, const std::string file) { } if (--loading_ == 0) { - emit loadFinished(!abort_); + std::lock_guard lock(mutex_); + if (onLoadFinished_) { + onLoadFinished_(seg_num, !abort_); + } } } diff --git a/tools/replay/route.h b/tools/replay/route.h index 9c1cc37e2a..a2a8121de7 100644 --- a/tools/replay/route.h +++ b/tools/replay/route.h @@ -2,11 +2,12 @@ #include #include +#include #include +#include #include -#include -#include +#include #include "tools/replay/framereader.h" #include "tools/replay/logreader.h" @@ -43,7 +44,7 @@ public: bool load(); RouteLoadError lastError() const { return err_; } inline const std::string &name() const { return route_.str; } - inline const QDateTime datetime() const { return date_time_; } + inline const std::time_t datetime() const { return date_time_; } inline const std::string &dir() const { return data_dir_; } inline const RouteIdentifier &identifier() const { return route_; } inline const std::map &segments() const { return segments_; } @@ -58,15 +59,14 @@ protected: RouteIdentifier route_ = {}; std::string data_dir_; std::map segments_; - QDateTime date_time_; + std::time_t date_time_; RouteLoadError err_ = RouteLoadError::None; }; -class Segment : public QObject { - Q_OBJECT - +class Segment { public: - Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters = {}); + Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector &filters, + std::function callback); ~Segment(); inline bool isLoaded() const { return !loading_ && !abort_; } @@ -74,15 +74,14 @@ public: std::unique_ptr log; std::unique_ptr frames[MAX_CAMERAS] = {}; -signals: - void loadFinished(bool success); - protected: void loadFile(int id, const std::string file); std::atomic abort_ = false; std::atomic loading_ = 0; - QFutureSynchronizer synchronizer_; + std::mutex mutex_; + std::vector threads_; + std::function onLoadFinished_ = nullptr; uint32_t flags; std::vector filters_; }; diff --git a/tools/replay/tests/test_replay.cc b/tools/replay/tests/test_replay.cc index 2fe468e726..d350df570a 100644 --- a/tools/replay/tests/test_replay.cc +++ b/tools/replay/tests/test_replay.cc @@ -72,9 +72,9 @@ TEST_CASE("LogReader") { } void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { - QEventLoop loop; - Segment segment(n, segment_file, flags); - QObject::connect(&segment, &Segment::loadFinished, [&]() { + std::mutex mutex; + std::condition_variable cv; + Segment segment(n, segment_file, flags, {}, [&](int, bool) { REQUIRE(segment.isLoaded() == true); REQUIRE(segment.log != nullptr); REQUIRE(segment.frames[RoadCam] != nullptr); @@ -105,10 +105,11 @@ void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) { REQUIRE(fr->get(i, &buf)); } } - - loop.quit(); + cv.notify_one(); }); - loop.exec(); + + std::unique_lock lock(mutex); + cv.wait(lock); } std::string download_demo_route() { diff --git a/tools/replay/timeline.cc b/tools/replay/timeline.cc new file mode 100644 index 0000000000..5a4f58a46f --- /dev/null +++ b/tools/replay/timeline.cc @@ -0,0 +1,109 @@ +#include "tools/replay/timeline.h" + +#include + +#include "cereal/gen/cpp/log.capnp.h" + +Timeline::~Timeline() { + should_exit_.store(true); + if (thread_.joinable()) { + thread_.join(); + } +} + +void Timeline::initialize(const Route &route, uint64_t route_start_ts, bool local_cache, + std::function)> callback) { + thread_ = std::thread(&Timeline::buildTimeline, this, route, route_start_ts, local_cache, callback); +} + +std::optional Timeline::find(double cur_ts, FindFlag flag) const { + for (const auto &entry : *get()) { + if (entry.type == TimelineType::Engaged) { + if (flag == FindFlag::nextEngagement && entry.start_time > cur_ts) { + return entry.start_time; + } else if (flag == FindFlag::nextDisEngagement && entry.end_time > cur_ts) { + return entry.end_time; + } + } else if (entry.start_time > cur_ts) { + if ((flag == FindFlag::nextUserFlag && entry.type == TimelineType::UserFlag) || + (flag == FindFlag::nextInfo && entry.type == TimelineType::AlertInfo) || + (flag == FindFlag::nextWarning && entry.type == TimelineType::AlertWarning) || + (flag == FindFlag::nextCritical && entry.type == TimelineType::AlertCritical)) { + return entry.start_time; + } + } + } + return std::nullopt; +} + +std::optional Timeline::findAlertAtTime(double target_time) const { + for (const auto &entry : *get()) { + if (entry.start_time > target_time) break; + if (entry.end_time >= target_time && entry.type >= TimelineType::AlertInfo) { + return entry; + } + } + return std::nullopt; +} + +void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache, + std::function)> callback) { + std::optional current_engaged_idx, current_alert_idx; + + for (const auto &segment : route.segments()) { + if (should_exit_) break; + + auto log = std::make_shared(); + if (!log->load(segment.second.qlog, &should_exit_, local_cache, 0, 3) || log->events.empty()) { + continue; // Skip if log loading fails or no events + } + + for (const Event &e : log->events) { + double seconds = (e.mono_time - route_start_ts) / 1e9; + if (e.which == cereal::Event::Which::SELFDRIVE_STATE) { + capnp::FlatArrayMessageReader reader(e.data); + auto cs = reader.getRoot().getSelfdriveState(); + updateEngagementStatus(cs, current_engaged_idx, seconds); + updateAlertStatus(cs, current_alert_idx, seconds); + } else if (e.which == cereal::Event::Which::USER_FLAG) { + staging_entries_.emplace_back(Entry{seconds, seconds, TimelineType::UserFlag}); + } + } + + callback(log); // Notify the callback once the log is processed + + // Sort and finalize the timeline entries + std::sort(staging_entries_.begin(), staging_entries_.end(), [](auto &a, auto &b) { return a.start_time < b.start_time; }); + timeline_entries_ = std::make_shared>(staging_entries_); + } +} + +void Timeline::updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds) { + if (idx) staging_entries_[*idx].end_time = seconds; + if (cs.getEnabled()) { + if (!idx) { + idx = staging_entries_.size(); + staging_entries_.emplace_back(Entry{seconds, seconds, TimelineType::Engaged}); + } + } else { + idx.reset(); + } +} + +void Timeline::updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds) { + static auto alert_types = std::array{TimelineType::AlertInfo, TimelineType::AlertWarning, TimelineType::AlertCritical}; + + Entry *entry = idx ? &staging_entries_[*idx] : nullptr; + if (entry) entry->end_time = seconds; + if (cs.getAlertSize() != cereal::SelfdriveState::AlertSize::NONE) { + auto type = alert_types[(int)cs.getAlertStatus()]; + std::string text1 = cs.getAlertText1().cStr(); + std::string text2 = cs.getAlertText2().cStr(); + if (!entry || entry->type != type || entry->text1 != text1 || entry->text2 != text2) { + idx = staging_entries_.size(); + staging_entries_.emplace_back(Entry{seconds, seconds, type, text1, text2}); // Start a new entry + } + } else { + idx.reset(); + } +} diff --git a/tools/replay/timeline.h b/tools/replay/timeline.h new file mode 100644 index 0000000000..b2535fd8b0 --- /dev/null +++ b/tools/replay/timeline.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include +#include + +#include "tools/replay/route.h" + +enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical, UserFlag }; +enum class FindFlag { nextEngagement, nextDisEngagement, nextUserFlag, nextInfo, nextWarning, nextCritical }; + +class Timeline { +public: + struct Entry { + double start_time; + double end_time; + TimelineType type; + std::string text1; + std::string text2; + }; + + Timeline() : timeline_entries_(std::make_shared>()) {} + ~Timeline(); + + void initialize(const Route &route, uint64_t route_start_ts, bool local_cache, + std::function)> callback); + std::optional find(double cur_ts, FindFlag flag) const; + std::optional findAlertAtTime(double target_time) const; + const std::shared_ptr> get() const { return timeline_entries_; } + +private: + void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache, + std::function)> callback); + void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); + void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional &idx, double seconds); + + std::thread thread_; + std::atomic should_exit_ = false; + + // Temporarily holds entries before they are sorted and finalized + std::vector staging_entries_; + + // Final sorted timeline entries + std::shared_ptr> timeline_entries_; +}; diff --git a/tools/replay/util.cc b/tools/replay/util.cc index 91f6af4f84..fac1e11c47 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -14,7 +14,6 @@ #include #include #include -#include #include #include @@ -404,15 +403,6 @@ std::vector split(std::string_view source, char delimiter) { return fields; } -std::string join(const std::vector &elements, char separator) { - std::ostringstream oss; - for (size_t i = 0; i < elements.size(); ++i) { - if (i != 0) oss << separator; - oss << elements[i]; - } - return oss.str(); -} - std::string extractFileName(const std::string &file) { size_t queryPos = file.find_first_of("?"); std::string path = (queryPos != std::string::npos) ? file.substr(0, queryPos) : file; diff --git a/tools/replay/util.h b/tools/replay/util.h index 317b964181..46df2bc191 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -59,6 +60,15 @@ typedef std::function Download void installDownloadProgressHandler(DownloadProgressHandler); bool httpDownload(const std::string &url, const std::string &file, size_t chunk_size = 0, std::atomic *abort = nullptr); std::string formattedDataSize(size_t size); -std::vector split(std::string_view source, char delimiter); -std::string join(const std::vector &elements, char separator); std::string extractFileName(const std::string& file); +std::vector split(std::string_view source, char delimiter); + +template +std::string join(const Iterable& elements, const std::string& separator) { + std::ostringstream oss; + for (auto it = elements.begin(); it != elements.end(); ++it) { + if (it != elements.begin()) oss << separator; + oss << *it; + } + return oss.str(); +}