replay: refactor Route and Segment (#22531)

* new functions

* fix wrong call to qUrl::isLocalFile

* cleanup

* keep extension in cached files

* cleanup

* simplify segment

* delete thread

* add output

* pre-decompress

* remove suffix

* revert remove suffix

* 1 connection for log file

* cleanup

* segment may not be continuous,use map

* don't emit finish if aborting_

* use QFuture and thread pool

* cleanup

* fix segfault in LoadFromLocal

* cleanup

* handle segment failed to load

* output info

* continue error handling

* Remove redundant testSeekTo

* cleanup

* always return true

* keep time is ok now

change to 1s

write 1 byte at the end of the sparse file

* log loading segment

* merge #22476

* Update selfdrive/ui/replay/logreader.cc

* correct connect

* pub message in function

* typo

* Update selfdrive/ui/replay/replay.cc

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
pull/22570/head
Dean Lee 4 years ago committed by GitHub
parent 3de2cd897b
commit acc52ece20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      selfdrive/ui/replay/logreader.cc
  2. 5
      selfdrive/ui/replay/logreader.h
  3. 78
      selfdrive/ui/replay/replay.cc
  4. 4
      selfdrive/ui/replay/replay.h
  5. 185
      selfdrive/ui/replay/route.cc
  6. 38
      selfdrive/ui/replay/route.h
  7. 4
      selfdrive/ui/replay/tests/test_replay.cc
  8. 6
      selfdrive/ui/replay/util.cc

@ -1,6 +1,5 @@
#include "selfdrive/ui/replay/logreader.h"
#include <cassert>
#include <sstream>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/util.h"
@ -32,8 +31,9 @@ LogReader::~LogReader() {
for (auto e : events) delete e;
}
bool LogReader::load(const std::string &file, bool is_bz2file) {
if (is_bz2file) {
bool LogReader::load(const std::string &file) {
bool is_bz2 = file.rfind(".bz2") == file.length() - 4;
if (is_bz2) {
std::ostringstream stream;
if (!readBZ2File(file, stream)) {
LOGW("bz2 decompress failed");

@ -1,8 +1,5 @@
#pragma once
#include <unordered_map>
#include <cassert>
#include <capnp/serialize.h>
#include "cereal/gen/cpp/log.capnp.h"
#include "selfdrive/camerad/cameras/camera_common.h"
@ -38,7 +35,7 @@ class LogReader {
public:
LogReader() = default;
~LogReader();
bool load(const std::string &file, bool is_bz2file);
bool load(const std::string &file);
std::vector<Event*> events;

@ -57,10 +57,9 @@ bool Replay::load() {
return false;
}
for (int i = 0; i < route_->size(); ++i) {
const SegmentFile &f = route_->at(i);
for (auto &[n, f] : route_->segments()) {
if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) {
segments_[i] = nullptr;
segments_[n] = nullptr;
}
}
if (segments_.empty()) {
@ -98,8 +97,9 @@ void Replay::doSeek(int seconds, bool relative) {
seconds += currentSeconds();
}
qInfo() << "seeking to" << seconds;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.rbegin()->first * 60) * 1e9;
current_segment_ = std::min(seconds / 60, (int)segments_.rbegin()->first - 1);
const int max_segment_number = segments_.rbegin()->first;
cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (max_segment_number + 1) * 60) * 1e9;
current_segment_ = std::min(seconds / 60, max_segment_number);
return false;
});
queueSegment();
@ -122,16 +122,32 @@ void Replay::setCurrentSegment(int n) {
}
}
// maintain the segment window
void Replay::segmentLoadFinished(bool success) {
if (!success) {
Segment *seg = qobject_cast<Segment *>(sender());
qInfo() << "failed to load segment " << seg->seg_num << ", removing it from current replay list";
segments_.erase(seg->seg_num);
}
queueSegment();
}
void Replay::queueSegment() {
// forward fetch segments
// get the current segment window
SegmentMap::iterator begin, end;
begin = end = segments_.lower_bound(current_segment_);
for (int fwd = 0; end != segments_.end() && fwd <= FORWARD_SEGS; ++end, ++fwd) {
auto &[n, seg] = *end;
for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) {
--begin;
}
for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) {
++end;
}
// load segments
for (auto it = begin; it != end; ++it) {
auto &[n, seg] = *it;
if (!seg) {
seg = std::make_unique<Segment>(n, route_->at(n), load_dcam, load_ecam);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::queueSegment);
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
qInfo() << "loading segment" << n << "...";
}
}
// merge segments
@ -159,8 +175,8 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0,
[=](int v, int n) { return v + segments_[n]->log->events.size(); }));
for (int n : segments_need_merge) {
auto &log = segments_[n]->log;
auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end());
auto &e = segments_[n]->log->events;
auto middle = new_events->insert(new_events->end(), e.begin(), e.end());
std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan());
}
// update events
@ -182,7 +198,20 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::
});
delete prev_events;
} else {
updateEvents([=]() { return begin->second->isLoaded(); });
updateEvents([=]() { return true; });
}
}
void Replay::publishMessage(const Event *e) {
if (sm == nullptr) {
auto bytes = e->bytes();
int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
qDebug() << "stop publishing" << sockets_[e->which] << "due to multiple publishers error";
sockets_[e->which] = nullptr;
}
} else {
sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}});
}
}
@ -222,8 +251,8 @@ void Replay::stream() {
continue;
}
const uint64_t evt_start_ts = cur_mono_time_;
const uint64_t loop_start_ts = nanos_since_boot();
uint64_t evt_start_ts = cur_mono_time_;
uint64_t loop_start_ts = nanos_since_boot();
for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) {
const Event *evt = (*eit);
@ -250,24 +279,19 @@ void Replay::stream() {
long etime = cur_mono_time_ - evt_start_ts;
long rtime = nanos_since_boot() - loop_start_ts;
long behind_ns = etime - rtime;
if (behind_ns > 0) {
// if behind_ns is greater than 1 second, it means that an invalid segemnt is skipped by seeking/replaying
if (behind_ns >= 1 * 1e9) {
// reset start times
evt_start_ts = cur_mono_time_;
loop_start_ts = nanos_since_boot();
} else if (behind_ns > 0) {
precise_nano_sleep(behind_ns);
}
if (evt->frame) {
publishFrame(evt);
} else {
// publish msg
if (sm == nullptr) {
auto bytes = evt->bytes();
int ret = pm->send(sockets_[cur_which], (capnp::byte *)bytes.begin(), bytes.size());
if (ret == -1) {
qDebug() << "stop publishing" << sockets_[cur_which] << "due to multiple publishers error";
sockets_[cur_which] = nullptr;
}
} else {
sm->update_msgs(nanos_since_boot(), {{sockets_[cur_which], evt->event}});
}
publishMessage(evt);
}
}
}

@ -6,7 +6,7 @@
#include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 2;
constexpr int BACKWARD_SEGS = 1;
class Replay : public QObject {
Q_OBJECT
@ -27,6 +27,7 @@ signals:
protected slots:
void queueSegment();
void doSeek(int seconds, bool relative);
void segmentLoadFinished(bool sucess);
protected:
typedef std::map<int, std::unique_ptr<Segment>> SegmentMap;
@ -34,6 +35,7 @@ protected:
void setCurrentSegment(int n);
void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end);
void updateEvents(const std::function<bool()>& lambda);
void publishMessage(const Event *e);
void publishFrame(const Event *e);
inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; }

