replay: remove Qt dependency from Segment and Timeline (#33847)

remove Segment, Timeline dependency on Qt
pull/33867/head
Dean Lee 6 months ago committed by GitHub
parent db98ba88ab
commit 24a32c3dec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 4
      tools/cabana/streams/replaystream.cc
  2. 7
      tools/cabana/streams/replaystream.h
  3. 47
      tools/cabana/videowidget.cc
  4. 16
      tools/cabana/videowidget.h
  5. 3
      tools/replay/SConscript
  6. 20
      tools/replay/consoleui.cc
  7. 128
      tools/replay/replay.cc
  8. 39
      tools/replay/replay.h
  9. 30
      tools/replay/route.cc
  10. 23
      tools/replay/route.h
  11. 13
      tools/replay/tests/test_replay.cc
  12. 109
      tools/replay/timeline.cc
  13. 46
      tools/replay/timeline.h
  14. 10
      tools/replay/util.cc
  15. 14
      tools/replay/util.h

@ -53,6 +53,10 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint
{}, nullptr, replay_flags, data_dir.toStdString(), this));
replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this);
// Forward replay callbacks to corresponding Qt signals.
replay->onQLogLoaded = [this](std::shared_ptr<LogReader> qlog) { emit qLogLoaded(qlog); };
QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);

