replay: bug fixes and improvements (#32193)

old-commit-hash: 2c409e0980
pull/32199/head
Dean Lee 1 year ago committed by GitHub
parent d63797aef9
commit 372bea14e2
  1. 9
      tools/cabana/streams/replaystream.cc
  2. 14
      tools/cabana/videowidget.cc
  3. 3
      tools/replay/consoleui.cc
  4. 5
      tools/replay/filereader.cc
  5. 43
      tools/replay/logreader.cc
  6. 32
      tools/replay/logreader.h
  7. 305
      tools/replay/replay.cc
  8. 33
      tools/replay/replay.h
  9. 2
      tools/replay/route.cc
  10. 70
      tools/replay/tests/test_replay.cc
  11. 33
      tools/replay/util.cc
  12. 2
      tools/replay/util.h

@ -33,13 +33,12 @@ void ReplayStream::mergeSegments() {
std::vector<const CanEvent *> new_events;
new_events.reserve(seg->log->events.size());
for (auto it = seg->log->events.cbegin(); it != seg->log->events.cend(); ++it) {
if ((*it)->which == cereal::Event::Which::CAN) {
const uint64_t ts = (*it)->mono_time;
capnp::FlatArrayMessageReader reader((*it)->data);
for (const Event &e : seg->log->events) {
if (e.which == cereal::Event::Which::CAN) {
capnp::FlatArrayMessageReader reader(e.data);
auto event = reader.getRoot<cereal::Event>();
for (const auto &c : event.getCan()) {
new_events.push_back(newEvent(ts, c));
new_events.push_back(newEvent(e.mono_time, c));
}
}
}

@ -257,13 +257,13 @@ void Slider::setTimeRange(double min, double max) {
void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
const auto &segments = qobject_cast<ReplayStream *>(can)->route()->segments();
if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) {
emit updateMaximumTime(qlog->events.back()->mono_time / 1e9 - can->routeStartTime());
emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime());
}
std::mutex mutex;
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event *e) {
if (e->which == cereal::Event::Which::THUMBNAIL) {
capnp::FlatArrayMessageReader reader(e->data);
QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) {
if (e.which == cereal::Event::Which::THUMBNAIL) {
capnp::FlatArrayMessageReader reader(e.data);
auto thumb = reader.getRoot<cereal::Event>().getThumbnail();
auto data = thumb.getThumbnail();
if (QPixmap pm; pm.loadFromData(data.begin(), data.size(), "jpeg")) {
@ -271,13 +271,13 @@ void Slider::parseQLog(int segnum, std::shared_ptr<LogReader> qlog) {
std::lock_guard lk(mutex);
thumbnails[thumb.getTimestampEof()] = scaled;
}
} else if (e->which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e->data);
} else if (e.which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto cs = reader.getRoot<cereal::Event>().getControlsState();
if (cs.getAlertType().size() > 0 && cs.getAlertText1().size() > 0 &&
cs.getAlertSize() != cereal::ControlsState::AlertSize::NONE) {
std::lock_guard lk(mutex);
alerts.emplace(e->mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
alerts.emplace(e.mono_time, AlertInfo{cs.getAlertStatus(), cs.getAlertText1().cStr(), cs.getAlertText2().cStr()});
}
}
});

@ -172,7 +172,7 @@ void ConsoleUI::updateStatus() {
if (status != Status::Paused) {
auto events = replay->events();
uint64_t current_mono_time = replay->routeStartTime() + replay->currentSeconds() * 1e9;
bool playing = !events->empty() && events->back()->mono_time > current_mono_time;
bool playing = !events->empty() && events->back().mono_time > current_mono_time;
status = playing ? Status::Playing : Status::Waiting;
}
auto [status_str, status_color] = status_text[status];
@ -368,7 +368,6 @@ void ConsoleUI::handleKey(char c) {
} else if (c == ' ') {
pauseReplay(!replay->isPaused());
} else if (c == 'q' || c == 'Q') {
replay->stop();
qApp->exit();
}
}

@ -35,7 +35,10 @@ std::string FileReader::read(const std::string &file, std::atomic<bool> *abort)
std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
if (i > 0) rWarning("download failed, retrying %d", i);
if (i > 0) {
rWarning("download failed, retrying %d", i);
util::sleep_for(3000);
}
std::string result = httpGet(url, chunk_size_, abort);
if (!result.empty()) {

@ -4,16 +4,6 @@
#include "tools/replay/filereader.h"
#include "tools/replay/util.h"
LogReader::LogReader(size_t memory_pool_block_size) {
events.reserve(memory_pool_block_size);
}
LogReader::~LogReader() {
for (Event *e : events) {
delete e;
}
}
bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool local_cache, int chunk_size, int retries) {
raw_ = FileReader(local_cache, chunk_size, retries).read(url, abort);
if (raw_.empty()) return false;
@ -22,17 +12,13 @@ bool LogReader::load(const std::string &url, std::atomic<bool> *abort, bool loca
raw_ = decompressBZ2(raw_, abort);
if (raw_.empty()) return false;
}
return parse(abort);
return load(raw_.data(), raw_.size(), abort);
}
bool LogReader::load(const std::byte *data, size_t size, std::atomic<bool> *abort) {
raw_.assign((const char *)data, size);
return parse(abort);
}
bool LogReader::parse(std::atomic<bool> *abort) {
bool LogReader::load(const char *data, size_t size, std::atomic<bool> *abort) {
try {
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
events.reserve(65000);
kj::ArrayPtr<const capnp::word> words((const capnp::word *)data, size / sizeof(capnp::word));
while (words.size() > 0 && !(abort && *abort)) {
capnp::FlatArrayMessageReader reader(words);
auto event = reader.getRoot<cereal::Event>();
@ -40,16 +26,16 @@ bool LogReader::parse(std::atomic<bool> *abort) {
uint64_t mono_time = event.getLogMonoTime();
auto event_data = kj::arrayPtr(words.begin(), reader.getEnd());
Event *evt = events.emplace_back(newEvent(which, mono_time, event_data));
const Event &evt = events.emplace_back(which, mono_time, event_data);
// Add encodeIdx packet again as a frame packet for the video stream
if (evt->which == cereal::Event::ROAD_ENCODE_IDX ||
evt->which == cereal::Event::DRIVER_ENCODE_IDX ||
evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
if (evt.which == cereal::Event::ROAD_ENCODE_IDX ||
evt.which == cereal::Event::DRIVER_ENCODE_IDX ||
evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs<cereal::EncodeIndex>();
if (uint64_t sof = idx.getTimestampSof()) {
mono_time = sof;
}
events.emplace_back(newEvent(which, mono_time, event_data, idx.getSegmentNum()));
events.emplace_back(which, mono_time, event_data, idx.getSegmentNum());
}
words = kj::arrayPtr(reader.getEnd(), words.end());
@ -59,16 +45,9 @@ bool LogReader::parse(std::atomic<bool> *abort) {
}
if (!events.empty() && !(abort && *abort)) {
std::sort(events.begin(), events.end(), Event::lessThan());
events.shrink_to_fit();
std::sort(events.begin(), events.end());
return true;
}
return false;
}
Event *LogReader::newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum) {
#ifdef HAS_MEMORY_RESOURCE
return new (&mbr_) Event(which, mono_time, words, eidx_segnum);
#else
return new Event(which, mono_time, words, eidx_segnum);
#endif
}

@ -1,10 +1,5 @@
#pragma once
#if __has_include(<memory_resource>)
#define HAS_MEMORY_RESOURCE 1
#include <memory_resource>
#endif
#include <memory>
#include <string>
#include <vector>
@ -13,27 +8,15 @@
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
const int DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE = 65000;
class Event {
public:
Event(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &data, int eidx_segnum = -1)
: which(which), mono_time(mono_time), data(data), eidx_segnum(eidx_segnum) {}
struct lessThan {
inline bool operator()(const Event *l, const Event *r) {
return l->mono_time < r->mono_time || (l->mono_time == r->mono_time && l->which < r->which);
}
};
#if HAS_MEMORY_RESOURCE
void *operator new(size_t size, std::pmr::monotonic_buffer_resource *mbr) {
return mbr->allocate(size);
}
void operator delete(void *ptr) {
// No-op. memory used by EventMemoryPool increases monotonically until the logReader is destroyed.
bool operator<(const Event &other) const {
return mono_time < other.mono_time || (mono_time == other.mono_time && which < other.which);
}
#endif
uint64_t mono_time;
cereal::Event::Which which;
@ -43,18 +26,11 @@ public:
class LogReader {
public:
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader();
bool load(const std::string &url, std::atomic<bool> *abort = nullptr,
bool local_cache = false, int chunk_size = -1, int retries = 0);
bool load(const std::byte *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events;
bool load(const char *data, size_t size, std::atomic<bool> *abort = nullptr);
std::vector<Event> events;
private:
Event *newEvent(cereal::Event::Which which, uint64_t mono_time, const kj::ArrayPtr<const capnp::word> &words, int eidx_segnum = -1);
bool parse(std::atomic<bool> *abort);
std::string raw_;
#ifdef HAS_MEMORY_RESOURCE
std::pmr::monotonic_buffer_resource mbr_{DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE * sizeof(Event)};
#endif
};

@ -2,15 +2,20 @@
#include <QDebug>
#include <QtConcurrent>
#include <capnp/dynamic.h>
#include <csignal>
#include "cereal/services.h"
#include "common/params.h"
#include "common/timing.h"
#include "tools/replay/util.h"
static void interrupt_sleep_handler(int signal) {}
Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_,
uint32_t flags, QString data_dir, QObject *parent) : sm(sm_), flags_(flags), QObject(parent) {
// Register signal handler for SIGUSR1
std::signal(SIGUSR1, interrupt_sleep_handler);
if (!(flags_ & REPLAY_FLAG_ALL_SERVICES)) {
block << "uiDebug" << "userFlag";
}
@ -33,28 +38,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
pm = std::make_unique<PubMaster>(s);
}
route_ = std::make_unique<Route>(route, data_dir);
events_ = std::make_unique<std::vector<Event *>>();
new_events_ = std::make_unique<std::vector<Event *>>();
}
Replay::~Replay() {
stop();
}
void Replay::stop() {
if (!stream_thread_ && segments_.empty()) return;
rInfo("shutdown: in progress...");
if (stream_thread_ != nullptr) {
exit_ = updating_events_ = true;
exit_ =true;
paused_ = true;
stream_cv_.notify_one();
stream_thread_->quit();
stream_thread_->wait();
stream_thread_ = nullptr;
delete stream_thread_;
}
camera_server_.reset(nullptr);
timeline_future.waitForFinished();
segments_.clear();
rInfo("shutdown: done");
}
@ -84,13 +82,12 @@ void Replay::start(int seconds) {
seekTo(route_->identifier().begin_segment * 60 + seconds, false);
}
void Replay::updateEvents(const std::function<bool()> &lambda) {
// set updating_events to true to force stream thread release the lock and wait for events_updated.
updating_events_ = true;
void Replay::updateEvents(const std::function<bool()> &update_events_function) {
pauseStreamThread();
{
std::unique_lock lk(stream_lock_);
events_updated_ = lambda();
updating_events_ = false;
events_ready_ = update_events_function();
paused_ = user_paused_;
}
stream_cv_.notify_one();
}
@ -117,7 +114,7 @@ void Replay::seekTo(double seconds, bool relative) {
}
return segment_merged;
});
queueSegment();
updateSegmentsCache();
}
void Replay::seekToFlag(FindFlag flag) {
@ -146,34 +143,34 @@ void Replay::buildTimeline() {
std::shared_ptr<LogReader> log(new LogReader());
if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue;
for (const Event *e : log->events) {
if (e->which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e->data);
for (const Event &e : log->events) {
if (e.which == cereal::Event::Which::CONTROLS_STATE) {
capnp::FlatArrayMessageReader reader(e.data);
auto event = reader.getRoot<cereal::Event>();
auto cs = event.getControlsState();
if (engaged != cs.getEnabled()) {
if (engaged) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(engaged_begin), toSeconds(e->mono_time), TimelineType::Engaged});
timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged});
}
engaged_begin = e->mono_time;
engaged_begin = e.mono_time;
engaged = cs.getEnabled();
}
if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) {
if (!alert_type.empty() && alert_size != cereal::ControlsState::AlertSize::NONE) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(alert_begin), toSeconds(e->mono_time), timeline_types[(int)alert_status]});
timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]});
}
alert_begin = e->mono_time;
alert_begin = e.mono_time;
alert_type = cs.getAlertType().cStr();
alert_size = cs.getAlertSize();
alert_status = cs.getAlertStatus();
}
} else if (e->which == cereal::Event::Which::USER_FLAG) {
} else if (e.which == cereal::Event::Which::USER_FLAG) {
std::lock_guard lk(timeline_lock);
timeline.push_back({toSeconds(e->mono_time), toSeconds(e->mono_time), TimelineType::UserFlag});
timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag});
}
}
std::sort(timeline.begin(), timeline.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); });
@ -203,16 +200,22 @@ std::optional<uint64_t> Replay::find(FindFlag flag) {
}
void Replay::pause(bool pause) {
updateEvents([=]() {
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
paused_ = pause;
return true;
});
if (user_paused_ != pause) {
pauseStreamThread();
{
std::unique_lock lk(stream_lock_);
rWarning("%s at %.2f s", pause ? "paused..." : "resuming", currentSeconds());
paused_ = user_paused_ = pause;
}
stream_cv_.notify_one();
}
}
void Replay::setCurrentSegment(int n) {
if (current_segment_.exchange(n) != n) {
QMetaObject::invokeMethod(this, &Replay::queueSegment, Qt::QueuedConnection);
void Replay::pauseStreamThread() {
paused_ = true;
// Send SIGUSR1 to interrupt clock_nanosleep
if (stream_thread_ && stream_thread_id) {
pthread_kill(stream_thread_id, SIGUSR1);
}
}
@ -222,27 +225,22 @@ void Replay::segmentLoadFinished(bool success) {
rWarning("failed to load segment %d, removing it from current replay list", seg->seg_num);
updateEvents([&]() {
segments_.erase(seg->seg_num);
return true;
return !segments_.empty();
});
}
queueSegment();
updateSegmentsCache();
}
void Replay::queueSegment() {
void Replay::updateSegmentsCache() {
auto cur = segments_.lower_bound(current_segment_.load());
if (cur == segments_.end()) return;
// Calculate the range of segments to load
auto begin = std::prev(cur, std::min<int>(segment_cache_limit / 2, std::distance(segments_.begin(), cur)));
auto end = std::next(begin, std::min<int>(segment_cache_limit, std::distance(begin, segments_.end())));
begin = std::prev(end, std::min<int>(segment_cache_limit, std::distance(segments_.begin(), end)));
// load one segment at a time
auto it = std::find_if(cur, end, [](auto &it) { return !it.second || !it.second->isLoaded(); });
if (it != end && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
}
loadSegmentInRange(begin, cur, end);
mergeSegments(begin, end);
// free segments out of current semgnt window.
@ -257,69 +255,81 @@ void Replay::queueSegment() {
}
}
void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) {
auto loadNext = [this](auto begin, auto end) {
auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); });
if (it != end && !it->second) {
rDebug("loading segment %d...", it->first);
it->second = std::make_unique<Segment>(it->first, route_->at(it->first), flags_);
QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
return true;
}
return false;
};
// Load forward segments, then try reverse
if (!loadNext(cur, end)) {
loadNext(std::make_reverse_iterator(cur), segments_.rend());
}
}
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
std::vector<int> segments_need_merge;
std::set<int> segments_to_merge;
size_t new_events_size = 0;
for (auto it = begin; it != end; ++it) {
if (it->second && it->second->isLoaded()) {
segments_need_merge.push_back(it->first);
segments_to_merge.insert(it->first);
new_events_size += it->second->log->events.size();
}
}
if (segments_need_merge != segments_merged_) {
std::string s;
for (int i = 0; i < segments_need_merge.size(); ++i) {
s += std::to_string(segments_need_merge[i]);
if (i != segments_need_merge.size() - 1) s += ", ";
}
rDebug("merge segments %s", s.c_str());
new_events_->clear();
new_events_->reserve(new_events_size);
for (int n : segments_need_merge) {
size_t size = new_events_->size();
const auto &events = segments_[n]->log->events;
std::copy_if(events.begin(), events.end(), std::back_inserter(*new_events_),
[this](auto e) { return e->which < sockets_.size() && sockets_[e->which] != nullptr; });
std::inplace_merge(new_events_->begin(), new_events_->begin() + size, new_events_->end(), Event::lessThan());
}
if (segments_to_merge == merged_segments_) return;
if (stream_thread_) {
emit segmentsMerged();
rDebug("merge segments %s", std::accumulate(segments_to_merge.begin(), segments_to_merge.end(), std::string{},
[](auto & a, int b) { return a + (a.empty() ? "" : ", ") + std::to_string(b); }).c_str());
// Check if seeking is in progress
if (seeking_to_seconds_ >= 0) {
int target_segment = int(seeking_to_seconds_ / 60);
auto segment_found = std::find(segments_need_merge.begin(), segments_need_merge.end(), target_segment);
std::vector<Event> new_events;
new_events.reserve(new_events_size);
// If the target segment is found, emit seekedTo signal and reset seeking_to_seconds_
if (segment_found != segments_need_merge.end()) {
emit seekedTo(seeking_to_seconds_);
seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek
}
}
// Merge events from segments_to_merge into new_events
for (int n : segments_to_merge) {
size_t size = new_events.size();
const auto &events = segments_.at(n)->log->events;
std::copy_if(events.begin(), events.end(), std::back_inserter(new_events),
[this](const Event &e) { return e.which < sockets_.size() && sockets_[e.which] != nullptr; });
std::inplace_merge(new_events.begin(), new_events.begin() + size, new_events.end());
}
if (stream_thread_) {
emit segmentsMerged();
// Check if seeking is in progress
int target_segment = int(seeking_to_seconds_ / 60);
if (seeking_to_seconds_ >= 0 && segments_to_merge.count(target_segment) > 0) {
emit seekedTo(seeking_to_seconds_);
seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek
}
updateEvents([&]() {
events_.swap(new_events_);
segments_merged_ = segments_need_merge;
// Do not wake up the stream thread if the current segment has not been merged.
return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0);
});
}
updateEvents([&]() {
events_.swap(new_events);
merged_segments_ = segments_to_merge;
// Wake up the stream thread if the current segment is loaded or invalid.
return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0);
});
}
void Replay::startStream(const Segment *cur_segment) {
const auto &events = cur_segment->log->events;
route_start_ts_ = events.front()->mono_time;
route_start_ts_ = events.front().mono_time;
cur_mono_time_ += route_start_ts_ - 1;
// get datetime from INIT_DATA, fallback to datetime in the route name
route_date_time_ = route()->datetime();
auto it = std::find_if(events.cbegin(), events.cend(),
[](auto e) { return e->which == cereal::Event::Which::INIT_DATA; });
[](const Event &e) { return e.which == cereal::Event::Which::INIT_DATA; });
if (it != events.cend()) {
capnp::FlatArrayMessageReader reader((*it)->data);
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
uint64_t wall_time = event.getInitData().getWallTimeNanos();
if (wall_time > 0) {
@ -328,9 +338,9 @@ void Replay::startStream(const Segment *cur_segment) {
}
// write CarParams
it = std::find_if(events.begin(), events.end(), [](auto e) { return e->which == cereal::Event::Which::CAR_PARAMS; });
it = std::find_if(events.begin(), events.end(), [](const Event &e) { return e.which == cereal::Event::Which::CAR_PARAMS; });
if (it != events.end()) {
capnp::FlatArrayMessageReader reader((*it)->data);
capnp::FlatArrayMessageReader reader(it->data);
auto event = reader.getRoot<cereal::Event>();
car_fingerprint_ = event.getCarParams().getCarFingerprint();
capnp::MallocMessageBuilder builder;
@ -357,8 +367,7 @@ void Replay::startStream(const Segment *cur_segment) {
emit segmentsMerged();
// start stream thread
stream_thread_ = new QThread();
QObject::connect(stream_thread_, &QThread::started, [=]() { stream(); });
QObject::connect(stream_thread_, &QThread::finished, stream_thread_, &QThread::deleteLater);
QObject::connect(stream_thread_, &QThread::started, [=]() { streamThread(); });
stream_thread_->start();
timeline_future = QtConcurrent::run(this, &Replay::buildTimeline);
@ -382,83 +391,54 @@ void Replay::publishMessage(const Event *e) {
}
void Replay::publishFrame(const Event *e) {
static const std::map<cereal::Event::Which, CameraType> cam_types{
{cereal::Event::ROAD_ENCODE_IDX, RoadCam},
{cereal::Event::DRIVER_ENCODE_IDX, DriverCam},
{cereal::Event::WIDE_ROAD_ENCODE_IDX, WideRoadCam},
};
if ((e->which == cereal::Event::DRIVER_ENCODE_IDX && !hasFlag(REPLAY_FLAG_DCAM)) ||
(e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX && !hasFlag(REPLAY_FLAG_ECAM))) {
return;
CameraType cam;
switch (e->which) {
case cereal::Event::ROAD_ENCODE_IDX: cam = RoadCam; break;
case cereal::Event::DRIVER_ENCODE_IDX: cam = DriverCam; break;
case cereal::Event::WIDE_ROAD_ENCODE_IDX: cam = WideRoadCam; break;
default: return; // Invalid event type
}
if ((cam == DriverCam && !hasFlag(REPLAY_FLAG_DCAM)) || (cam == WideRoadCam && !hasFlag(REPLAY_FLAG_ECAM)))
return; // Camera isdisabled
if (isSegmentMerged(e->eidx_segnum)) {
auto &segment = segments_.at(e->eidx_segnum);
auto cam = cam_types.at(e->which);
if (auto &frame = segment->frames[cam]; frame) {
camera_server_->pushFrame(cam, frame.get(), e);
}
}
}
void Replay::stream() {
void Replay::streamThread() {
stream_thread_id = pthread_self();
cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA;
double prev_replay_speed = speed_;
std::unique_lock lk(stream_lock_);
while (true) {
stream_cv_.wait(lk, [=]() { return exit_ || (events_updated_ && !paused_); });
events_updated_ = false;
stream_cv_.wait(lk, [=]() { return exit_ || ( events_ready_ && !paused_); });
if (exit_) break;
Event cur_event{cur_which, cur_mono_time_, {}};
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
Event event(cur_which, cur_mono_time_, {});
auto first = std::upper_bound(events_.cbegin(), events_.cend(), event);
if (first == events_.cend()) {
rInfo("waiting for events...");
events_ready_ = false;
continue;
}
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
const Event *evt = (*eit);
cur_which = evt->which;
cur_mono_time_ = evt->mono_time;
setCurrentSegment(toSeconds(cur_mono_time_) / 60);
if (sockets_[cur_which] != nullptr) {
// keep time
long etime = (cur_mono_time_ - evt_start_ts) / speed_;
long rtime = nanos_since_boot() - loop_start_ts;
long behind_ns = etime - rtime;
// if behind_ns is greater than 1 second, it means that an invalid segment is skipped by seeking/replaying
if (behind_ns >= 1 * 1e9 || speed_ != prev_replay_speed) {
// reset event start times
evt_start_ts = cur_mono_time_;
loop_start_ts = nanos_since_boot();
prev_replay_speed = speed_;
} else if (behind_ns > 0) {
precise_nano_sleep(behind_ns);
}
auto it = publishEvents(first, events_.cend());
if (evt->eidx_segnum == -1) {
publishMessage(evt);
} else if (camera_server_) {
if (speed_ > 1.0) {
camera_server_->waitForSent();
}
publishFrame(evt);
}
}
}
// wait for frame to be sent before unlock.(frameReader may be deleted after unlock)
// Ensure frames are sent before unlocking to prevent race conditions
if (camera_server_) {
camera_server_->waitForSent();
}
if (eit == events_->end() && !hasFlag(REPLAY_FLAG_NO_LOOP)) {
int last_segment = segments_.empty() ? 0 : segments_.rbegin()->first;
if (it != events_.cend()) {
cur_which = it->which;
} else if (!hasFlag(REPLAY_FLAG_NO_LOOP)) {
// Check for loop end and restart if necessary
int last_segment = segments_.rbegin()->first;
if (current_segment_ >= last_segment && isSegmentMerged(last_segment)) {
rInfo("reaches the end of route, restart from beginning");
QMetaObject::invokeMethod(this, std::bind(&Replay::seekTo, this, 0, false), Qt::QueuedConnection);
@ -466,3 +446,48 @@ void Replay::stream() {
}
}
}
std::vector<Event>::const_iterator Replay::publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last) {
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
double prev_replay_speed = speed_;
for (; !paused_ && first != last; ++first) {
const Event &evt = *first;
int segment = toSeconds(evt.mono_time) / 60;
if (current_segment_ != segment) {
current_segment_ = segment;
QMetaObject::invokeMethod(this, &Replay::updateSegmentsCache, Qt::QueuedConnection);
}
// Skip events if socket is not present
if (!sockets_[evt.which]) continue;
int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (nanos_since_boot() - loop_start_ts);
// if time_diff is greater than 1 second, it means that an invalid segment is skipped
if (time_diff >= 1e9 || speed_ != prev_replay_speed) {
// reset event start times
evt_start_ts = evt.mono_time;
loop_start_ts = nanos_since_boot();
prev_replay_speed = speed_;
} else if (time_diff > 0) {
precise_nano_sleep(time_diff);
}
if (paused_) break;
cur_mono_time_ = evt.mono_time;
if (evt.eidx_segnum == -1) {
publishMessage(&evt);
} else if (camera_server_) {
if (speed_ > 1.0) {
camera_server_->waitForSent();
}
publishFrame(&evt);
}
}
return first;
}

@ -4,6 +4,7 @@
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <vector>
@ -53,11 +54,10 @@ public:
~Replay();
bool load();
void start(int seconds = 0);
void stop();
void pause(bool pause);
void seekToFlag(FindFlag flag);
void seekTo(double seconds, bool relative);
inline bool isPaused() const { return paused_; }
inline bool isPaused() const { return user_paused_; }
// the filter is called in streaming thread.try to return quickly from it to avoid blocking streaming.
// the filter function must return true if the event should be filtered.
// otherwise it must return false.
@ -79,7 +79,7 @@ public:
inline int totalSeconds() const { return (!segments_.empty()) ? (segments_.rbegin()->first + 1) * 60 : 0; }
inline void setSpeed(float speed) { speed_ = speed; }
inline float getSpeed() const { return speed_; }
inline const std::vector<Event *> *events() const { return events_.get(); }
inline const std::vector<Event> *events() const { return &events_; }
inline const std::map<int, std::unique_ptr<Segment>> &segments() const { return segments_; }
inline const std::string &carFingerprint() const { return car_fingerprint_; }
inline const std::vector<std::tuple<double, double, TimelineType>> getTimeline() {
@ -99,36 +99,37 @@ protected slots:
protected:
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
std::optional<uint64_t> find(FindFlag flag);
void pauseStreamThread();
void startStream(const Segment *cur_segment);
void stream();
void setCurrentSegment(int n);
void queueSegment();
void streamThread();
void updateSegmentsCache();
void loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& lambda);
void updateEvents(const std::function<bool()>& update_events_function);
std::vector<Event>::const_iterator publishEvents(std::vector<Event>::const_iterator first,
std::vector<Event>::const_iterator last);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
void buildTimeline();
inline bool isSegmentMerged(int n) {
return std::find(segments_merged_.begin(), segments_merged_.end(), n) != segments_merged_.end();
}
inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; }
pthread_t stream_thread_id = 0;
QThread *stream_thread_ = nullptr;
std::mutex stream_lock_;
bool user_paused_ = false;
std::condition_variable stream_cv_;
std::atomic<bool> updating_events_ = false;
std::atomic<int> current_segment_ = 0;
double seeking_to_seconds_ = -1;
SegmentMap segments_;
// the following variables must be protected with stream_lock_
std::atomic<bool> exit_ = false;
bool paused_ = false;
bool events_updated_ = false;
std::atomic<bool> paused_ = false;
bool events_ready_ = false;
QDateTime route_date_time_;
uint64_t route_start_ts_ = 0;
std::atomic<uint64_t> cur_mono_time_ = 0;
std::unique_ptr<std::vector<Event *>> events_;
std::unique_ptr<std::vector<Event *>> new_events_;
std::vector<int> segments_merged_;
std::vector<Event> events_;
std::set<int> merged_segments_;
// messaging
SubMaster *sm = nullptr;

@ -77,7 +77,7 @@ bool Route::loadFromServer(int retries) {
return false;
}
rWarning("Retrying %d/%d", i, retries);
util::sleep_for(500);
util::sleep_for(3000);
}
return false;
}

@ -1,7 +1,6 @@
#include <chrono>
#include <thread>
#include <QDebug>
#include <QEventLoop>
#include "catch2/catch.hpp"
@ -67,7 +66,7 @@ TEST_CASE("LogReader") {
corrupt_content.resize(corrupt_content.length() / 2);
corrupt_content = decompressBZ2(corrupt_content);
LogReader log;
REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size()));
REQUIRE(log.load(corrupt_content.data(), corrupt_content.size()));
REQUIRE(log.events.size() > 0);
}
}
@ -88,7 +87,7 @@ void read_segment(int n, const SegmentFile &segment_file, uint32_t flags) {
// test LogReader & FrameReader
REQUIRE(segment.log->events.size() > 0);
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end(), Event::lessThan()));
REQUIRE(std::is_sorted(segment.log->events.begin(), segment.log->events.end()));
for (auto cam : ALL_CAMERAS) {
auto &fr = segment.frames[cam];
@ -158,63 +157,20 @@ TEST_CASE("Remote route") {
}
}
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route, uint32_t flags = REPLAY_FLAG_NO_FILE_CACHE | REPLAY_FLAG_NO_VIPC) : Replay(route, {}, {}, nullptr, flags) {}
void test_seek();
void testSeekTo(int seek_to);
};
void TestReplay::testSeekTo(int seek_to) {
seekTo(seek_to, false);
while (true) {
std::unique_lock lk(stream_lock_);
stream_cv_.wait(lk, [=]() { return events_updated_ == true; });
events_updated_ = false;
if (cur_mono_time_ != route_start_ts_ + seek_to * 1e9) {
// wake up by the previous merging, skip it.
continue;
}
Event cur_event(cereal::Event::Which::INIT_DATA, cur_mono_time_, {});
auto eit = std::upper_bound(events_->begin(), events_->end(), &cur_event, Event::lessThan());
if (eit == events_->end()) {
qDebug() << "waiting for events...";
continue;
}
REQUIRE(std::is_sorted(events_->begin(), events_->end(), Event::lessThan()));
const int seek_to_segment = seek_to / 60;
const int event_seconds = ((*eit)->mono_time - route_start_ts_) / 1e9;
current_segment_ = event_seconds / 60;
INFO("seek to [" << seek_to << "s segment " << seek_to_segment << "], events [" << event_seconds << "s segment" << current_segment_ << "]");
REQUIRE(event_seconds >= seek_to);
if (event_seconds > seek_to) {
auto it = segments_.lower_bound(seek_to_segment);
REQUIRE(it->first == current_segment_);
}
break;
}
}
void TestReplay::test_seek() {
// create a dummy stream thread
stream_thread_ = new QThread(this);
TEST_CASE("seek_to") {
QEventLoop loop;
std::thread thread = std::thread([&]() {
for (int i = 0; i < 10; ++i) {
testSeekTo(util::random_int(0, 2 * 60));
}
int seek_to = util::random_int(0, 2 * 59);
Replay replay(DEMO_ROUTE, {}, {}, nullptr, REPLAY_FLAG_NO_VIPC);
QObject::connect(&replay, &Replay::seekedTo, [&](double sec) {
INFO("seek to " << seek_to << "s seeked to" << sec);
REQUIRE(sec >= seek_to);
loop.quit();
});
loop.exec();
thread.join();
}
TEST_CASE("Replay") {
TestReplay replay(DEMO_ROUTE);
REQUIRE(replay.load());
replay.test_seek();
replay.start();
replay.seekTo(seek_to, false);
loop.exec();
}

@ -4,10 +4,10 @@
#include <curl/curl.h>
#include <openssl/sha.h>
#include <cstdarg>
#include <cstring>
#include <cassert>
#include <cmath>
#include <cstdarg>
#include <cstring>
#include <fstream>
#include <iostream>
#include <map>
@ -158,7 +158,10 @@ size_t getRemoteFileSize(const std::string &url, std::atomic<bool> *abort) {
int still_running = 1;
while (still_running > 0 && !(abort && *abort)) {
CURLMcode mc = curl_multi_perform(cm, &still_running);
if (!mc) curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
if (mc != CURLM_OK) break;
if (still_running > 0) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
}
}
double content_length = -1;
@ -208,10 +211,20 @@ bool httpDownload(const std::string &url, T &buf, size_t chunk_size, size_t cont
}
int still_running = 1;
size_t prev_written = 0;
while (still_running > 0 && !(abort && *abort)) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
curl_multi_perform(cm, &still_running);
download_stats.update(url, written);
CURLMcode mc = curl_multi_perform(cm, &still_running);
if (mc != CURLM_OK) {
break;
}
if (still_running > 0) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
}
if (((written - prev_written) / (double)content_length) >= 0.01) {
download_stats.update(url, written);
prev_written = written;
}
}
CURLMsg *msg;
@ -304,9 +317,11 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool>
return {};
}
void precise_nano_sleep(long sleep_ns) {
struct timespec req = {.tv_sec = 0, .tv_nsec = sleep_ns};
struct timespec rem = {};
void precise_nano_sleep(int64_t nanoseconds) {
struct timespec req, rem;
req.tv_sec = nanoseconds / 1e9;
req.tv_nsec = nanoseconds % (int64_t)1e9;
while (clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem) && errno == EINTR) {
// Retry sleep if interrupted by a signal
req = rem;

@ -21,7 +21,7 @@ void logMessage(ReplyMsgType type, const char* fmt, ...);
#define rError(fmt, ...) ::logMessage(ReplyMsgType::Critical , fmt, ## __VA_ARGS__)
std::string sha256(const std::string &str);
void precise_nano_sleep(long sleep_ns);
void precise_nano_sleep(int64_t nanoseconds);
std::string decompressBZ2(const std::string &in, std::atomic<bool> *abort = nullptr);
std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic<bool> *abort = nullptr);
std::string getUrlWithoutQuery(const std::string &url);

Loading…
Cancel
Save