@ -1,21 +1,24 @@
#include "selfdrive/ui/replay/route.h"
#include <QEventLoop>
#include <QFile>
#include <QJsonArray>
#include <QJsonDocument>
#include <QRegExp>
#include <QThread>
#include <future>
#include <QtConcurrent>
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h"
#include "selfdrive/ui/replay/util.h"
Route::Route(const QString &route, const QString &data_dir) : route_(route), data_dir_(data_dir) {}
bool Route::load() {
if (data_dir_.isEmpty()) {
return loadFromServer();
} else {
return loadFromLocal();
}
}
bool Route::loadFromServer() {
QEventLoop loop;
auto onError = [&loop](const QString &err) { loop.quit(); };
@ -30,9 +33,6 @@ bool Route::load() {
http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files");
loop.exec();
return ret;
} else {
return loadFromLocal();
}
}
bool Route::loadFromJson(const QString &json) {
@ -47,23 +47,7 @@ bool Route::loadFromJson(const QString &json) {
for (const auto &url : route_files[key].toArray()) {
QString url_str = url.toString();
if (rx.indexIn(url_str) != -1) {
const int seg_num = rx.cap(1).toInt();
if (segments_.size() <= seg_num) {
segments_.resize(seg_num + 1);
}
if (key == "logs") {
segments_[seg_num].rlog = url_str;
} else if (key == "qlogs") {
segments_[seg_num].qlog = url_str;
} else if (key == "cameras") {
segments_[seg_num].road_cam = url_str;
} else if (key == "dcameras") {
segments_[seg_num].driver_cam = url_str;
} else if (key == "ecameras") {
segments_[seg_num].wide_road_cam = url_str;
} else if (key == "qcameras") {
segments_[seg_num].qcamera = url_str;
}
addFileToSegment(rx.cap(1).toInt(), url_str);
}
}
}
@ -79,121 +63,94 @@ bool Route::loadFromLocal() {
if (folders.isEmpty()) return false;
for (auto folder : folders) {
const int seg_num = folder.split("--")[2].toInt();
if (segments_.size() <= seg_num) {
segments_.resize(seg_num + 1);
}
int seg_num_pos = folder.lastIndexOf("--");
if (seg_num_pos != -1) {
const int seg_num = folder.mid(seg_num_pos + 2).toInt();
QDir segment_dir(log_dir.filePath(folder));
for (auto f : segment_dir.entryList(QDir::Files)) {
const QString file_path = segment_dir.absoluteFilePath(f);
if (f.startsWith("rlog")) {
segments_[seg_num].rlog = file_path;
} else if (f.startsWith("qlog")) {
segments_[seg_num].qlog = file_path;
} else if (f.startsWith("fcamera")) {
segments_[seg_num].road_cam = file_path;
} else if (f.startsWith("dcamera")) {
segments_[seg_num].driver_cam = file_path;
} else if (f.startsWith("ecamera")) {
segments_[seg_num].wide_road_cam = file_path;
} else if (f.startsWith("qcamera")) {
segments_[seg_num].qcamera = file_path;
addFileToSegment(seg_num, segment_dir.absoluteFilePath(f));
}
}
}
return true;
}
void Route::addFileToSegment(int n, const QString &file) {
const QString name = QUrl(file).fileName();
if (name == "rlog.bz2") {
segments_[n].rlog = file;
} else if (name == "qlog.bz2") {
segments_[n].qlog = file;
} else if (name == "fcamera.hevc") {
segments_[n].road_cam = file;
} else if (name == "dcamera.hevc") {
segments_[n].driver_cam = file;
} else if (name == "ecamera.hevc") {
segments_[n].wide_road_cam = file;
} else if (name == "qcamera.ts") {
segments_[n].qcamera = file;
}
}
// class Segment
Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) {
Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) {
static std::once_flag once_flag;
std::call_once(once_flag, [=]() {
if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath());
});
// fallback to qcamera/qlog
road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam;
log_path_ = files_.rlog.isEmpty() ? files_.qlog : files_.rlog;
assert (!log_path_.isEmpty() && !road_cam_path_.isEmpty());
if (!load_dcam) {
files_.driver_cam = "";
}
if (!load_ecam) {
files_.wide_road_cam = "";
}
std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); });
if (!QUrl(log_path_).isLocalFile()) {
for (auto &url : {log_path_, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) {
if (!url.isEmpty() && !QFile::exists(localPath(url))) {
downloadFile(url);
++downloading_;
}
// the order is [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const QString file_list[] = {
files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
load_dcam ? files.driver_cam : "",
load_ecam ? files.wide_road_cam : "",
files.rlog.isEmpty() ? files.qlog : files.rlog,
};
for (int i = 0; i < std::size(file_list); i++) {
if (!file_list[i].isEmpty()) {
loading_++;
synchronizer_.addFuture(QtConcurrent::run(this, &Segment::loadFile, i, file_list[i].toStdString()));
}
}
if (downloading_ == 0) {
QTimer::singleShot(0, this, &Segment::load);
} else {
qDebug() << "downloading segment" << seg_num_ << "...";
}
}
Segment::~Segment() {
aborting_ = true;
if (downloading_ > 0) {
qDebug() << "cancel download segment" << seg_num_;
}
for (auto &t : download_threads_) {
if (t->isRunning()) t->wait();
}
synchronizer_.setCancelOnWait(true);
synchronizer_.waitForFinished();
}
void Segment::downloadFile(const QString &url) {
download_threads_.emplace_back(QThread::create([=]() {
const std::string local_file = localPath(url).toStdString();
bool ret = httpMultiPartDownload(url.toStdString(), local_file, connections_per_file, &aborting_);
if (ret && url == log_path_) {
// pre-decompress log file.
std::ofstream ostrm(local_file + "_decompressed", std::ios::binary);
readBZ2File(local_file, ostrm);
}
if (--downloading_ == 0 && !aborting_) {
load();
}
}))->start();
}
void Segment::loadFile(int id, const std::string file) {
const bool is_remote = file.find("https://") == 0;
const std::string local_file = is_remote ? cacheFilePath(file) : file;
bool file_ready = util::file_exists(local_file);
// load concurrency
void Segment::load() {
std::vector<std::future<bool>> futures;
if (!file_ready && is_remote) {
// TODO: retry on failure
file_ready = httpMultiPartDownload(file, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
}
futures.emplace_back(std::async(std::launch::async, [=]() {
const std::string bzip_file = localPath(log_path_).toStdString();
const std::string decompressed_file = bzip_file + "_decompressed";
bool is_bzip = !util::file_exists(decompressed_file);
if (!aborting_ && file_ready) {
if (id < MAX_CAMERAS) {
frames[id] = std::make_unique<FrameReader>();
success_ = success_ && frames[id]->load(local_file);
} else {
std::string decompressed = cacheFilePath(local_file + ".decompressed");
if (!util::file_exists(decompressed)) {
std::ofstream ostrm(decompressed, std::ios::binary);
readBZ2File(local_file, ostrm);
}
log = std::make_unique<LogReader>();
return log->load(is_bzip ? bzip_file : decompressed_file, is_bzip);
}));
QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam};
for (int i = 0; i < std::size(camera_files); ++i) {
if (!camera_files[i].isEmpty()) {
futures.emplace_back(std::async(std::launch::async, [=]() {
frames[i] = std::make_unique<FrameReader>();
return frames[i]->load(localPath(camera_files[i]).toStdString());
}));
success_ = success_ && log->load(decompressed);
}
}
int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; });
loaded_ = (success_cnt == futures.size());
emit loadFinished();
if (!aborting_ && --loading_ == 0) {
emit loadFinished(success_);
}
}
QString Segment::localPath(const QUrl &url) {
if (url.isLocalFile() || QFile(url.toString()).exists()) return url.toString();
QByteArray url_no_query = url.toString(QUrl::RemoveQuery).toUtf8();
return CACHE_DIR.filePath(QString(QCryptographicHash::hash(url_no_query, QCryptographicHash::Sha256).toHex()));
std::string Segment::cacheFilePath(const std::string &file) {
QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery);
QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex();
return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString();
}

@ -1,16 +1,13 @@
#pragma once
#include <QDir>
#include <QObject>
#include <QString>
#include <vector>
#include <QFutureSynchronizer>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/logreader.h"
const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str());
const int connections_per_file = 3;
struct SegmentFile {
QString rlog;
@ -23,45 +20,42 @@ struct SegmentFile {
class Route {
public:
Route(const QString &route, const QString &data_dir = {});
Route(const QString &route, const QString &data_dir = {}) : route_(route), data_dir_(data_dir) {};
bool load();
inline const QString &name() const { return route_; };
inline int size() const { return segments_.size(); }
inline SegmentFile &at(int n) { return segments_[n]; }
inline const std::map<int, SegmentFile> &segments() const { return segments_; }
inline const SegmentFile &at(int n) { return segments_.at(n); }
protected:
bool loadFromLocal();
bool loadFromServer();
bool loadFromJson(const QString &json);
void addFileToSegment(int seg_num, const QString &file);
QString route_;
QString data_dir_;
std::vector<SegmentFile> segments_;
std::map<int, SegmentFile> segments_;
};
class Segment : public QObject {
Q_OBJECT
public:
Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam);
Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam);
~Segment();
inline bool isLoaded() const { return loaded_; }
inline bool isLoaded() const { return !loading_ && success_; }
const int seg_num = 0;
std::unique_ptr<LogReader> log;
std::unique_ptr<FrameReader> frames[MAX_CAMERAS] = {};
signals:
void loadFinished();
void loadFinished(bool success);
protected:
void load();
void downloadFile(const QString &url);
QString localPath(const QUrl &url);
void loadFile(int id, const std::string file);
std::string cacheFilePath(const std::string &file);
std::atomic<bool> loaded_ = false;
std::atomic<bool> aborting_ = false;
std::atomic<int> downloading_ = 0;
int seg_num_ = 0;
SegmentFile files_;
QString road_cam_path_;
QString log_path_;
std::vector<QThread*> download_threads_;
std::atomic<bool> success_ = true, aborting_ = false;
std::atomic<int> loading_ = 0;
QFutureSynchronizer<void> synchronizer_;
};

@ -14,7 +14,6 @@ std::string sha_256(const QString &dat) {
TEST_CASE("httpMultiPartDownload") {
char filename[] = "/tmp/XXXXXX";
int fd = mkstemp(filename);
REQUIRE(fd != -1);
close(fd);
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";
@ -54,7 +53,7 @@ bool is_events_ordered(const std::vector<Event *> &events) {
TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load());
REQUIRE(demo_route.size() == 11);
REQUIRE(demo_route.segments().size() == 11);
QEventLoop loop;
Segment segment(0, demo_route.at(0), false, false);
@ -132,7 +131,6 @@ void TestReplay::test_seek() {
segments_.erase(n);
}
for (int i =0; i < 50; ++i) {
testSeekTo(520);
testSeekTo(random_int(4 * 60, 9 * 60));
}
loop.quit();

@ -49,10 +49,12 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
int64_t content_length = getDownloadContentLength(url);
if (content_length == -1) return false;
// create a tmp sparse file
std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb");
// create a sparse file
fseek(fp, content_length, SEEK_SET);
assert(fp);
fseek(fp, content_length - 1, SEEK_SET);
fwrite("\0", 1, 1, fp);
CURLM *cm = curl_multi_init();
std::map<CURL *, MultiPartWriter> writers;

Loading…
Cancel
Save