From acc52ece20cd5b152d2a439c5c01c9498182f089 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Sat, 16 Oct 2021 05:35:17 +0800 Subject: [PATCH] replay: refactor Route and Segment (#22531) * new functions * fix wrong call to qUrl::isLocalFile * cleanup * keep extension in cached files * cleanup * simplify segment * delete thread * add output * pre-decompress * remove suffix * revert remove suffix * 1 connection for log file * cleanup * segment may not be continuous,use map * don't emit finish if aborting_ * use QFuture and thread pool * cleanup * fix segfault in LoadFromLocal * cleanup * handle segment failed to load * output info * continue error handling * Remove redundant testSeekTo * cleanup * always return true * keep time is ok now change to 1s write 1 byte at the end of the sparse file * log loading segment * merge #22476 * Update selfdrive/ui/replay/logreader.cc * correct connect * pub message in function * typo * Update selfdrive/ui/replay/replay.cc Co-authored-by: Adeeb Shihadeh --- selfdrive/ui/replay/logreader.cc | 6 +- selfdrive/ui/replay/logreader.h | 5 +- selfdrive/ui/replay/replay.cc | 78 +++++--- selfdrive/ui/replay/replay.h | 4 +- selfdrive/ui/replay/route.cc | 217 +++++++++-------------- selfdrive/ui/replay/route.h | 38 ++-- selfdrive/ui/replay/tests/test_replay.cc | 4 +- selfdrive/ui/replay/util.cc | 6 +- 8 files changed, 166 insertions(+), 192 deletions(-) diff --git a/selfdrive/ui/replay/logreader.cc b/selfdrive/ui/replay/logreader.cc index 1b88402713..d4ef8e5c00 100644 --- a/selfdrive/ui/replay/logreader.cc +++ b/selfdrive/ui/replay/logreader.cc @@ -1,6 +1,5 @@ #include "selfdrive/ui/replay/logreader.h" -#include #include #include "selfdrive/common/util.h" #include "selfdrive/ui/replay/util.h" @@ -32,8 +31,9 @@ LogReader::~LogReader() { for (auto e : events) delete e; } -bool LogReader::load(const std::string &file, bool is_bz2file) { - if (is_bz2file) { +bool LogReader::load(const std::string &file) { + bool is_bz2 = file.rfind(".bz2") == file.length() - 4; + if (is_bz2) { std::ostringstream stream; if (!readBZ2File(file, stream)) { LOGW("bz2 decompress failed"); diff --git a/selfdrive/ui/replay/logreader.h b/selfdrive/ui/replay/logreader.h index d57ac8c51d..348dcfd384 100644 --- a/selfdrive/ui/replay/logreader.h +++ b/selfdrive/ui/replay/logreader.h @@ -1,8 +1,5 @@ #pragma once -#include -#include - #include #include "cereal/gen/cpp/log.capnp.h" #include "selfdrive/camerad/cameras/camera_common.h" @@ -38,7 +35,7 @@ class LogReader { public: LogReader() = default; ~LogReader(); - bool load(const std::string &file, bool is_bz2file); + bool load(const std::string &file); std::vector events; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 59482e10ff..79805d5b5c 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -57,10 +57,9 @@ bool Replay::load() { return false; } - for (int i = 0; i < route_->size(); ++i) { - const SegmentFile &f = route_->at(i); + for (auto &[n, f] : route_->segments()) { if ((!f.rlog.isEmpty() || !f.qlog.isEmpty()) && (!f.road_cam.isEmpty() || !f.qcamera.isEmpty())) { - segments_[i] = nullptr; + segments_[n] = nullptr; } } if (segments_.empty()) { @@ -98,8 +97,9 @@ void Replay::doSeek(int seconds, bool relative) { seconds += currentSeconds(); } qInfo() << "seeking to" << seconds; - cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (int)segments_.rbegin()->first * 60) * 1e9; - current_segment_ = std::min(seconds / 60, (int)segments_.rbegin()->first - 1); + const int max_segment_number = segments_.rbegin()->first; + cur_mono_time_ = route_start_ts_ + std::clamp(seconds, 0, (max_segment_number + 1) * 60) * 1e9; + current_segment_ = std::min(seconds / 60, max_segment_number); return false; }); queueSegment(); @@ -122,16 +122,32 @@ void Replay::setCurrentSegment(int n) { } } -// maintain the segment window +void Replay::segmentLoadFinished(bool success) { + if (!success) { + Segment *seg = qobject_cast(sender()); + qInfo() << "failed to load segment " << seg->seg_num << ", removing it from current replay list"; + segments_.erase(seg->seg_num); + } + queueSegment(); +} + void Replay::queueSegment() { - // forward fetch segments + // get the current segment window SegmentMap::iterator begin, end; begin = end = segments_.lower_bound(current_segment_); - for (int fwd = 0; end != segments_.end() && fwd <= FORWARD_SEGS; ++end, ++fwd) { - auto &[n, seg] = *end; + for (int i = 0; i < BACKWARD_SEGS && begin != segments_.begin(); ++i) { + --begin; + } + for (int i = 0; i <= FORWARD_SEGS && end != segments_.end(); ++i) { + ++end; + } + // load segments + for (auto it = begin; it != end; ++it) { + auto &[n, seg] = *it; if (!seg) { seg = std::make_unique(n, route_->at(n), load_dcam, load_ecam); - QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::queueSegment); + QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); + qInfo() << "loading segment" << n << "..."; } } // merge segments @@ -159,8 +175,8 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: new_events->reserve(std::accumulate(segments_need_merge.begin(), segments_need_merge.end(), 0, [=](int v, int n) { return v + segments_[n]->log->events.size(); })); for (int n : segments_need_merge) { - auto &log = segments_[n]->log; - auto middle = new_events->insert(new_events->end(), log->events.begin(), log->events.end()); + auto &e = segments_[n]->log->events; + auto middle = new_events->insert(new_events->end(), e.begin(), e.end()); std::inplace_merge(new_events->begin(), middle, new_events->end(), Event::lessThan()); } // update events @@ -182,7 +198,20 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: }); delete prev_events; } else { - updateEvents([=]() { return begin->second->isLoaded(); }); + updateEvents([=]() { return true; }); + } +} + +void Replay::publishMessage(const Event *e) { + if (sm == nullptr) { + auto bytes = e->bytes(); + int ret = pm->send(sockets_[e->which], (capnp::byte *)bytes.begin(), bytes.size()); + if (ret == -1) { + qDebug() << "stop publishing" << sockets_[e->which] << "due to multiple publishers error"; + sockets_[e->which] = nullptr; + } + } else { + sm->update_msgs(nanos_since_boot(), {{sockets_[e->which], e->event}}); } } @@ -222,8 +251,8 @@ void Replay::stream() { continue; } - const uint64_t evt_start_ts = cur_mono_time_; - const uint64_t loop_start_ts = nanos_since_boot(); + uint64_t evt_start_ts = cur_mono_time_; + uint64_t loop_start_ts = nanos_since_boot(); for (auto end = events_->end(); !updating_events_ && eit != end; ++eit) { const Event *evt = (*eit); @@ -250,24 +279,19 @@ void Replay::stream() { long etime = cur_mono_time_ - evt_start_ts; long rtime = nanos_since_boot() - loop_start_ts; long behind_ns = etime - rtime; - if (behind_ns > 0) { + // if behind_ns is greater than 1 second, it means that an invalid segemnt is skipped by seeking/replaying + if (behind_ns >= 1 * 1e9) { + // reset start times + evt_start_ts = cur_mono_time_; + loop_start_ts = nanos_since_boot(); + } else if (behind_ns > 0) { precise_nano_sleep(behind_ns); } if (evt->frame) { publishFrame(evt); } else { - // publish msg - if (sm == nullptr) { - auto bytes = evt->bytes(); - int ret = pm->send(sockets_[cur_which], (capnp::byte *)bytes.begin(), bytes.size()); - if (ret == -1) { - qDebug() << "stop publishing" << sockets_[cur_which] << "due to multiple publishers error"; - sockets_[cur_which] = nullptr; - } - } else { - sm->update_msgs(nanos_since_boot(), {{sockets_[cur_which], evt->event}}); - } + publishMessage(evt); } } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 28fb231ec7..55383d8270 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -6,7 +6,7 @@ #include "selfdrive/ui/replay/route.h" constexpr int FORWARD_SEGS = 2; -constexpr int BACKWARD_SEGS = 2; +constexpr int BACKWARD_SEGS = 1; class Replay : public QObject { Q_OBJECT @@ -27,6 +27,7 @@ signals: protected slots: void queueSegment(); void doSeek(int seconds, bool relative); + void segmentLoadFinished(bool sucess); protected: typedef std::map> SegmentMap; @@ -34,6 +35,7 @@ protected: void setCurrentSegment(int n); void mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end); void updateEvents(const std::function& lambda); + void publishMessage(const Event *e); void publishFrame(const Event *e); inline int currentSeconds() const { return (cur_mono_time_ - route_start_ts_) / 1e9; } diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index 8953596cdb..a7aa4a28e6 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -1,40 +1,40 @@ #include "selfdrive/ui/replay/route.h" #include -#include #include #include #include -#include -#include +#include #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/qt/api.h" #include "selfdrive/ui/replay/util.h" -Route::Route(const QString &route, const QString &data_dir) : route_(route), data_dir_(data_dir) {} - bool Route::load() { if (data_dir_.isEmpty()) { - QEventLoop loop; - auto onError = [&loop](const QString &err) { loop.quit(); }; - - bool ret = false; - HttpRequest http(nullptr, !Hardware::PC()); - QObject::connect(&http, &HttpRequest::failedResponse, onError); - QObject::connect(&http, &HttpRequest::timeoutResponse, onError); - QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) { - ret = loadFromJson(json); - loop.quit(); - }); - http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files"); - loop.exec(); - return ret; + return loadFromServer(); } else { return loadFromLocal(); } } +bool Route::loadFromServer() { + QEventLoop loop; + auto onError = [&loop](const QString &err) { loop.quit(); }; + + bool ret = false; + HttpRequest http(nullptr, !Hardware::PC()); + QObject::connect(&http, &HttpRequest::failedResponse, onError); + QObject::connect(&http, &HttpRequest::timeoutResponse, onError); + QObject::connect(&http, &HttpRequest::receivedResponse, [&](const QString json) { + ret = loadFromJson(json); + loop.quit(); + }); + http.sendRequest("https://api.commadotai.com/v1/route/" + route_ + "/files"); + loop.exec(); + return ret; +} + bool Route::loadFromJson(const QString &json) { QJsonObject route_files = QJsonDocument::fromJson(json.trimmed().toUtf8()).object(); if (route_files.empty()) { @@ -47,23 +47,7 @@ bool Route::loadFromJson(const QString &json) { for (const auto &url : route_files[key].toArray()) { QString url_str = url.toString(); if (rx.indexIn(url_str) != -1) { - const int seg_num = rx.cap(1).toInt(); - if (segments_.size() <= seg_num) { - segments_.resize(seg_num + 1); - } - if (key == "logs") { - segments_[seg_num].rlog = url_str; - } else if (key == "qlogs") { - segments_[seg_num].qlog = url_str; - } else if (key == "cameras") { - segments_[seg_num].road_cam = url_str; - } else if (key == "dcameras") { - segments_[seg_num].driver_cam = url_str; - } else if (key == "ecameras") { - segments_[seg_num].wide_road_cam = url_str; - } else if (key == "qcameras") { - segments_[seg_num].qcamera = url_str; - } + addFileToSegment(rx.cap(1).toInt(), url_str); } } } @@ -79,121 +63,94 @@ bool Route::loadFromLocal() { if (folders.isEmpty()) return false; for (auto folder : folders) { - const int seg_num = folder.split("--")[2].toInt(); - if (segments_.size() <= seg_num) { - segments_.resize(seg_num + 1); - } - QDir segment_dir(log_dir.filePath(folder)); - for (auto f : segment_dir.entryList(QDir::Files)) { - const QString file_path = segment_dir.absoluteFilePath(f); - if (f.startsWith("rlog")) { - segments_[seg_num].rlog = file_path; - } else if (f.startsWith("qlog")) { - segments_[seg_num].qlog = file_path; - } else if (f.startsWith("fcamera")) { - segments_[seg_num].road_cam = file_path; - } else if (f.startsWith("dcamera")) { - segments_[seg_num].driver_cam = file_path; - } else if (f.startsWith("ecamera")) { - segments_[seg_num].wide_road_cam = file_path; - } else if (f.startsWith("qcamera")) { - segments_[seg_num].qcamera = file_path; + int seg_num_pos = folder.lastIndexOf("--"); + if (seg_num_pos != -1) { + const int seg_num = folder.mid(seg_num_pos + 2).toInt(); + QDir segment_dir(log_dir.filePath(folder)); + for (auto f : segment_dir.entryList(QDir::Files)) { + addFileToSegment(seg_num, segment_dir.absoluteFilePath(f)); } } } return true; } +void Route::addFileToSegment(int n, const QString &file) { + const QString name = QUrl(file).fileName(); + if (name == "rlog.bz2") { + segments_[n].rlog = file; + } else if (name == "qlog.bz2") { + segments_[n].qlog = file; + } else if (name == "fcamera.hevc") { + segments_[n].road_cam = file; + } else if (name == "dcamera.hevc") { + segments_[n].driver_cam = file; + } else if (name == "ecamera.hevc") { + segments_[n].wide_road_cam = file; + } else if (name == "qcamera.ts") { + segments_[n].qcamera = file; + } +} + // class Segment -Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) { +Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) { static std::once_flag once_flag; - std::call_once(once_flag, [=]() { - if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); - }); - - // fallback to qcamera/qlog - road_cam_path_ = files_.road_cam.isEmpty() ? files_.qcamera : files_.road_cam; - log_path_ = files_.rlog.isEmpty() ? files_.qlog : files_.rlog; - assert (!log_path_.isEmpty() && !road_cam_path_.isEmpty()); - - if (!load_dcam) { - files_.driver_cam = ""; - } - if (!load_ecam) { - files_.wide_road_cam = ""; - } - - if (!QUrl(log_path_).isLocalFile()) { - for (auto &url : {log_path_, road_cam_path_, files_.driver_cam, files_.wide_road_cam}) { - if (!url.isEmpty() && !QFile::exists(localPath(url))) { - downloadFile(url); - ++downloading_; - } + std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); }); + + // the order is [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog + const QString file_list[] = { + files.road_cam.isEmpty() ? files.qcamera : files.road_cam, + load_dcam ? files.driver_cam : "", + load_ecam ? files.wide_road_cam : "", + files.rlog.isEmpty() ? files.qlog : files.rlog, + }; + for (int i = 0; i < std::size(file_list); i++) { + if (!file_list[i].isEmpty()) { + loading_++; + synchronizer_.addFuture(QtConcurrent::run(this, &Segment::loadFile, i, file_list[i].toStdString())); } } - if (downloading_ == 0) { - QTimer::singleShot(0, this, &Segment::load); - } else { - qDebug() << "downloading segment" << seg_num_ << "..."; - } } Segment::~Segment() { aborting_ = true; - if (downloading_ > 0) { - qDebug() << "cancel download segment" << seg_num_; - } - for (auto &t : download_threads_) { - if (t->isRunning()) t->wait(); - } + synchronizer_.setCancelOnWait(true); + synchronizer_.waitForFinished(); } -void Segment::downloadFile(const QString &url) { - download_threads_.emplace_back(QThread::create([=]() { - const std::string local_file = localPath(url).toStdString(); - bool ret = httpMultiPartDownload(url.toStdString(), local_file, connections_per_file, &aborting_); - if (ret && url == log_path_) { - // pre-decompress log file. - std::ofstream ostrm(local_file + "_decompressed", std::ios::binary); - readBZ2File(local_file, ostrm); - } - if (--downloading_ == 0 && !aborting_) { - load(); - } - }))->start(); -} +void Segment::loadFile(int id, const std::string file) { + const bool is_remote = file.find("https://") == 0; + const std::string local_file = is_remote ? cacheFilePath(file) : file; + bool file_ready = util::file_exists(local_file); + + if (!file_ready && is_remote) { + // TODO: retry on failure + file_ready = httpMultiPartDownload(file, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_); + } -// load concurrency -void Segment::load() { - std::vector> futures; - - futures.emplace_back(std::async(std::launch::async, [=]() { - const std::string bzip_file = localPath(log_path_).toStdString(); - const std::string decompressed_file = bzip_file + "_decompressed"; - bool is_bzip = !util::file_exists(decompressed_file); - log = std::make_unique(); - return log->load(is_bzip ? bzip_file : decompressed_file, is_bzip); - })); - - QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam}; - for (int i = 0; i < std::size(camera_files); ++i) { - if (!camera_files[i].isEmpty()) { - futures.emplace_back(std::async(std::launch::async, [=]() { - frames[i] = std::make_unique(); - return frames[i]->load(localPath(camera_files[i]).toStdString()); - })); + if (!aborting_ && file_ready) { + if (id < MAX_CAMERAS) { + frames[id] = std::make_unique(); + success_ = success_ && frames[id]->load(local_file); + } else { + std::string decompressed = cacheFilePath(local_file + ".decompressed"); + if (!util::file_exists(decompressed)) { + std::ofstream ostrm(decompressed, std::ios::binary); + readBZ2File(local_file, ostrm); + } + log = std::make_unique(); + success_ = success_ && log->load(decompressed); } } - int success_cnt = std::accumulate(futures.begin(), futures.end(), 0, [=](int v, auto &f) { return f.get() + v; }); - loaded_ = (success_cnt == futures.size()); - emit loadFinished(); + if (!aborting_ && --loading_ == 0) { + emit loadFinished(success_); + } } -QString Segment::localPath(const QUrl &url) { - if (url.isLocalFile() || QFile(url.toString()).exists()) return url.toString(); - - QByteArray url_no_query = url.toString(QUrl::RemoveQuery).toUtf8(); - return CACHE_DIR.filePath(QString(QCryptographicHash::hash(url_no_query, QCryptographicHash::Sha256).toHex())); +std::string Segment::cacheFilePath(const std::string &file) { + QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery); + QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex(); + return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString(); } diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index 754c25f380..c4ab0cd2a3 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -1,16 +1,13 @@ #pragma once #include -#include -#include -#include +#include #include "selfdrive/common/util.h" #include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/logreader.h" const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str()); -const int connections_per_file = 3; struct SegmentFile { QString rlog; @@ -23,45 +20,42 @@ struct SegmentFile { class Route { public: - Route(const QString &route, const QString &data_dir = {}); + Route(const QString &route, const QString &data_dir = {}) : route_(route), data_dir_(data_dir) {}; bool load(); inline const QString &name() const { return route_; }; - inline int size() const { return segments_.size(); } - inline SegmentFile &at(int n) { return segments_[n]; } + inline const std::map &segments() const { return segments_; } + inline const SegmentFile &at(int n) { return segments_.at(n); } protected: bool loadFromLocal(); + bool loadFromServer(); bool loadFromJson(const QString &json); + void addFileToSegment(int seg_num, const QString &file); QString route_; QString data_dir_; - std::vector segments_; + std::map segments_; }; class Segment : public QObject { Q_OBJECT public: - Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam); + Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam); ~Segment(); - inline bool isLoaded() const { return loaded_; } + inline bool isLoaded() const { return !loading_ && success_; } + const int seg_num = 0; std::unique_ptr log; std::unique_ptr frames[MAX_CAMERAS] = {}; signals: - void loadFinished(); + void loadFinished(bool success); protected: - void load(); - void downloadFile(const QString &url); - QString localPath(const QUrl &url); + void loadFile(int id, const std::string file); + std::string cacheFilePath(const std::string &file); - std::atomic loaded_ = false; - std::atomic aborting_ = false; - std::atomic downloading_ = 0; - int seg_num_ = 0; - SegmentFile files_; - QString road_cam_path_; - QString log_path_; - std::vector download_threads_; + std::atomic success_ = true, aborting_ = false; + std::atomic loading_ = 0; + QFutureSynchronizer synchronizer_; }; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index b374239f36..fa4f196346 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -14,7 +14,6 @@ std::string sha_256(const QString &dat) { TEST_CASE("httpMultiPartDownload") { char filename[] = "/tmp/XXXXXX"; int fd = mkstemp(filename); - REQUIRE(fd != -1); close(fd); const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; @@ -54,7 +53,7 @@ bool is_events_ordered(const std::vector &events) { TEST_CASE("Segment") { Route demo_route(DEMO_ROUTE); REQUIRE(demo_route.load()); - REQUIRE(demo_route.size() == 11); + REQUIRE(demo_route.segments().size() == 11); QEventLoop loop; Segment segment(0, demo_route.at(0), false, false); @@ -132,7 +131,6 @@ void TestReplay::test_seek() { segments_.erase(n); } for (int i =0; i < 50; ++i) { - testSeekTo(520); testSeekTo(random_int(4 * 60, 9 * 60)); } loop.quit(); diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc index e48891cde6..68048b623d 100644 --- a/selfdrive/ui/replay/util.cc +++ b/selfdrive/ui/replay/util.cc @@ -49,10 +49,12 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil int64_t content_length = getDownloadContentLength(url); if (content_length == -1) return false; + // create a tmp sparse file std::string tmp_file = target_file + ".tmp"; FILE *fp = fopen(tmp_file.c_str(), "wb"); - // create a sparse file - fseek(fp, content_length, SEEK_SET); + assert(fp); + fseek(fp, content_length - 1, SEEK_SET); + fwrite("\0", 1, 1, fp); CURLM *cm = curl_multi_init(); std::map writers;