diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 5b0f854acb..d04c5b0010 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -160,9 +160,12 @@ void Replay::queueSegment() { } // start stream thread - if (stream_thread_ == nullptr && cur != segments_.end() && cur->second->isLoaded()) { + bool current_segment_loaded = (cur != segments_.end() && cur->second->isLoaded()); + if (stream_thread_ == nullptr && current_segment_loaded) { startStream(cur->second.get()); } + + enableHttpLogging(!current_segment_loaded); } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index 6d2eff18ae..206970009e 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -126,8 +126,7 @@ void Segment::loadFile(int id, const std::string file) { bool file_ready = util::file_exists(local_file); if (!file_ready && is_remote) { - // TODO: retry on failure - file_ready = httpMultiPartDownload(file, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_); + file_ready = downloadFile(id, file, local_file); } if (!aborting_ && file_ready) { @@ -150,6 +149,22 @@ void Segment::loadFile(int id, const std::string file) { } } +bool Segment::downloadFile(int id, const std::string &url, const std::string local_file) { + bool ret = false; + int retries = 0; + while (!aborting_) { + ret = httpMultiPartDownload(url, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_); + if (ret || aborting_) break; + + if (++retries > max_retries_) { + qInfo() << "download failed after retries" << max_retries_; + break; + } + qInfo() << "download failed, retrying" << retries; + } + return ret; +} + std::string Segment::cacheFilePath(const std::string &file) { QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery); QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex(); diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index 80d275f6d3..7944a1ec29 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -53,9 +53,11 @@ signals: protected: void loadFile(int id, const std::string file); + bool downloadFile(int id, const std::string &url, const std::string local_file); std::string cacheFilePath(const std::string &file); std::atomic success_ = true, aborting_ = false; std::atomic loading_ = 0; - std::list loading_threads_; + std::vector loading_threads_; + const int max_retries_ = 3; }; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index f164b9838b..83e633c324 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -13,20 +13,19 @@ std::string sha_256(const QString &dat) { TEST_CASE("httpMultiPartDownload") { char filename[] = "/tmp/XXXXXX"; - int fd = mkstemp(filename); - close(fd); + close(mkstemp(filename)); - const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; - SECTION("http 200") { + const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; + SECTION("5 connections") { REQUIRE(httpMultiPartDownload(stream_url, filename, 5)); - std::string content = util::read_file(filename); - REQUIRE(content.size() == 37495242); - std::string checksum = sha_256(QString::fromStdString(content)); - REQUIRE(checksum == "d8ff81560ce7ed6f16d5fb5a6d6dd13aba06c8080c62cfe768327914318744c4"); } - SECTION("http 404") { - REQUIRE(httpMultiPartDownload(util::string_format("%s_abc", stream_url), filename, 5) == false); + SECTION("1 connection") { + REQUIRE(httpMultiPartDownload(stream_url, filename, 1)); } + std::string content = util::read_file(filename); + REQUIRE(content.size() == 9112651); + std::string checksum = sha_256(QString::fromStdString(content)); + REQUIRE(checksum == "e44edfbb545abdddfd17020ced2b18b6ec36506152267f32b6a8e3341f8126d6"); } int random_int(int min, int max) { diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc index 361aebc42d..5ffc4c0250 100644 --- a/selfdrive/ui/replay/util.cc +++ b/selfdrive/ui/replay/util.cc @@ -2,6 +2,10 @@ #include #include +#include +#include +#include + #include #include @@ -16,21 +20,26 @@ struct CURLGlobalInitializer { struct MultiPartWriter { int64_t offset; int64_t end; + int64_t written; FILE *fp; }; -static size_t write_cb(char *data, size_t n, size_t l, void *userp) { +static size_t write_cb(char *data, size_t size, size_t count, void *userp) { MultiPartWriter *w = (MultiPartWriter *)userp; fseek(w->fp, w->offset, SEEK_SET); - fwrite(data, l, n, w->fp); - w->offset += n * l; - return n * l; + fwrite(data, size, count, w->fp); + size_t bytes = size * count; + w->offset += bytes; + w->written += bytes; + return bytes; } -static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) { return n * l; } +static size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; } -int64_t getDownloadContentLength(const std::string &url) { +int64_t getRemoteFileSize(const std::string &url) { CURL *curl = curl_easy_init(); + if (!curl) return -1; + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb); curl_easy_setopt(curl, CURLOPT_HEADER, 1); @@ -39,25 +48,47 @@ int64_t getDownloadContentLength(const std::string &url) { double content_length = -1; if (res == CURLE_OK) { res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length); + } else { + std::cout << "Download failed: error code: " << res << std::endl; } curl_easy_cleanup(curl); return res == CURLE_OK ? (int64_t)content_length : -1; } +std::string formattedDataSize(size_t size) { + if (size < 1024) { + return std::to_string(size) + " B"; + } else if (size < 1024 * 1024) { + return util::string_format("%.2f KB", (float)size / 1024); + } else { + return util::string_format("%.2f MB", (float)size / (1024 * 1024)); + } +} + +static std::atomic enable_http_logging = false; + +void enableHttpLogging(bool enable) { + enable_http_logging = enable; +} + bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort) { static CURLGlobalInitializer curl_initializer; + static std::mutex lock; + static uint64_t total_written = 0, prev_total_written = 0; + static double last_print_ts = 0; - int64_t content_length = getDownloadContentLength(url); - if (content_length == -1) return false; + int64_t content_length = getRemoteFileSize(url); + if (content_length <= 0) return false; // create a tmp sparse file - std::string tmp_file = target_file + ".tmp"; + const std::string tmp_file = target_file + ".tmp"; FILE *fp = fopen(tmp_file.c_str(), "wb"); assert(fp); fseek(fp, content_length - 1, SEEK_SET); fwrite("\0", 1, 1, fp); CURLM *cm = curl_multi_init(); + std::map writers; const int part_size = content_length / parts; for (int i = 0; i < parts; ++i) { @@ -74,44 +105,63 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil curl_easy_setopt(eh, CURLOPT_HTTPGET, 1); curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1); curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1); + curl_multi_add_handle(cm, eh); } - int running = 1, success_cnt = 0; - while (!(abort && abort->load())) { - CURLMcode ret = curl_multi_perform(cm, &running); - - if (!running) { - CURLMsg *msg; - int msgs_left = -1; - while ((msg = curl_multi_info_read(cm, &msgs_left))) { - if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) { - int http_status_code = 0; - curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code); - success_cnt += (http_status_code == 206); - } + int still_running = 1; + size_t prev_written = 0; + while (still_running > 0 && !(abort && *abort)) { + curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + curl_multi_perform(cm, &still_running); + + size_t written = std::accumulate(writers.begin(), writers.end(), 0, [=](int v, auto &w) { return v + w.second.written; }); + int cur_written = written - prev_written; + prev_written = written; + + std::lock_guard lk(lock); + double ts = millis_since_boot(); + total_written += cur_written; + if ((ts - last_print_ts) > 2 * 1000) { + if (enable_http_logging && last_print_ts > 0) { + size_t average = (total_written - prev_total_written) / ((ts - last_print_ts) / 1000.); + std::cout << "downloading segments at " << formattedDataSize(average) << "/S" << std::endl; } - break; + prev_total_written = total_written; + last_print_ts = ts; } + } - if (ret == CURLM_OK) { - curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + CURLMsg *msg; + int msgs_left = -1; + int complete = 0; + while ((msg = curl_multi_info_read(cm, &msgs_left)) && !(abort && *abort)) { + if (msg->msg == CURLMSG_DONE) { + if (msg->data.result == CURLE_OK) { + long res_status = 0; + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &res_status); + if (res_status == 206) { + complete++; + } else { + std::cout << "Download failed: http error code: " << res_status << std::endl; + } + } else { + std::cout << "Download failed: connection failure: " << msg->data.result << std::endl; + } } - }; - - fclose(fp); - bool success = success_cnt == parts; - if (success) { - success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0; } - // cleanup curl for (auto &[e, w] : writers) { curl_multi_remove_handle(cm, e); curl_easy_cleanup(e); } + curl_multi_cleanup(cm); - return success; + fclose(fp); + + bool ret = complete == parts; + ret = ret && ::rename(tmp_file.c_str(), target_file.c_str()) == 0; + return ret; } bool readBZ2File(const std::string_view file, std::ostream &stream) { @@ -147,7 +197,7 @@ void precise_nano_sleep(long sleep_ns) { } // spin wait if (sleep_ns > 0) { - while ((nanos_since_boot() - start_sleep) <= sleep_ns) { + while ((nanos_since_boot() - start_sleep) <= sleep_ns) { usleep(0); } } diff --git a/selfdrive/ui/replay/util.h b/selfdrive/ui/replay/util.h index 70095bef44..db7fa8483e 100644 --- a/selfdrive/ui/replay/util.h +++ b/selfdrive/ui/replay/util.h @@ -3,8 +3,8 @@ #include #include #include -#include void precise_nano_sleep(long sleep_ns); bool readBZ2File(const std::string_view file, std::ostream &stream); +void enableHttpLogging(bool enable); bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort = nullptr);