From 9fae8b50f3debe43816b1a02b943f5b943c7668d Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Tue, 28 Sep 2021 18:24:48 +0800 Subject: [PATCH] c++ replay: chunking and concurrent downloads (#22308) * download segment files by chunks in multiple threads * remove easy_handl on aborting * add test cases * better error handling * update test * cleanup * add CURLGlobalInitializer * check http code * finish old-commit-hash: 4e6ff308a82532b29a58f281544f9b598244a01d --- selfdrive/ui/SConscript | 4 +- selfdrive/ui/replay/main.cc | 1 + selfdrive/ui/replay/replay.cc | 2 +- selfdrive/ui/replay/route.cc | 129 ++++++++++++++++++++--- selfdrive/ui/replay/route.h | 10 +- selfdrive/ui/replay/tests/test_replay.cc | 61 ++++++++--- selfdrive/ui/replay/tests/test_runner.cc | 2 + 7 files changed, 168 insertions(+), 41 deletions(-) create mode 100644 selfdrive/ui/replay/tests/test_runner.cc diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 116ecc1273..b077f6de2e 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -111,8 +111,8 @@ if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()) replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) - replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2'] + qt_libs + replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs qt_env.Program("replay/replay", ["replay/main.cc"], LIBS=replay_libs) if GetOption('test'): - qt_env.Program('replay/tests/test_replay', ['replay/tests/test_replay.cc'], LIBS=[replay_libs]) + qt_env.Program('replay/tests/test_replay', ['replay/tests/test_runner.cc', 'replay/tests/test_replay.cc'], LIBS=[replay_libs]) diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 84167fe4fb..e7944b03f3 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -5,6 +5,7 @@ #include #include +#include #include const QString DEMO_ROUTE = "3533c53bb29502d1|2019-12-10--01-13-27"; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index ece5c47dc8..52071ad1f5 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -1,7 +1,7 @@ #include "selfdrive/ui/replay/replay.h" #include - +#include #include "cereal/services.h" #include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/common/timing.h" diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index 7a99cfbb97..5fce700296 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -1,16 +1,121 @@ #include "selfdrive/ui/replay/route.h" +#include + #include #include #include #include #include #include +#include #include #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/qt/api.h" +struct CURLGlobalInitializer { + CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); } + ~CURLGlobalInitializer() { curl_global_cleanup(); } +}; + +struct MultiPartWriter { + int64_t offset; + int64_t end; + FILE *fp; +}; + +static size_t write_cb(char *data, size_t n, size_t l, void *userp) { + MultiPartWriter *w = (MultiPartWriter *)userp; + fseek(w->fp, w->offset, SEEK_SET); + fwrite(data, l, n, w->fp); + w->offset += n * l; + return n * l; +} + +static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) { return n * l; } + +int64_t getDownloadContentLength(const std::string &url) { + CURL *curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb); + curl_easy_setopt(curl, CURLOPT_HEADER, 1); + curl_easy_setopt(curl, CURLOPT_NOBODY, 1); + CURLcode res = curl_easy_perform(curl); + double content_length = -1; + if (res == CURLE_OK) { + res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length); + } + curl_easy_cleanup(curl); + return res == CURLE_OK ? (int64_t)content_length : -1; +} + +bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort) { + int64_t content_length = getDownloadContentLength(url); + if (content_length == -1) return false; + + std::string tmp_file = target_file + ".tmp"; + FILE *fp = fopen(tmp_file.c_str(), "wb"); + // create a sparse file + fseek(fp, content_length, SEEK_SET); + + CURLM *cm = curl_multi_init(); + std::map writers; + const int part_size = content_length / parts; + for (int i = 0; i < parts; ++i) { + CURL *eh = curl_easy_init(); + writers[eh] = { + .fp = fp, + .offset = i * part_size, + .end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1, + }; + curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh])); + curl_easy_setopt(eh, CURLOPT_URL, url.c_str()); + curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str()); + curl_easy_setopt(eh, CURLOPT_HTTPGET, 1); + curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1); + curl_multi_add_handle(cm, eh); + } + + int running = 1, success_cnt = 0; + while (!(abort && abort->load())){ + CURLMcode ret = curl_multi_perform(cm, &running); + + if (!running) { + CURLMsg *msg; + int msgs_left = -1; + while ((msg = curl_multi_info_read(cm, &msgs_left))) { + if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) { + int http_status_code = 0; + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code); + success_cnt += (http_status_code == 206); + } + } + break; + } + + if (ret == CURLM_OK) { + curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + } + }; + + fclose(fp); + bool success = success_cnt == parts; + if (success) { + success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0; + } + + // cleanup curl + for (auto &[e, w] : writers) { + curl_multi_remove_handle(cm, e); + curl_easy_cleanup(e); + } + curl_multi_cleanup(cm); + return success; +} + Route::Route(const QString &route) : route_(route) {} bool Route::load() { @@ -71,6 +176,7 @@ bool Route::loadFromJson(const QString &json) { // class Segment Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) { + static CURLGlobalInitializer curl_initializer; static std::once_flag once_flag; std::call_once(once_flag, [=]() { if (!QDir(CACHE_DIR).exists()) QDir().mkdir(CACHE_DIR); @@ -91,7 +197,6 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l if (!QUrl(files_.rlog).isLocalFile()) { for (auto &url : {files_.rlog, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) { if (!url.isEmpty() && !QFile::exists(localPath(url))) { - qDebug() << "download" << url; downloadFile(url); ++downloading_; } @@ -103,30 +208,20 @@ Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool l } Segment::~Segment() { - // cancel download, qnam will not abort requests, need to abort them manually aborting_ = true; - for (QNetworkReply *replay : replies_) { - if (replay->isRunning()) { - replay->abort(); - } - replay->deleteLater(); + for (auto &t : download_threads_) { + if (t->isRunning()) t->wait(); } } void Segment::downloadFile(const QString &url) { - QNetworkReply *reply = qnam_.get(QNetworkRequest(url)); - replies_.insert(reply); - connect(reply, &QNetworkReply::finished, [=]() { - if (reply->error() == QNetworkReply::NoError) { - QFile file(localPath(url)); - if (file.open(QIODevice::WriteOnly)) { - file.write(reply->readAll()); - } - } + qDebug() << "download" << url; + download_threads_.emplace_back(QThread::create([=]() { + httpMultiPartDownload(url.toStdString(), localPath(url).toStdString(), connections_per_file, &aborting_); if (--downloading_ == 0 && !aborting_) { load(); } - }); + }))->start(); } // load concurrency diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index 3f7725db96..b7557f53a6 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -9,6 +9,7 @@ #include "selfdrive/ui/replay/logreader.h" const QString CACHE_DIR = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str(); +const int connections_per_file = 3; struct SegmentFile { QString rlog; @@ -55,11 +56,12 @@ protected: QString localPath(const QUrl &url); bool loaded_ = false, valid_ = false; - bool aborting_ = false; + std::atomic aborting_ = false; int downloading_ = 0; int seg_num_ = 0; SegmentFile files_; QString road_cam_path_; - QSet replies_; - QNetworkAccessManager qnam_; + std::vector download_threads_; }; + +bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort = nullptr); diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index d0c88d7ca0..9a2560cea6 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -1,24 +1,51 @@ -#define CATCH_CONFIG_MAIN +#include +#include +#include + #include "catch2/catch.hpp" +#include "selfdrive/common/util.h" #include "selfdrive/ui/replay/framereader.h" +#include "selfdrive/ui/replay/route.h" const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; -TEST_CASE("FrameReader") { - SECTION("process&get") { - FrameReader fr; - REQUIRE(fr.load(stream_url) == true); - REQUIRE(fr.valid() == true); - REQUIRE(fr.getFrameCount() == 1200); - // random get 50 frames - // srand(time(NULL)); - // for (int i = 0; i < 50; ++i) { - // int idx = rand() % (fr.getFrameCount() - 1); - // REQUIRE(fr.get(idx) != nullptr); - // } - // sequence get 50 frames { - for (int i = 0; i < 50; ++i) { - REQUIRE(fr.get(i) != nullptr); - } +// TEST_CASE("FrameReader") { +// SECTION("process&get") { +// FrameReader fr; +// REQUIRE(fr.load(stream_url) == true); +// REQUIRE(fr.valid() == true); +// REQUIRE(fr.getFrameCount() == 1200); +// // random get 50 frames +// // srand(time(NULL)); +// // for (int i = 0; i < 50; ++i) { +// // int idx = rand() % (fr.getFrameCount() - 1); +// // REQUIRE(fr.get(idx) != nullptr); +// // } +// // sequence get 50 frames { +// for (int i = 0; i < 50; ++i) { +// REQUIRE(fr.get(i) != nullptr); +// } +// } +// } + +std::string sha_256(const QString &dat) { + return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString(); +} + +TEST_CASE("httpMultiPartDownload") { + char filename[] = "/tmp/XXXXXX"; + int fd = mkstemp(filename); + REQUIRE(fd != -1); + close(fd); + + SECTION("http 200") { + REQUIRE(httpMultiPartDownload(stream_url, filename, 5)); + std::string content = util::read_file(filename); + REQUIRE(content.size() == 37495242); + std::string checksum = sha_256(QString::fromStdString(content)); + REQUIRE(checksum == "d8ff81560ce7ed6f16d5fb5a6d6dd13aba06c8080c62cfe768327914318744c4"); + } + SECTION("http 404") { + REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false); } } diff --git a/selfdrive/ui/replay/tests/test_runner.cc b/selfdrive/ui/replay/tests/test_runner.cc new file mode 100644 index 0000000000..62bf7476a1 --- /dev/null +++ b/selfdrive/ui/replay/tests/test_runner.cc @@ -0,0 +1,2 @@ +#define CATCH_CONFIG_MAIN +#include "catch2/catch.hpp"