cabana: avoid dead locks and improve responsiveness (#32740)

avoid dead locks and improve responsive
pull/30575/head^2
Dean Lee 11 months ago committed by GitHub
parent 02ed9c584c
commit 865b98a5c4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      tools/cabana/chart/chart.cc
  2. 24
      tools/cabana/chart/chartswidget.cc
  3. 16
      tools/cabana/chart/chartswidget.h
  4. 1
      tools/cabana/mainwin.cc
  5. 22
      tools/cabana/streams/abstractstream.cc
  6. 8
      tools/cabana/streams/abstractstream.h
  7. 7
      tools/cabana/streams/replaystream.cc
  8. 3
      tools/cabana/streams/replaystream.h
  9. 23
      tools/cabana/videowidget.cc
  10. 8
      tools/cabana/videowidget.h
  11. 1
      tools/replay/camera.cc
  12. 6
      tools/replay/logreader.cc
  13. 88
      tools/replay/replay.cc
  14. 11
      tools/replay/replay.h
  15. 36
      tools/replay/util.cc
  16. 2
      tools/replay/util.h

@ -513,7 +513,7 @@ void ChartView::mouseReleaseEvent(QMouseEvent *event) {
// no rubber dragged, seek to mouse position
can->seekTo(min);
} else if (rubber->width() > 10 && (max - min) > MIN_ZOOM_SECONDS) {
charts_widget->zoom_undo_stack->push(new ZoomCommand(charts_widget, {min, max}));
charts_widget->zoom_undo_stack->push(new ZoomCommand({min, max}));
} else {
viewport()->update();
}

@ -103,6 +103,8 @@ ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) {
QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &ChartsWidget::removeAll);
QObject::connect(can, &AbstractStream::eventsMerged, this, &ChartsWidget::eventsMerged);
QObject::connect(can, &AbstractStream::msgsReceived, this, &ChartsWidget::updateState);
QObject::connect(can, &AbstractStream::seeking, this, &ChartsWidget::updateState);
QObject::connect(can, &AbstractStream::timeRangeChanged, this, &ChartsWidget::timeRangeChanged);
QObject::connect(range_slider, &QSlider::valueChanged, this, &ChartsWidget::setMaxChartRange);
QObject::connect(new_plot_btn, &QToolButton::clicked, this, &ChartsWidget::newChart);
QObject::connect(remove_all_btn, &QToolButton::clicked, this, &ChartsWidget::removeAll);
@ -159,16 +161,13 @@ void ChartsWidget::eventsMerged(const MessageEventsMap &new_events) {
}
}
void ChartsWidget::setZoom(double min, double max) {
zoomed_range = {min, max};
is_zoomed = zoomed_range != display_range;
void ChartsWidget::timeRangeChanged(const std::optional<std::pair<double, double>> &time_range) {
updateToolBar();
updateState();
emit rangeChanged(min, max, is_zoomed);
}
void ChartsWidget::zoomReset() {
setZoom(display_range.first, display_range.second);
can->setTimeRange(std::nullopt);
zoom_undo_stack->clear();
}
@ -186,8 +185,9 @@ void ChartsWidget::showValueTip(double sec) {
void ChartsWidget::updateState() {
if (charts.isEmpty()) return;
const auto &time_range = can->timeRange();
const double cur_sec = can->currentSec();
if (!is_zoomed) {
if (!time_range.has_value()) {
double pos = (cur_sec - display_range.first) / std::max<float>(1.0, max_chart_range);
if (pos < 0 || pos > 0.8) {
display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1);
@ -195,13 +195,9 @@ void ChartsWidget::updateState() {
double max_sec = std::min(display_range.first + max_chart_range, can->totalSeconds());
display_range.first = std::max(0.0, max_sec - max_chart_range);
display_range.second = display_range.first + max_chart_range;
} else if (cur_sec < (zoomed_range.first - 0.1) || cur_sec >= zoomed_range.second) {
// loop in zoomed range
QTimer::singleShot(0, [ts = zoomed_range.first]() { can->seekTo(ts);});
return;
}
const auto &range = is_zoomed ? zoomed_range : display_range;
const auto &range = time_range ? *time_range : display_range;
for (auto c : charts) {
c->updatePlot(cur_sec, range.first, range.second);
}
@ -217,12 +213,14 @@ void ChartsWidget::updateToolBar() {
title_label->setText(tr("Charts: %1").arg(charts.size()));
columns_action->setText(tr("Column: %1").arg(column_count));
range_lb->setText(utils::formatSeconds(max_chart_range));
bool is_zoomed = can->timeRange().has_value();
range_lb_action->setVisible(!is_zoomed);
range_slider_action->setVisible(!is_zoomed);
undo_zoom_action->setVisible(is_zoomed);
redo_zoom_action->setVisible(is_zoomed);
reset_zoom_action->setVisible(is_zoomed);
reset_zoom_btn->setText(is_zoomed ? tr("%1-%2").arg(zoomed_range.first, 0, 'f', 2).arg(zoomed_range.second, 0, 'f', 2) : "");
reset_zoom_btn->setText(is_zoomed ? tr("%1-%2").arg(can->timeRange()->first, 0, 'f', 2).arg(can->timeRange()->second, 0, 'f', 2) : "");
remove_all_btn->setEnabled(!charts.isEmpty());
dock_btn->setIcon(docking ? "arrow-up-right-square" : "arrow-down-left-square");
dock_btn->setToolTip(docking ? tr("Undock charts") : tr("Dock charts"));
@ -252,7 +250,7 @@ ChartView *ChartsWidget::findChart(const MessageId &id, const cabana::Signal *si
}
ChartView *ChartsWidget::createChart() {
auto chart = new ChartView(is_zoomed ? zoomed_range : display_range, this);
auto chart = new ChartView(can->timeRange().value_or(display_range), this);
chart->setFixedHeight(settings.chart_height);
chart->setMinimumWidth(CHART_MIN_WIDTH);
chart->setSizePolicy(QSizePolicy::MinimumExpanding, QSizePolicy::Fixed);

@ -46,11 +46,10 @@ public:
public slots:
void setColumnCount(int n);
void removeAll();
void setZoom(double min, double max);
void timeRangeChanged(const std::optional<std::pair<double, double>> &time_range);
signals:
void dock(bool floating);
void rangeChanged(double min, double max, bool is_zommed);
void seriesChanged();
private:
@ -102,9 +101,7 @@ private:
ChartsContainer *charts_container;
QScrollArea *charts_scroll;
uint32_t max_chart_range = 0;
bool is_zoomed = false;
std::pair<double, double> display_range;
std::pair<double, double> zoomed_range;
QAction *columns_action;
int column_count = 1;
int current_column_count = 0;
@ -119,12 +116,11 @@ private:
class ZoomCommand : public QUndoCommand {
public:
ZoomCommand(ChartsWidget *charts, std::pair<double, double> range) : charts(charts), range(range), QUndoCommand() {
prev_range = charts->is_zoomed ? charts->zoomed_range : charts->display_range;
ZoomCommand(std::pair<double, double> range) : range(range), QUndoCommand() {
prev_range = can->timeRange();
setText(QObject::tr("Zoom to %1-%2").arg(range.first, 0, 'f', 2).arg(range.second, 0, 'f', 2));
}
void undo() override { charts->setZoom(prev_range.first, prev_range.second); }
void redo() override { charts->setZoom(range.first, range.second); }
ChartsWidget *charts;
std::pair<double, double> prev_range, range;
void undo() override { can->setTimeRange(prev_range); }
void redo() override { can->setTimeRange(range); }
std::optional<std::pair<double, double>> prev_range, range;
};

@ -192,7 +192,6 @@ void MainWindow::createDockWidgets() {
video_splitter = new QSplitter(Qt::Vertical, this);
video_widget = new VideoWidget(this);
video_splitter->addWidget(video_widget);
QObject::connect(charts_widget, &ChartsWidget::rangeChanged, video_widget, &VideoWidget::updateTimeRange);
video_splitter->addWidget(charts_container);
video_splitter->setStretchFactor(1, 1);

@ -92,13 +92,23 @@ void AbstractStream::updateLastMessages() {
std::set<MessageId> msgs;
{
std::lock_guard lk(mutex_);
double max_sec = 0;
for (const auto &id : new_msgs_) {
const auto &can_data = messages_[id];
current_sec_ = std::max(current_sec_, can_data.ts);
max_sec = std::max(max_sec, can_data.ts);
last_msgs[id] = can_data;
sources.insert(id.source);
}
msgs = std::move(new_msgs_);
if (!new_msgs_.empty()) {
msgs = std::move(new_msgs_);
current_sec_ = max_sec;
}
}
if (time_range_ && (current_sec_ < time_range_->first || current_sec_ >= time_range_->second)) {
seekTo(time_range_->first);
return;
}
if (sources.size() != prev_src_size) {
@ -108,6 +118,14 @@ void AbstractStream::updateLastMessages() {
emit msgsReceived(&msgs, prev_msg_size != last_msgs.size());
}
void AbstractStream::setTimeRange(const std::optional<std::pair<double, double>> &range) {
time_range_ = range;
if (time_range_ && (current_sec_ < time_range_->first || current_sec_ >= time_range_->second)) {
seekTo(time_range_->first);
}
emit timeRangeChanged(time_range_);
}
void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) {
std::lock_guard lk(mutex_);
messages_[id].compute(id, data, size, sec, getSpeed(), masks_[id]);

@ -3,8 +3,10 @@
#include <array>
#include <memory>
#include <mutex>
#include <optional>
#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
#include <QColor>
@ -77,6 +79,8 @@ public:
virtual double getSpeed() { return 1; }
virtual bool isPaused() const { return false; }
virtual void pause(bool pause) {}
void setTimeRange(const std::optional<std::pair<double, double>> &range);
const std::optional<std::pair<double, double>> &timeRange() const { return time_range_; }
inline const std::unordered_map<MessageId, CanData> &lastMessages() const { return last_msgs; }
inline const MessageEventsMap &eventsMap() const { return events_; }
@ -91,8 +95,9 @@ public:
signals:
void paused();
void resume();
void seekingTo(double sec);
void seeking(double sec);
void seekedTo(double sec);
void timeRangeChanged(const std::optional<std::pair<double, double>> &range);
void streamStarted();
void eventsMerged(const MessageEventsMap &events_map);
void msgsReceived(const std::set<MessageId> *new_msgs, bool has_new_ids);
@ -110,6 +115,7 @@ protected:
std::vector<const CanEvent *> all_events_;
double current_sec_ = 0;
std::optional<std::pair<double, double>> time_range_;
uint64_t lastest_event_ts = 0;
private:

@ -53,9 +53,9 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint
{}, nullptr, replay_flags, data_dir, this));
replay->setSegmentCacheLimit(settings.max_cached_minutes);
replay->installEventFilter(event_filter, this);
QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking);
QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo);
QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments);
QObject::connect(replay.get(), &Replay::qLogLoaded, this, &ReplayStream::qLogLoaded, Qt::QueuedConnection);
return replay->load();
}
@ -92,12 +92,7 @@ bool ReplayStream::eventFilter(const Event *event) {
}
void ReplayStream::seekTo(double ts) {
// Update timestamp and notify receivers of the time change.
current_sec_ = ts;
std::set<MessageId> new_msgs;
msgsReceived(&new_msgs, false);
// Seek to the specified timestamp
replay->seekTo(std::max(double(0), ts), false);
}

@ -34,9 +34,6 @@ public:
void pause(bool pause) override;
static AbstractOpenStreamWidget *widget(AbstractStream **stream);
signals:
void qLogLoaded(int segnum, std::shared_ptr<LogReader> qlog);
private:
void mergeSegments();
std::unique_ptr<Replay> replay = nullptr;

@ -38,6 +38,8 @@ VideoWidget::VideoWidget(QWidget *parent) : QFrame(parent) {
QObject::connect(can, &AbstractStream::paused, this, &VideoWidget::updatePlayBtnState);
QObject::connect(can, &AbstractStream::resume, this, &VideoWidget::updatePlayBtnState);
QObject::connect(can, &AbstractStream::msgsReceived, this, &VideoWidget::updateState);
QObject::connect(can, &AbstractStream::seeking, this, &VideoWidget::updateState);
QObject::connect(can, &AbstractStream::timeRangeChanged, this, &VideoWidget::timeRangeChanged);
updatePlayBtnState();
setWhatsThis(tr(R"(
@ -150,14 +152,16 @@ QWidget *VideoWidget::createCameraWidget() {
setMaximumTime(can->totalSeconds());
QObject::connect(slider, &QSlider::sliderReleased, [this]() { can->seekTo(slider->currentSecond()); });
QObject::connect(slider, &Slider::updateMaximumTime, this, &VideoWidget::setMaximumTime, Qt::QueuedConnection);
QObject::connect(can, &AbstractStream::eventsMerged, this, [this]() { slider->update(); });
QObject::connect(static_cast<ReplayStream*>(can), &ReplayStream::qLogLoaded, slider, &Slider::parseQLog);
QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); });
QObject::connect(cam_widget, &CameraWidget::vipcAvailableStreamsUpdated, this, &VideoWidget::vipcAvailableStreamsUpdated);
QObject::connect(camera_tab, &QTabBar::currentChanged, [this](int index) {
if (index != -1) cam_widget->setStreamType((VisionStreamType)camera_tab->tabData(index).toInt());
});
auto replay = static_cast<ReplayStream*>(can)->getReplay();
QObject::connect(replay, &Replay::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection);
QObject::connect(replay, &Replay::totalSecondsUpdated, this, &VideoWidget::setMaximumTime, Qt::QueuedConnection);
return w;
}
@ -198,13 +202,13 @@ void VideoWidget::setMaximumTime(double sec) {
slider->setTimeRange(0, sec);
}
void VideoWidget::updateTimeRange(double min, double max, bool is_zoomed) {
void VideoWidget::timeRangeChanged(const std::optional<std::pair<double, double>> &time_range) {
if (can->liveStreaming()) {
skip_to_end_btn->setEnabled(!is_zoomed);
skip_to_end_btn->setEnabled(!time_range.has_value());
return;
}
is_zoomed ? slider->setTimeRange(min, max)
: slider->setTimeRange(0, maximum_time);
time_range ? slider->setTimeRange(time_range->first, time_range->second)
: slider->setTimeRange(0, maximum_time);
}
QString VideoWidget::formatTime(double sec, bool include_milliseconds) {
@ -255,12 +259,7 @@ void Slider::setTimeRange(double min, double max) {
setRange(min * factor, max * factor);
}
void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
const auto &segments = qobject_cast<ReplayStream *>(can)->route()->segments();
if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) {
emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime());
}
void Slider::parseQLog(std::shared_ptr<LogReader> qlog) {
std::mutex mutex;
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) {
if (e.which == cereal::Event::Which::THUMBNAIL) {

@ -3,6 +3,7 @@
#include <map>
#include <memory>
#include <set>
#include <utility>
#include <QHBoxLayout>
#include <QFrame>
@ -40,13 +41,10 @@ public:
void setTimeRange(double min, double max);
AlertInfo alertInfo(double sec);
QPixmap thumbnail(double sec);
void parseQLog(int segnum, std::shared_ptr<LogReader> qlog);
void parseQLog(std::shared_ptr<LogReader> qlog);
const double factor = 1000.0;
signals:
void updateMaximumTime(double);
private:
void mousePressEvent(QMouseEvent *e) override;
void mouseMoveEvent(QMouseEvent *e) override;
@ -63,11 +61,11 @@ class VideoWidget : public QFrame {
public:
VideoWidget(QWidget *parnet = nullptr);
void updateTimeRange(double min, double max, bool is_zommed);
void setMaximumTime(double sec);
protected:
QString formatTime(double sec, bool include_milliseconds = false);
void timeRangeChanged(const std::optional<std::pair<double, double>> &time_range);
void updateState();
void updatePlayBtnState();
QWidget *createCameraWidget();

@ -60,7 +60,6 @@ void CameraServer::cameraThread(Camera &cam) {
capnp::FlatArrayMessageReader reader(event->data);
auto evt = reader.getRoot<cereal::Event>();
auto eidx = capnp::AnyStruct::Reader(evt).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (eidx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C) continue;
int segment_id = eidx.getSegmentId();
uint32_t frame_id = eidx.getFrameId();

@ -42,10 +42,10 @@ bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
evt.which == cereal::Event::DRIVER_ENCODE_IDX ||
evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (uint64_t sof = idx.getTimestampSof()) {
mono_time = sof;
if (idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) {
uint64_t sof = idx.getTimestampSof();
events.emplace_back(which, sof ? sof : mono_time, event_data, idx.getSegmentNum());
}
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
}
}
} catch (const kj::Exception &e) {

@ -55,11 +55,11 @@ void Replay::stop() {
rInfo("shutdown: in progress...");
if (stream_thread_ != nullptr) {
exit_ = true;
paused_ = true;
pauseStreamThread();
stream_cv_.notify_one();
stream_thread_->quit();
stream_thread_->wait();
delete stream_thread_;
stream_thread_->deleteLater();
stream_thread_ = nullptr;
}
timeline_future.waitForFinished();
@ -104,28 +104,40 @@ void Replay::updateEvents(const std::function<bool()> &update_events_function) {
void Replay::seekTo(double seconds, bool relative) {
updateEvents([&]() {
seeking_to_seconds_ = relative ? seconds + currentSeconds() : seconds;
seeking_to_seconds_ = std::max(double(0.0), seeking_to_seconds_);
int target_segment = (int)seeking_to_seconds_ / 60;
double target_time = relative ? seconds + currentSeconds() : seconds;
target_time = std::max(double(0.0), target_time);
int target_segment = (int)target_time / 60;
if (segments_.count(target_segment) == 0) {
rWarning("can't seek to %d s segment %d is invalid", (int)seeking_to_seconds_, target_segment);
rWarning("Can't seek to %d s segment %d is invalid", (int)target_time, target_segment);
return true;
}
rInfo("seeking to %d s, segment %d", (int)seeking_to_seconds_, target_segment);
rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment);
current_segment_ = target_segment;
cur_mono_time_ = route_start_ts_ + seeking_to_seconds_ * 1e9;
bool segment_merged = isSegmentMerged(target_segment);
if (segment_merged) {
emit seekedTo(seeking_to_seconds_);
// Reset seeking_to_seconds_ to indicate completion of seek
seeking_to_seconds_ = -1;
}
return segment_merged;
cur_mono_time_ = route_start_ts_ + target_time * 1e9;
seeking_to_ = target_time;
return false;
});
checkSeekProgress();
updateSegmentsCache();
}
void Replay::checkSeekProgress() {
if (seeking_to_) {
auto it = segments_.find(int(*seeking_to_ / 60));
if (it != segments_.end() && it->second && it->second->isLoaded()) {
emit seekedTo(*seeking_to_);
seeking_to_ = std::nullopt;
// wake up stream thread
updateEvents([]() { return true; });
} else {
// Emit signal indicating the ongoing seek operation
emit seeking(*seeking_to_);
}
}
}
void Replay::seekToFlag(FindFlag flag) {
if (auto next = find(flag)) {
seekTo(*next - 2, false); // seek to 2 seconds before next
@ -150,8 +162,9 @@ void Replay::buildTimeline() {
const auto &route_segments = route_->segments();
for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) {
std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue;
std::vector<std::tuple<double, double, TimelineType>> timeline;
for (const Event &e : log->events) {
if (e.which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
@ -160,7 +173,6 @@ void Replay::buildTimeline() {
if (engaged != cs.getEnabled()) {
if (engaged) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
}
engaged_begin = e.mono_time;
@ -169,7 +181,6 @@ void Replay::buildTimeline() {
if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
if (!alert_type.empty() && alert_size != cereal::ControlsState::AlertSize::NONE) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
}
alert_begin = e.mono_time;
@ -178,12 +189,20 @@ void Replay::buildTimeline() {
alert_status = cs.getAlertStatus();
}
} else if (e.which == cereal::Event::Which::USER_FLAG) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
}
}
std::sort(timeline.begin(), timeline.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
emit qLogLoaded(it->first, log);
{
std::lock_guard lk(timeline_lock);
timeline_.insert(timeline_.end(), timeline.begin(), timeline.end());
std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
}
if (it->first == route_segments.rbegin()->first) {
emit totalSecondsUpdated(toSeconds(log->events.back().mono_time));
}
emit qLogLoaded(log);
}
}
@ -260,14 +279,13 @@ void Replay::updateSegmentsCache() {
const auto &cur_segment = cur->second;
if (stream_thread_ == nullptr && cur_segment->isLoaded()) {
startStream(cur_segment.get());
emit streamStarted();
}
}
void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto loadNext = [this](auto begin, auto end) {
auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != end && !it->second) {
auto loadNextSegment = [this](auto first, auto last) {
auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != last && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_, filters_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
@ -276,9 +294,9 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator
return false;
};
// Load forward segments, then try reverse
if (!loadNext(cur, end)) {
loadNext(std::make_reverse_iterator(cur), segments_.rend());
// Try loading forward segments, then reverse segments
if (!loadNextSegment(cur, end)) {
loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin));
}
}
@ -316,15 +334,10 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
updateEvents([&]() {
events_.swap(new_events);
merged_segments_ = segments_to_merge;
// Check if seeking is in progress
int target_segment = int(seeking_to_seconds_ / 60);
if (seeking_to_seconds_ >= 0 && segments_to_merge.count(target_segment) > 0) {
emit seekedTo(seeking_to_seconds_);
seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek
}
// Wake up the stream thread if the current segment is loaded or invalid.
return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0);
return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0));
});
checkSeekProgress();
}
void Replay::startStream(const Segment *cur_segment) {
@ -379,6 +392,7 @@ void Replay::startStream(const Segment *cur_segment) {
stream_thread_->start();
timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
emit streamStarted();
}
void Replay::publishMessage(const Event *e) {
@ -473,6 +487,7 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
// Skip events if socket is not present
if (!sockets_[evt.which]) continue;
cur_mono_time_ = evt.mono_time;
const uint64_t current_nanos = nanos_since_boot();
const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts);
@ -484,12 +499,11 @@ std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::con
loop_start_ts = current_nanos;
prev_replay_speed = speed_;
} else if (time_diff > 0) {
precise_nano_sleep(time_diff);
precise_nano_sleep(time_diff, paused_);
}
if (paused_) break;
cur_mono_time_ = evt.mono_time;
if (evt.eidx_segnum == -1) {
publishMessage(&evt);
} else if (camera_server_) {

@ -85,14 +85,16 @@ public:
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::vector<std::tuple<double, double, TimelineType>> getTimeline() {
std::lock_guard lk(timeline_lock);
return timeline;
return timeline_;
}
signals:
void streamStarted();
void segmentsMerged();
void seeking(double sec);
void seekedTo(double sec);
void qLogLoaded(int segnum, std::shared_ptr<LogReader> qlog);
void qLogLoaded(std::shared_ptr<LogReader> qlog);
void totalSecondsUpdated(double sec);
protected slots:
void segmentLoadFinished(bool success);
@ -112,6 +114,7 @@ protected:
void publishMessage(const Event *e);
void publishFrame(const Event *e);
void buildTimeline();
void checkSeekProgress();
inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; }
pthread_t stream_thread_id = 0;
@ -120,7 +123,7 @@ protected:
bool user_paused_ = false;
std::condition_variable stream_cv_;
std::atomic<int> current_segment_ = 0;
double seeking_to_seconds_ = -1;
std::optional<double> seeking_to_;
SegmentMap segments_;
// the following variables must be protected with stream_lock_
std::atomic<bool> exit_ = false;
@ -143,7 +146,7 @@ protected:
std::mutex timeline_lock;
QFuture<void> timeline_future;
std::vector<std::tuple<double, double, TimelineType>> timeline;
std::vector<std::tuple<double, double, TimelineType>> timeline_;
std::string car_fingerprint_;
std::atomic<float> speed_ = 1.0;
replayEventFilter event_filter = nullptr;

@ -318,33 +318,23 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
return {};
}
void precise_nano_sleep(int64_t nanoseconds) {
#ifdef __APPLE__
const long estimate_ns = 1 * 1e6; // 1ms
struct timespec req = {.tv_nsec = estimate_ns};
uint64_t start_sleep = nanos_since_boot();
while (nanoseconds > estimate_ns) {
nanosleep(&req, nullptr);
uint64_t end_sleep = nanos_since_boot();
nanoseconds -= (end_sleep - start_sleep);
start_sleep = end_sleep;
}
// spin wait
if (nanoseconds > 0) {
while ((nanos_since_boot() - start_sleep) <= nanoseconds) {
std::this_thread::yield();
}
}
#else
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit) {
struct timespec req, rem;
req.tv_sec = nanoseconds / 1e9;
req.tv_nsec = nanoseconds % (int64_t)1e9;
while (clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem) && errno == EINTR) {
req.tv_sec = nanoseconds / 1000000000;
req.tv_nsec = nanoseconds % 1000000000;
while (!should_exit) {
#ifdef __APPLE_
int ret = nanosleep(&req, &rem);
if (ret == 0 || errno != EINTR)
break;
#else
int ret = clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem);
if (ret == 0 || ret != EINTR)
break;
#endif
// Retry sleep if interrupted by a signal
req = rem;
}
#endif
}
std::string sha256(const std::string &str) {

@ -37,7 +37,7 @@ private:
};
std::string sha256(const std::string &str);
void precise_nano_sleep(int64_t nanoseconds);
void precise_nano_sleep(int64_t nanoseconds, std::atomic<bool> &should_exit);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
std::string getUrlWithoutQuery(const std::string &url);

Loading…
Cancel
Save