@ -10,6 +10,8 @@
#include "tools/cabana/streams/abstractstream.h"
#include "tools/replay/replay.h"
Q_DECLARE_METATYPE(std::shared_ptr<LogReader>);
class ReplayStream : public AbstractStream {
Q_OBJECT
@ -24,7 +26,7 @@ public:
inline QString carFingerprint() const override { return replay->carFingerprint().c_str(); }
double minSeconds() const override { return replay->minSeconds(); }
double maxSeconds() const { return replay->maxSeconds(); }
inline QDateTime beginDateTime() const { return replay->routeDateTime(); }
inline QDateTime beginDateTime() const { return QDateTime::fromSecsSinceEpoch(replay->routeDateTime()); }
inline uint64_t beginMonoTime() const override { return replay->routeStartNanos(); }
inline void setSpeed(float speed) override { replay->setSpeed(speed); }
inline float getSpeed() const { return replay->getSpeed(); }
@ -32,6 +34,9 @@ public:
inline bool isPaused() const override { return replay->isPaused(); }
void pause(bool pause) override;
signals:
void qLogLoaded(std::shared_ptr<LogReader> qlog);
private:
void mergeSegments();
std::unique_ptr<Replay> replay = nullptr;

@ -13,8 +13,6 @@
#include <QVBoxLayout>
#include <QtConcurrent>
#include "tools/cabana/streams/replaystream.h"
const int MIN_VIDEO_HEIGHT = 100;
const int THUMBNAIL_MARGIN = 3;
@ -167,10 +165,7 @@ QWidget *VideoWidget::createCameraWidget() {
QObject::connect(camera_tab, &QTabBar::currentChanged, [this](int index) {
if (index != -1) cam_widget->setStreamType((VisionStreamType)camera_tab->tabData(index).toInt());
});
auto replay = static_cast<ReplayStream*>(can)->getReplay();
QObject::connect(replay, &Replay::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection);
QObject::connect(replay, &Replay::minMaxTimeChanged, this, &VideoWidget::timeRangeChanged, Qt::QueuedConnection);
QObject::connect(static_cast<ReplayStream*>(can), &ReplayStream::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection);
return w;
}
@ -248,11 +243,8 @@ Slider::Slider(QWidget *parent) : QSlider(Qt::Horizontal, parent) {
setMouseTracking(true);
}
AlertInfo Slider::alertInfo(double seconds) {
uint64_t mono_time = can->toMonoTime(seconds);
auto alert_it = alerts.lower_bound(mono_time);
bool has_alert = (alert_it != alerts.end()) && ((alert_it->first - mono_time) <= 1e8);
return has_alert ? alert_it->second : AlertInfo{};
std::optional<Timeline::Entry> Slider::alertInfo(double seconds) {
return getReplay()->findAlertAtTime(seconds);
}
QPixmap Slider::thumbnail(double seconds) {
@ -277,14 +269,6 @@ void Slider::parseQLog(std::shared_ptr<LogReader> qlog) {
std::lock_guard lk(mutex);
thumbnails[thumb.getTimestampEof()] = scaled;
}
} else if (e.which == cereal::Event::Which::SELFDRIVE_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto cs = reader.getRoot<cereal::Event>().getSelfdriveState();
if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 &&
cs.getAlertSize() != cereal::SelfdriveState::AlertSize::NONE) {
std::lock_guard lk(mutex);
alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
}
}
});
update();
@ -306,8 +290,8 @@ void Slider::paintEvent(QPaintEvent *ev) {
auto replay = getReplay();
if (replay) {
for (auto [begin, end, type] : replay->getTimeline()) {
fillRange(begin, end, timeline_colors[(int)type]);
for (const auto &entry: *replay->getTimeline()) {
fillRange(entry.start_time, entry.end_time, timeline_colors[(int)entry.type]);
}
QColor empty_color = palette().color(QPalette::Window);
@ -372,7 +356,7 @@ InfoLabel::InfoLabel(QWidget *parent) : QWidget(parent, Qt::WindowStaysOnTopHint
setVisible(false);
}
void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const AlertInfo &alert) {
void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const std::optional<Timeline::Entry> &alert) {
second = sec;
pixmap = pm;
alert_info = alert;
@ -381,10 +365,10 @@ void InfoLabel::showPixmap(const QPoint &pt, const QString &sec, const QPixmap &
update();
}
void InfoLabel::showAlert(const AlertInfo &alert) {
void InfoLabel::showAlert(const std::optional<Timeline::Entry> &alert) {
alert_info = alert;
pixmap = {};
setVisible(!alert_info.text1.isEmpty());
setVisible(alert_info.has_value());
update();
}
@ -396,18 +380,11 @@ void InfoLabel::paintEvent(QPaintEvent *event) {
p.drawRect(rect());
p.drawText(rect().adjusted(0, 0, 0, -THUMBNAIL_MARGIN), second, Qt::AlignHCenter | Qt::AlignBottom);
}
if (alert_info.text1.size() > 0) {
QColor color = timeline_colors[(int)TimelineType::AlertInfo];
if (alert_info.status == cereal::SelfdriveState::AlertStatus::USER_PROMPT) {
color = timeline_colors[(int)TimelineType::AlertWarning];
} else if (alert_info.status == cereal::SelfdriveState::AlertStatus::CRITICAL) {
color = timeline_colors[(int)TimelineType::AlertCritical];
}
if (alert_info) {
QColor color = timeline_colors[int(alert_info->type)];
color.setAlphaF(0.5);
QString text = alert_info.text1;
if (!alert_info.text2.isEmpty()) {
text += "\n" + alert_info.text2;
}
QString text = QString::fromStdString(alert_info->text1);
if (!alert_info->text2.empty()) text += "\n" + QString::fromStdString(alert_info->text2);
if (!pixmap.isNull()) {
QFont font;

@ -16,22 +16,17 @@
#include "selfdrive/ui/qt/widgets/cameraview.h"
#include "tools/cabana/utils/util.h"
#include "tools/replay/logreader.h"
struct AlertInfo {
cereal::SelfdriveState::AlertStatus status;
QString text1;
QString text2;
};
#include "tools/cabana/streams/replaystream.h"
class InfoLabel : public QWidget {
public:
InfoLabel(QWidget *parent);
void showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const AlertInfo &alert);
void showAlert(const AlertInfo &alert);
void showPixmap(const QPoint &pt, const QString &sec, const QPixmap &pm, const std::optional<Timeline::Entry> &alert);
void showAlert(const std::optional<Timeline::Entry> &alert);
void paintEvent(QPaintEvent *event) override;
QPixmap pixmap;
QString second;
AlertInfo alert_info;
std::optional<Timeline::Entry> alert_info;
};
class Slider : public QSlider {
@ -42,7 +37,7 @@ public:
double currentSecond() const { return value() / factor; }
void setCurrentSecond(double sec) { setValue(sec * factor); }
void setTimeRange(double min, double max);
AlertInfo alertInfo(double sec);
std::optional<Timeline::Entry> alertInfo(double sec);
QPixmap thumbnail(double sec);
void parseQLog(std::shared_ptr<LogReader> qlog);
@ -55,7 +50,6 @@ private:
void paintEvent(QPaintEvent *ev) override;
QMap<uint64_t, QPixmap> thumbnails;
std::map<uint64_t, AlertInfo> alerts;
InfoLabel *thumbnail_label;
};

@ -9,7 +9,8 @@ if arch == "Darwin":
else:
base_libs.append('OpenCL')
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc", "route.cc", "util.cc"]
replay_lib_src = ["replay.cc", "consoleui.cc", "camera.cc", "filereader.cc", "logreader.cc", "framereader.cc",
"route.cc", "util.cc", "timeline.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs, FRAMEWORKS=base_frameworks)
Export('replay_lib')
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'zstd', 'curl', 'yuv', 'ncurses'] + base_libs

@ -1,5 +1,6 @@
#include "tools/replay/consoleui.h"
#include <time.h>
#include <initializer_list>
#include <string>
#include <tuple>
@ -152,7 +153,6 @@ void ConsoleUI::updateStatus() {
add_str(win, unit.c_str());
};
static const std::pair<const char *, Color> status_text[] = {
{"loading...", Color::Red},
{"playing", Color::Green},
{"paused...", Color::Yellow},
};
@ -161,8 +161,10 @@ void ConsoleUI::updateStatus() {
auto [status_str, status_color] = status_text[status];
write_item(0, 0, "STATUS: ", status_str, " ", false, status_color);
auto cur_ts = replay->routeDateTime() + (int)replay->currentSeconds();
char *time_string = ctime(&cur_ts);
std::string current_segment = " - " + std::to_string((int)(replay->currentSeconds() / 60));
write_item(0, 25, "TIME: ", replay->currentDateTime().toString("ddd MMMM dd hh:mm:ss").toStdString(), current_segment, true);
write_item(0, 25, "TIME: ", time_string, current_segment, true);
auto p = sm["liveParameters"].getLiveParameters();
write_item(1, 0, "STIFFNESS: ", util::string_format("%.2f %%", p.getStiffnessFactor() * 100), " ");
@ -247,18 +249,18 @@ void ConsoleUI::updateTimeline() {
wattroff(win, COLOR_PAIR(Color::Disengaged));
const int total_sec = replay->maxSeconds() - replay->minSeconds();
for (auto [begin, end, type] : replay->getTimeline()) {
int start_pos = ((begin - replay->minSeconds()) / total_sec) * width;
int end_pos = ((end - replay->minSeconds()) / total_sec) * width;
if (type == TimelineType::Engaged) {
for (const auto &entry : *replay->getTimeline()) {
int start_pos = ((entry.start_time - replay->minSeconds()) / total_sec) * width;
int end_pos = ((entry.end_time - replay->minSeconds()) / total_sec) * width;
if (entry.type == TimelineType::Engaged) {
mvwchgat(win, 1, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL);
mvwchgat(win, 2, start_pos, end_pos - start_pos + 1, A_COLOR, Color::Engaged, NULL);
} else if (type == TimelineType::UserFlag) {
} else if (entry.type == TimelineType::UserFlag) {
mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, Color::Cyan, NULL);
} else {
auto color_id = Color::Green;
if (type != TimelineType::AlertInfo) {
color_id = type == TimelineType::AlertWarning ? Color::Yellow : Color::Red;
if (entry.type != TimelineType::AlertInfo) {
color_id = entry.type == TimelineType::AlertWarning ? Color::Yellow : Color::Red;
}
mvwchgat(win, 3, start_pos, end_pos - start_pos + 1, ACS_S3, color_id, NULL);
}

@ -1,7 +1,5 @@
#include "tools/replay/replay.h"
#include <QDebug>
#include <QtConcurrent>
#include <capnp/dynamic.h>
#include <csignal>
#include "cereal/services.h"
@ -11,6 +9,14 @@
static void interrupt_sleep_handler(int signal) {}
// Helper function to notify events with safety checks
template <typename Callback, typename... Args>
void notifyEvent(Callback &callback, Args &&...args) {
if (callback) {
callback(std::forward<Args>(args)...);
}
}
Replay::Replay(const std::string &route, std::vector<std::string> allow, std::vector<std::string> block, SubMaster *sm_,
uint32_t flags, const std::string &data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
// Register signal handler for SIGUSR1
@ -40,7 +46,7 @@ Replay::Replay(const std::string &route, std::vector<std::string> allow, std::ve
}
}
rInfo("active services: %s", join(active_services, ',').c_str());
rInfo("active services: %s", join(active_services, ", ").c_str());
rInfo("loading route %s", route.c_str());
if (sm == nullptr) {
@ -68,7 +74,6 @@ void Replay::stop() {
stream_thread_ = nullptr;
rInfo("shutdown: done");
}
timeline_future.waitForFinished();
camera_server_.reset(nullptr);
segments_.clear();
}
@ -151,101 +156,11 @@ void Replay::checkSeekProgress() {
}
void Replay::seekToFlag(FindFlag flag) {
if (auto next = find(flag)) {
if (auto next = timeline_.find(currentSeconds(), flag)) {
seekTo(*next - 2, false); // seek to 2 seconds before next
}
}
void Replay::buildTimeline() {
uint64_t engaged_begin = 0;
bool engaged = false;
auto alert_status = cereal::SelfdriveState::AlertStatus::NORMAL;
auto alert_size = cereal::SelfdriveState::AlertSize::NONE;
uint64_t alert_begin = 0;
std::string alert_type;
const TimelineType timeline_types[] = {
[(int)cereal::SelfdriveState::AlertStatus::NORMAL] = TimelineType::AlertInfo,
[(int)cereal::SelfdriveState::AlertStatus::USER_PROMPT] = TimelineType::AlertWarning,
[(int)cereal::SelfdriveState::AlertStatus::CRITICAL] = TimelineType::AlertCritical,
};
const auto &route_segments = route_->segments();
for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog, &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue;
std::vector<std::tuple<double, double, TimelineType>> timeline;
for (const Event &e : log->events) {
if (e.which == cereal::Event::Which::SELFDRIVE_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto event = reader.getRoot<cereal::Event>();
auto cs = event.getSelfdriveState();
if (engaged != cs.getEnabled()) {
if (engaged) {
timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
}
engaged_begin = e.mono_time;
engaged = cs.getEnabled();
}
if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
}
alert_begin = e.mono_time;
alert_type = cs.getAlertType().cStr();
alert_size = cs.getAlertSize();
alert_status = cs.getAlertStatus();
}
} else if (e.which == cereal::Event::Which::USER_FLAG) {
timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
}
}
if (it->first == route_segments.rbegin()->first) {
if (engaged) {
timeline.push_back({toSeconds(engaged_begin), toSeconds(log->events.back().mono_time), TimelineType::Engaged});
}
if (!alert_type.empty() && alert_size != cereal::SelfdriveState::AlertSize::NONE) {
timeline.push_back({toSeconds(alert_begin), toSeconds(log->events.back().mono_time), timeline_types[(int)alert_status]});
}
max_seconds_ = std::ceil(toSeconds(log->events.back().mono_time));
emit minMaxTimeChanged(route_segments.cbegin()->first * 60.0, max_seconds_);
}
{
std::lock_guard lk(timeline_lock);
timeline_.insert(timeline_.end(), timeline.begin(), timeline.end());
std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
}
emit qLogLoaded(log);
}
}
std::optional<uint64_t> Replay::find(FindFlag flag) {
int cur_ts = currentSeconds();
for (auto [start_ts, end_ts, type] : getTimeline()) {
if (type == TimelineType::Engaged) {
if (flag == FindFlag::nextEngagement && start_ts > cur_ts) {
return start_ts;
} else if (flag == FindFlag::nextDisEngagement && end_ts > cur_ts) {
return end_ts;
}
} else if (start_ts > cur_ts) {
if ((flag == FindFlag::nextUserFlag && type == TimelineType::UserFlag) ||
(flag == FindFlag::nextInfo && type == TimelineType::AlertInfo) ||
(flag == FindFlag::nextWarning && type == TimelineType::AlertWarning) ||
(flag == FindFlag::nextCritical && type == TimelineType::AlertCritical)) {
return start_ts;
}
}
}
return std::nullopt;
}
void Replay::pause(bool pause) {
if (user_paused_ != pause) {
pauseStreamThread();
@ -266,16 +181,15 @@ void Replay::pauseStreamThread() {
}
}
void Replay::segmentLoadFinished(bool success) {
void Replay::segmentLoadFinished(int seg_num, bool success) {
if (!success) {
Segment *seg = qobject_cast<Segment *>(sender());
rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
rWarning("failed to load segment %d, removing it from current replay list", seg_num);
updateEvents([&]() {
segments_.erase(seg->seg_num);
segments_.erase(seg_num);
return !segments_.empty();
});
}
updateSegmentsCache();
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
}
void Replay::updateSegmentsCache() {
@ -306,8 +220,10 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator
auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != last && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_,
[this](int seg_num, bool success) {
segmentLoadFinished(seg_num, success);
});
return true;
}
return false;
@ -373,7 +289,7 @@ void Replay::startStream(const Segment *cur_segment) {
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) {
route_date_time_ = QDateTime::fromMSecsSinceEpoch(wall_time / 1e6);
route_date_time_ = wall_time / 1e6;
}
}
@ -405,12 +321,16 @@ void Replay::startStream(const Segment *cur_segment) {
}
emit segmentsMerged();
timeline_.initialize(*route_, route_start_ts_, !(flags_ & REPLAY_FLAG_NO_FILE_CACHE),
[this](std::shared_ptr<LogReader> log) {
notifyEvent(onQLogLoaded, log);
});
// start stream thread
stream_thread_ = new QThread();
QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
stream_thread_->start();
timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
emit streamStarted();
}

@ -6,7 +6,6 @@
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <vector>
#include <utility>
@ -14,6 +13,7 @@
#include "tools/replay/camera.h"
#include "tools/replay/route.h"
#include "tools/replay/timeline.h"
#define DEMO_ROUTE "a2a0ccea32023010|2023-07-27--13-01-19"
@ -32,19 +32,8 @@ enum REPLAY_FLAGS {
REPLAY_FLAG_ALL_SERVICES = 0x0800,
};
enum class FindFlag {
nextEngagement,
nextDisEngagement,
nextUserFlag,
nextInfo,
nextWarning,
nextCritical
};
enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical, UserFlag };
typedef bool (*replayEventFilter)(const Event *, void *);
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
Q_DECLARE_METATYPE(std::shared_ptr<LogReader>);
class Replay : public QObject {
Q_OBJECT
@ -75,8 +64,7 @@ public:
inline void removeFlag(REPLAY_FLAGS flag) { flags_ &= ~flag; }
inline const Route* route() const { return route_.get(); }
inline double currentSeconds() const { return double(cur_mono_time_ - route_start_ts_) / 1e9; }
inline QDateTime routeDateTime() const { return route_date_time_; }
inline QDateTime currentDateTime() const { return route_date_time_.addSecs(currentSeconds()); }
inline std::time_t routeDateTime() const { return route_date_time_; }
inline uint64_t routeStartNanos() const { return route_start_ts_; }
inline double toSeconds(uint64_t mono_time) const { return (mono_time - route_start_ts_) / 1e9; }
inline double minSeconds() const { return !segments_.empty() ? segments_.begin()->first * 60 : 0; }
@ -85,22 +73,20 @@ public:
inline float getSpeed() const { return speed_; }
inline const SegmentMap &segments() const { return segments_; }
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::vector<std::tuple<double, double, TimelineType>> getTimeline() {
std::lock_guard lk(timeline_lock);
return timeline_;
}
inline const std::shared_ptr<std::vector<Timeline::Entry>> getTimeline() const { return timeline_.get(); }
inline const std::optional<Timeline::Entry> findAlertAtTime(double sec) const { return timeline_.findAlertAtTime(sec); }
// Event callback functions
std::function<void(std::shared_ptr<LogReader>)> onQLogLoaded = nullptr;
signals:
void streamStarted();
void segmentsMerged();
void seeking(double sec);
void seekedTo(double sec);
void qLogLoaded(std::shared_ptr<LogReader> qlog);
void minMaxTimeChanged(double min_sec, double max_sec);
protected slots:
void segmentLoadFinished(bool success);
protected:
std::optional<uint64_t> find(FindFlag flag);
void pauseStreamThread();
@ -108,16 +94,18 @@ protected:
void streamThread();
void updateSegmentsCache();
void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end);
void segmentLoadFinished(int seg_num, bool success);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& update_events_function);
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
void buildTimeline();
void checkSeekProgress();
inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; }
Timeline timeline_;
pthread_t stream_thread_id = 0;
QThread *stream_thread_ = nullptr;
std::mutex stream_lock_;
@ -130,7 +118,7 @@ protected:
std::atomic<bool> exit_ = false;
std::atomic<bool> paused_ = false;
bool events_ready_ = false;
QDateTime route_date_time_;
std::time_t route_date_time_;
uint64_t route_start_ts_ = 0;
std::atomic<uint64_t> cur_mono_time_ = 0;
std::atomic<double> max_seconds_ = 0;
@ -146,9 +134,6 @@ protected:
std::unique_ptr<CameraServer> camera_server_;
std::atomic<uint32_t> flags_ = REPLAY_FLAG_NONE;
std::mutex timeline_lock;
QFuture<void> timeline_future;
std::vector<std::tuple<double, double, TimelineType>> timeline_;
std::string car_fingerprint_;
std::atomic<float> speed_ = 1.0;
replayEventFilter event_filter = nullptr;

@ -1,10 +1,8 @@
#include "tools/replay/route.h"
#include <QDir>
#include <QEventLoop>
#include <QJsonArray>
#include <QJsonDocument>
#include <QtConcurrent>
#include <array>
#include <filesystem>
#include <regex>
@ -53,7 +51,11 @@ bool Route::load() {
rInfo("invalid route format");
return false;
}
date_time_ = QDateTime::fromString(route_.timestamp.c_str(), "yyyy-MM-dd--HH-mm-ss");
struct tm tm_time = {0};
strptime(route_.timestamp.c_str(), "%Y-%m-%d--%H-%M-%S", &tm_time);
date_time_ = mktime(&tm_time);
bool ret = data_dir_.empty() ? loadFromServer() : loadFromLocal();
if (ret) {
if (route_.begin_segment == -1) route_.begin_segment = segments_.rbegin()->first;
@ -151,8 +153,9 @@ void Route::addFileToSegment(int n, const std::string &file) {
// class Segment
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters)
: seg_num(n), flags(flags), filters_(filters) {
Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters,
std::function<void(int, bool)> callback)
: seg_num(n), flags(flags), filters_(filters), onLoadFinished_(callback) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const std::array file_list = {
(flags & REPLAY_FLAG_QCAMERA) || files.road_cam.empty() ? files.qcamera : files.road_cam,
@ -163,16 +166,20 @@ Segment::Segment(int n, const SegmentFile &files, uint32_t flags, const std::vec
for (int i = 0; i < file_list.size(); ++i) {
if (!file_list[i].empty() && (!(flags & REPLAY_FLAG_NO_VIPC) || i >= MAX_CAMERAS)) {
++loading_;
synchronizer_.addFuture(QtConcurrent::run(this, &Segment::loadFile, i, file_list[i]));
threads_.emplace_back(&Segment::loadFile, this, i, file_list[i]);
}
}
}
Segment::~Segment() {
disconnect();
{
std::lock_guard lock(mutex_);
onLoadFinished_ = nullptr; // Prevent callback after destruction
}
abort_ = true;
synchronizer_.setCancelOnWait(true);
synchronizer_.waitForFinished();
for (auto &thread : threads_) {
if (thread.joinable()) thread.join();
}
}
void Segment::loadFile(int id, const std::string file) {
@ -192,6 +199,9 @@ void Segment::loadFile(int id, const std::string file) {
}
if (--loading_ == 0) {
emit loadFinished(!abort_);
std::lock_guard lock(mutex_);
if (onLoadFinished_) {
onLoadFinished_(seg_num, !abort_);
}
}
}

@ -2,11 +2,12 @@
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <QDateTime>
#include <QFutureSynchronizer>
#include <QString>
#include "tools/replay/framereader.h"
#include "tools/replay/logreader.h"
@ -43,7 +44,7 @@ public:
bool load();
RouteLoadError lastError() const { return err_; }
inline const std::string &name() const { return route_.str; }
inline const QDateTime datetime() const { return date_time_; }
inline const std::time_t datetime() const { return date_time_; }
inline const std::string &dir() const { return data_dir_; }
inline const RouteIdentifier &identifier() const { return route_; }
inline const std::map<int, SegmentFile> &segments() const { return segments_; }
@ -58,15 +59,14 @@ protected:
RouteIdentifier route_ = {};
std::string data_dir_;
std::map<int, SegmentFile> segments_;
QDateTime date_time_;
std::time_t date_time_;
RouteLoadError err_ = RouteLoadError::None;
};
class Segment : public QObject {
Q_OBJECT
class Segment {
public:
Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters = {});
Segment(int n, const SegmentFile &files, uint32_t flags, const std::vector<bool> &filters,
std::function<void(int, bool)> callback);
~Segment();
inline bool isLoaded() const { return !loading_ && !abort_; }
@ -74,15 +74,14 @@ public:
std::unique_ptr<LogReader> log;
std::unique_ptr<FrameReader> frames[MAX_CAMERAS] = {};
signals:
void loadFinished(bool success);
protected:
void loadFile(int id, const std::string file);
std::atomic<bool> abort_ = false;
std::atomic<int> loading_ = 0;
QFutureSynchronizer<void> synchronizer_;
std::mutex mutex_;
std::vector<std::thread> threads_;
std::function<void(int, bool)> onLoadFinished_ = nullptr;
uint32_t flags;
std::vector<bool> filters_;
};

@ -72,9 +72,9 @@ TEST_CASE("LogReader") {
}
void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
QEventLoop loop;
Segment segment(n, segment_file, flags);
QObject::connect(&segment, &Segment::loadFinished, [&]() {
std::mutex mutex;
std::condition_variable cv;
Segment segment(n, segment_file, flags, {}, [&](int, bool) {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
REQUIRE(segment.frames[RoadCam] != nullptr);
@ -105,10 +105,11 @@ void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
REQUIRE(fr->get(i, &buf));
}
}
loop.quit();
cv.notify_one();
});
loop.exec();
std::unique_lock lock(mutex);
cv.wait(lock);
}
std::string download_demo_route() {

@ -0,0 +1,109 @@
#include "tools/replay/timeline.h"
#include <array>
#include "cereal/gen/cpp/log.capnp.h"
Timeline::~Timeline() {
should_exit_.store(true);
if (thread_.joinable()) {
thread_.join();
}
}
void Timeline::initialize(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback) {
thread_ = std::thread(&Timeline::buildTimeline, this, route, route_start_ts, local_cache, callback);
}
std::optional<uint64_t> Timeline::find(double cur_ts, FindFlag flag) const {
for (const auto &entry : *get()) {
if (entry.type == TimelineType::Engaged) {
if (flag == FindFlag::nextEngagement && entry.start_time > cur_ts) {
return entry.start_time;
} else if (flag == FindFlag::nextDisEngagement && entry.end_time > cur_ts) {
return entry.end_time;
}
} else if (entry.start_time > cur_ts) {
if ((flag == FindFlag::nextUserFlag && entry.type == TimelineType::UserFlag) ||
(flag == FindFlag::nextInfo && entry.type == TimelineType::AlertInfo) ||
(flag == FindFlag::nextWarning && entry.type == TimelineType::AlertWarning) ||
(flag == FindFlag::nextCritical && entry.type == TimelineType::AlertCritical)) {
return entry.start_time;
}
}
}
return std::nullopt;
}
std::optional<Timeline::Entry> Timeline::findAlertAtTime(double target_time) const {
for (const auto &entry : *get()) {
if (entry.start_time > target_time) break;
if (entry.end_time >= target_time && entry.type >= TimelineType::AlertInfo) {
return entry;
}
}
return std::nullopt;
}
void Timeline::buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback) {
std::optional<size_t> current_engaged_idx, current_alert_idx;
for (const auto &segment : route.segments()) {
if (should_exit_) break;
auto log = std::make_shared<LogReader>();
if (!log->load(segment.second.qlog, &should_exit_, local_cache, 0, 3) || log->events.empty()) {
continue; // Skip if log loading fails or no events
}
for (const Event &e : log->events) {
double seconds = (e.mono_time - route_start_ts) / 1e9;
if (e.which == cereal::Event::Which::SELFDRIVE_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto cs = reader.getRoot<cereal::Event>().getSelfdriveState();
updateEngagementStatus(cs, current_engaged_idx, seconds);
updateAlertStatus(cs, current_alert_idx, seconds);
} else if (e.which == cereal::Event::Which::USER_FLAG) {
staging_entries_.emplace_back(Entry{seconds, seconds, TimelineType::UserFlag});
}
}
callback(log); // Notify the callback once the log is processed
// Sort and finalize the timeline entries
std::sort(staging_entries_.begin(), staging_entries_.end(), [](auto &a, auto &b) { return a.start_time < b.start_time; });
timeline_entries_ = std::make_shared<std::vector<Entry>>(staging_entries_);
}
}
void Timeline::updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds) {
if (idx) staging_entries_[*idx].end_time = seconds;
if (cs.getEnabled()) {
if (!idx) {
idx = staging_entries_.size();
staging_entries_.emplace_back(Entry{seconds, seconds, TimelineType::Engaged});
}
} else {
idx.reset();
}
}
void Timeline::updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds) {
static auto alert_types = std::array{TimelineType::AlertInfo, TimelineType::AlertWarning, TimelineType::AlertCritical};
Entry *entry = idx ? &staging_entries_[*idx] : nullptr;
if (entry) entry->end_time = seconds;
if (cs.getAlertSize() != cereal::SelfdriveState::AlertSize::NONE) {
auto type = alert_types[(int)cs.getAlertStatus()];
std::string text1 = cs.getAlertText1().cStr();
std::string text2 = cs.getAlertText2().cStr();
if (!entry || entry->type != type || entry->text1 != text1 || entry->text2 != text2) {
idx = staging_entries_.size();
staging_entries_.emplace_back(Entry{seconds, seconds, type, text1, text2}); // Start a new entry
}
} else {
idx.reset();
}
}

@ -0,0 +1,46 @@
#pragma once
#include <atomic>
#include <optional>
#include <thread>
#include <vector>
#include "tools/replay/route.h"
enum class TimelineType { None, Engaged, AlertInfo, AlertWarning, AlertCritical, UserFlag };
enum class FindFlag { nextEngagement, nextDisEngagement, nextUserFlag, nextInfo, nextWarning, nextCritical };
class Timeline {
public:
struct Entry {
double start_time;
double end_time;
TimelineType type;
std::string text1;
std::string text2;
};
Timeline() : timeline_entries_(std::make_shared<std::vector<Entry>>()) {}
~Timeline();
void initialize(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback);
std::optional<uint64_t> find(double cur_ts, FindFlag flag) const;
std::optional<Entry> findAlertAtTime(double target_time) const;
const std::shared_ptr<std::vector<Entry>> get() const { return timeline_entries_; }
private:
void buildTimeline(const Route &route, uint64_t route_start_ts, bool local_cache,
std::function<void(std::shared_ptr<LogReader>)> callback);
void updateEngagementStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
void updateAlertStatus(const cereal::SelfdriveState::Reader &cs, std::optional<size_t> &idx, double seconds);
std::thread thread_;
std::atomic<bool> should_exit_ = false;
// Temporarily holds entries before they are sorted and finalized
std::vector<Entry> staging_entries_;
// Final sorted timeline entries
std::shared_ptr<std::vector<Entry>> timeline_entries_;
};

@ -14,7 +14,6 @@
#include <map>
#include <mutex>
#include <numeric>
#include <sstream>
#include <utility>
#include <zstd.h>
@ -404,15 +403,6 @@ std::vector<std::string> split(std::string_view source, char delimiter) {
return fields;
}
std::string join(const std::vector<std::string> &elements, char separator) {
std::ostringstream oss;
for (size_t i = 0; i < elements.size(); ++i) {
if (i != 0) oss << separator;
oss << elements[i];
}
return oss.str();
}
std::string extractFileName(const std::string &file) {
size_t queryPos = file.find_first_of("?");
std::string path = (queryPos != std::string::npos) ? file.substr(0, queryPos) : file;

@ -3,6 +3,7 @@
#include <atomic>
#include <deque>
#include <functional>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>
@ -59,6 +60,15 @@ typedef std::function<void(uint64_t cur, uint64_t total, bool success)> Download
void installDownloadProgressHandler(DownloadProgressHandler);
bool httpDownload(const std::string &url, const std::string &file, size_t chunk_size = 0, std::atomic<bool> *abort = nullptr);
std::string formattedDataSize(size_t size);
std::vector<std::string> split(std::string_view source, char delimiter);
std::string join(const std::vector<std::string> &elements, char separator);
std::string extractFileName(const std::string& file);
std::vector<std::string> split(std::string_view source, char delimiter);
template <typename Iterable>
std::string join(const Iterable& elements, const std::string& separator) {
std::ostringstream oss;
for (auto it = elements.begin(); it != elements.end(); ++it) {
if (it != elements.begin()) oss << separator;
oss << *it;
}
return oss.str();
}

Loading…
Cancel
Save