diff --git a/selfdrive/camerad/SConscript b/selfdrive/camerad/SConscript index 65be54aa4e..d018704d60 100644 --- a/selfdrive/camerad/SConscript +++ b/selfdrive/camerad/SConscript @@ -21,8 +21,12 @@ else: if USE_FRAME_STREAM: cameras = ['cameras/camera_frame_stream.cc'] else: - libs += ['avutil', 'avcodec', 'avformat', 'swscale'] - cameras = ['cameras/camera_replay.cc', env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc')] + libs += ['avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'ssl', 'curl', 'crypto'] + # TODO: import replay_lib from root SConstruct + cameras = ['cameras/camera_replay.cc', + env.Object('camera-util', '#/selfdrive/ui/replay/util.cc'), + env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc'), + env.Object('camera-filereader', '#/selfdrive/ui/replay/filereader.cc')] if arch == "Darwin": del libs[libs.index('OpenCL')] diff --git a/selfdrive/camerad/cameras/camera_replay.cc b/selfdrive/camerad/cameras/camera_replay.cc index 0345f37b74..6bc53d8c94 100644 --- a/selfdrive/camerad/cameras/camera_replay.cc +++ b/selfdrive/camerad/cameras/camera_replay.cc @@ -23,8 +23,7 @@ std::string get_url(std::string route_name, const std::string &camera, int segme } void camera_init(VisionIpcServer *v, CameraState *s, int camera_id, unsigned int fps, cl_device_id device_id, cl_context ctx, VisionStreamType rgb_type, VisionStreamType yuv_type, const std::string &url) { - // TODO: cache url file - s->frame = new FrameReader(); + s->frame = new FrameReader(true); if (!s->frame->load(url)) { printf("failed to load stream from %s", url.c_str()); assert(0); diff --git a/selfdrive/loggerd/SConscript b/selfdrive/loggerd/SConscript index dfa3dca714..7e41c9d3ee 100644 --- a/selfdrive/loggerd/SConscript +++ b/selfdrive/loggerd/SConscript @@ -28,4 +28,4 @@ env.Program(src, LIBS=libs) env.Program('bootlog.cc', LIBS=libs) if GetOption('test'): - env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl']) + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl', 'crypto']) diff --git a/selfdrive/loggerd/tests/test_logger.cc b/selfdrive/loggerd/tests/test_logger.cc index e59c2f0153..baef312ad8 100644 --- a/selfdrive/loggerd/tests/test_logger.cc +++ b/selfdrive/loggerd/tests/test_logger.cc @@ -21,12 +21,8 @@ void verify_segment(const std::string &route_path, int segment, int max_segment, REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock")); for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) { const std::string log_file = segment_path + fn; - INFO(log_file); - - std::ostringstream stream; - bool ret = readBZ2File(log_file, stream); - REQUIRE(ret); - std::string log = stream.str(); + std::string log = decompressBZ2(util::read_file(log_file)); + REQUIRE(!log.empty()); int event_cnt = 0, i = 0; kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); while (words.size() > 0) { diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 4d3ae49ba0..17441b6ba6 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -112,7 +112,7 @@ if GetOption('extras'): if arch in ['x86_64', 'Darwin'] or GetOption('extras'): qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] + replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'swscale'] + qt_libs diff --git a/selfdrive/ui/replay/filereader.cc b/selfdrive/ui/replay/filereader.cc new file mode 100644 index 0000000000..1585631401 --- /dev/null +++ b/selfdrive/ui/replay/filereader.cc @@ -0,0 +1,62 @@ +#include "selfdrive/ui/replay/filereader.h" + +#include + +#include +#include +#include +#include +#include + +#include "selfdrive/common/util.h" +#include "selfdrive/ui/replay/util.h" + +std::string cacheFilePath(const std::string &url) { + static std::string cache_path = [] { + const std::string comma_cache = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/"); + util::create_directories(comma_cache, 0755); + return comma_cache.back() == '/' ? comma_cache : comma_cache + "/"; + }(); + + return cache_path + sha256(getUrlWithoutQuery(url));; +} + +std::string FileReader::read(const std::string &file, std::atomic *abort) { + const bool is_remote = file.find("https://") == 0; + const std::string local_file = is_remote ? cacheFilePath(file) : file; + std::string result; + + if ((!is_remote || cache_to_local_) && util::file_exists(local_file)) { + result = util::read_file(local_file); + } else if (is_remote) { + result = download(file, abort); + if (cache_to_local_ && !result.empty()) { + std::ofstream fs(local_file, fs.binary | fs.out); + fs.write(result.data(), result.size()); + } + } + return result; +} + +std::string FileReader::download(const std::string &url, std::atomic *abort) { + std::string result; + size_t remote_file_size = 0; + for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) { + if (i > 0) { + std::cout << "download failed, retrying" << i << std::endl; + } + if (remote_file_size <= 0) { + remote_file_size = getRemoteFileSize(url); + } + if (remote_file_size > 0 && !(abort && *abort)) { + std::ostringstream oss; + result.resize(remote_file_size); + oss.rdbuf()->pubsetbuf(result.data(), result.size()); + int chunks = chunk_size_ > 0 ? std::min(1, (int)std::nearbyint(remote_file_size / (float)chunk_size_)) : 1; + if (httpMultiPartDownload(url, oss, chunks, remote_file_size, abort)) { + return result; + } + } + } + return {}; +} diff --git a/selfdrive/ui/replay/filereader.h b/selfdrive/ui/replay/filereader.h new file mode 100644 index 0000000000..06ce14e9f2 --- /dev/null +++ b/selfdrive/ui/replay/filereader.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include + +class FileReader { +public: + FileReader(bool cache_to_local, int chunk_size = -1, int max_retries = 3) + : cache_to_local_(cache_to_local), chunk_size_(chunk_size), max_retries_(max_retries) {} + virtual ~FileReader() {} + std::string read(const std::string &file, std::atomic *abort = nullptr); + +private: + std::string download(const std::string &url, std::atomic *abort); + int chunk_size_; + int max_retries_; + bool cache_to_local_; +}; + +std::string cacheFilePath(const std::string &url); diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index dd29d6363f..b6092a0df9 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -3,8 +3,11 @@ #include #include #include +#include -static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { +namespace { + +int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { std::mutex *mutex = (std::mutex *)*arg; switch (op) { case AV_LOCK_CREATE: @@ -22,38 +25,56 @@ static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { return 0; } -struct AVInitializer { - AVInitializer() { - int ret = av_lockmgr_register(ffmpeg_lockmgr_cb); - assert(ret >= 0); +int readFunction(void *opaque, uint8_t *buf, int buf_size) { + auto &iss = *reinterpret_cast(opaque); + iss.read(reinterpret_cast(buf), buf_size); + return iss.gcount() ? iss.gcount() : AVERROR_EOF; +} + +} // namespace + +FrameReader::FrameReader(bool local_cache, int chunk_size, int retries) : FileReader(local_cache, chunk_size, retries) { + static std::once_flag once_flag; + std::call_once(once_flag, [] { + av_lockmgr_register(ffmpeg_lockmgr_cb); av_register_all(); - avformat_network_init(); - } - ~AVInitializer() { avformat_network_deinit(); } -}; + }); + + pFormatCtx_ = avformat_alloc_context(); + av_frame_ = av_frame_alloc(); + rgb_frame_ = av_frame_alloc(); + yuv_frame_ = av_frame_alloc();; -FrameReader::FrameReader() { - static AVInitializer av_initializer; } FrameReader::~FrameReader() { for (auto &f : frames_) { av_free_packet(&f.pkt); } - if (pCodecCtx_) { - avcodec_close(pCodecCtx_); - avcodec_free_context(&pCodecCtx_); - } + if (pCodecCtx_) avcodec_free_context(&pCodecCtx_); if (pFormatCtx_) avformat_close_input(&pFormatCtx_); if (av_frame_) av_frame_free(&av_frame_); if (rgb_frame_) av_frame_free(&rgb_frame_); if (yuv_frame_) av_frame_free(&yuv_frame_); if (rgb_sws_ctx_) sws_freeContext(rgb_sws_ctx_); if (yuv_sws_ctx_) sws_freeContext(yuv_sws_ctx_); + + if (avio_ctx_) { + av_freep(&avio_ctx_->buffer); + av_freep(&avio_ctx_); + } } -bool FrameReader::load(const std::string &url) { - pFormatCtx_ = avformat_alloc_context(); +bool FrameReader::load(const std::string &url, std::atomic *abort) { + std::string content = read(url, abort); + if (content.empty()) return false; + + std::istringstream iss(content); + const int avio_ctx_buffer_size = 64 * 1024; + unsigned char *avio_ctx_buffer = (unsigned char *)av_malloc(avio_ctx_buffer_size); + avio_ctx_ = avio_alloc_context(avio_ctx_buffer, avio_ctx_buffer_size, 0, &iss, readFunction, nullptr, nullptr); + pFormatCtx_->pb = avio_ctx_; + pFormatCtx_->probesize = 10 * 1024 * 1024; // 10MB if (avformat_open_input(&pFormatCtx_, url.c_str(), NULL, NULL) != 0) { printf("error loading %s\n", url.c_str()); @@ -75,10 +96,6 @@ bool FrameReader::load(const std::string &url) { ret = avcodec_open2(pCodecCtx_, pCodec, NULL); if (ret < 0) return false; - av_frame_ = av_frame_alloc(); - rgb_frame_ = av_frame_alloc(); - yuv_frame_ = av_frame_alloc();; - width = (pCodecCtxOrig->width + 3) & ~3; height = pCodecCtxOrig->height; rgb_sws_ctx_ = sws_getContext(pCodecCtxOrig->width, pCodecCtxOrig->height, AV_PIX_FMT_YUV420P, @@ -92,7 +109,7 @@ bool FrameReader::load(const std::string &url) { if (!yuv_sws_ctx_) return false; frames_.reserve(60 * 20); // 20fps, one minute - while (true) { + while (!(abort && *abort)) { Frame &frame = frames_.emplace_back(); int err = av_read_frame(pFormatCtx_, &frame.pkt); if (err < 0) { diff --git a/selfdrive/ui/replay/framereader.h b/selfdrive/ui/replay/framereader.h index b05336c0eb..2d42e135fe 100644 --- a/selfdrive/ui/replay/framereader.h +++ b/selfdrive/ui/replay/framereader.h @@ -2,6 +2,7 @@ #include #include +#include "selfdrive/ui/replay/filereader.h" extern "C" { #include @@ -10,11 +11,11 @@ extern "C" { #include } -class FrameReader { +class FrameReader : protected FileReader { public: - FrameReader(); + FrameReader(bool local_cache = false, int chunk_size = -1, int retries = 0); ~FrameReader(); - bool load(const std::string &url); + bool load(const std::string &url, std::atomic *abort = nullptr); bool get(int idx, uint8_t *rgb, uint8_t *yuv); int getRGBSize() const { return width * height * 3; } int getYUVSize() const { return width * height * 3 / 2; } @@ -39,4 +40,5 @@ private: AVCodecContext *pCodecCtx_ = nullptr; int key_frames_count_ = 0; bool valid_ = false; + AVIOContext *avio_ctx_ = nullptr; }; diff --git a/selfdrive/ui/replay/logreader.cc b/selfdrive/ui/replay/logreader.cc index 04c42bc1a4..10be8a19c7 100644 --- a/selfdrive/ui/replay/logreader.cc +++ b/selfdrive/ui/replay/logreader.cc @@ -1,7 +1,6 @@ #include "selfdrive/ui/replay/logreader.h" -#include -#include "selfdrive/common/util.h" +#include #include "selfdrive/ui/replay/util.h" Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { @@ -27,7 +26,7 @@ Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(a // class LogReader -LogReader::LogReader(size_t memory_pool_block_size) { +LogReader::LogReader(bool local_cache, int chunk_size, int retries, size_t memory_pool_block_size) : FileReader(local_cache, chunk_size, retries) { #ifdef HAS_MEMORY_RESOURCE const size_t buf_size = sizeof(Event) * memory_pool_block_size; pool_buffer_ = ::operator new(buf_size); @@ -47,18 +46,9 @@ LogReader::~LogReader() { #endif } -bool LogReader::load(const std::string &file) { - bool is_bz2 = file.rfind(".bz2") == file.length() - 4; - if (is_bz2) { - std::ostringstream stream; - if (!readBZ2File(file, stream)) { - LOGW("bz2 decompress failed"); - return false; - } - raw_ = stream.str(); - } else { - raw_ = util::read_file(file); - } +bool LogReader::load(const std::string &file, std::atomic *abort) { + raw_ = decompressBZ2(read(file, abort)); + if (raw_.empty()) return false; kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); while (words.size() > 0) { diff --git a/selfdrive/ui/replay/logreader.h b/selfdrive/ui/replay/logreader.h index 0fb3711325..5bb613d9de 100644 --- a/selfdrive/ui/replay/logreader.h +++ b/selfdrive/ui/replay/logreader.h @@ -7,6 +7,7 @@ #include "cereal/gen/cpp/log.capnp.h" #include "selfdrive/camerad/cameras/camera_common.h" +#include "selfdrive/ui/replay/filereader.h" const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam}; const int MAX_CAMERAS = std::size(ALL_CAMERAS); @@ -45,11 +46,11 @@ public: bool frame; }; -class LogReader { +class LogReader : protected FileReader { public: - LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); + LogReader(bool local_cache = false, int chunk_size = -1, int retries = 0, size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); ~LogReader(); - bool load(const std::string &file); + bool load(const std::string &file, std::atomic *abort = nullptr); std::vector events; diff --git a/selfdrive/ui/replay/main.cc b/selfdrive/ui/replay/main.cc index 2f417fc6a5..b0fe8eb3ad 100644 --- a/selfdrive/ui/replay/main.cc +++ b/selfdrive/ui/replay/main.cc @@ -1,20 +1,26 @@ -#include "selfdrive/ui/replay/replay.h" - -#include -#include #include #include #include #include #include +#include +#include + +#include "selfdrive/ui/replay/replay.h" const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; struct termios oldt = {}; +Replay *replay = nullptr; void sigHandler(int s) { std::signal(s, SIG_DFL); - tcsetattr(STDIN_FILENO, TCSANOW, &oldt); + if (oldt.c_lflag) { + tcsetattr(STDIN_FILENO, TCSANOW, &oldt); + } + if (replay) { + replay->stop(); + } qApp->quit(); } @@ -69,7 +75,7 @@ void keyboardThread(Replay *replay) { } } -int main(int argc, char *argv[]){ +int main(int argc, char *argv[]) { QApplication app(argc, argv); std::signal(SIGINT, sigHandler); std::signal(SIGTERM, sigHandler); @@ -78,6 +84,7 @@ int main(int argc, char *argv[]){ {"dcam", REPLAY_FLAG_DCAM, "load driver camera"}, {"ecam", REPLAY_FLAG_ECAM, "load wide road camera"}, {"no-loop", REPLAY_FLAG_NO_LOOP, "stop at the end of the route"}, + {"no-cache", REPLAY_FLAG_NO_FILE_CACHE, "turn off local cache"}, }; QCommandLineParser parser; @@ -109,7 +116,7 @@ int main(int argc, char *argv[]){ replay_flags |= flag; } } - Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app); + replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app); if (!replay->load()) { return 0; } diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 67806f5560..5b17fd5e4a 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -5,8 +5,8 @@ #include #include "cereal/services.h" -#include "selfdrive/common/timing.h" #include "selfdrive/common/params.h" +#include "selfdrive/common/timing.h" #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/replay/util.h" @@ -35,16 +35,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s } Replay::~Replay() { + stop(); + delete pm; + delete events_; +} + +void Replay::stop() { + if (stream_thread_ == nullptr) return; + qDebug() << "shutdown: in progress..."; exit_ = updating_events_ = true; - if (stream_thread_) { - stream_cv_.notify_one(); - stream_thread_->quit(); - stream_thread_->wait(); - } + stream_cv_.notify_one(); + stream_thread_->quit(); + stream_thread_->wait(); + stream_thread_ = nullptr; - delete pm; - delete events_; segments_.clear(); camera_server_.reset(nullptr); qDebug() << "shutdown: done"; @@ -91,6 +96,7 @@ void Replay::doSeek(int seconds, bool relative) { if (relative) { seconds += currentSeconds(); } + seconds = std::max(0, seconds); int seg = seconds / 60; if (segments_.find(seg) == segments_.end()) { qInfo() << "can't seek to" << seconds << "s, segment" << seg << "is invalid"; @@ -134,26 +140,30 @@ void Replay::segmentLoadFinished(bool success) { void Replay::queueSegment() { if (segments_.empty()) return; - SegmentMap::iterator begin, cur, end; - begin = cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); - // set fwd to 0 to just load the current segment when seeking to a new window. - const int fwd = cur->second == nullptr ? 0 : FORWARD_SEGS; - for (int i = 0; end != segments_.end() && i <= fwd; ++end, ++i) { - auto &[n, seg] = *end; - if (!seg) { - seg = std::make_unique(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM)); - QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); - qInfo() << "loading segment" << n << "..."; + SegmentMap::iterator cur, end; + cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first)); + for (int i = 0; end != segments_.end() && i <= FORWARD_SEGS; ++i) { + ++end; + } + // load one segment at a time + for (auto it = cur; it != end; ++it) { + if (!it->second) { + if (it == cur || std::prev(it)->second->isLoaded()) { + auto &[n, seg] = *it; + seg = std::make_unique(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM), !hasFlag(REPLAY_FLAG_NO_FILE_CACHE)); + QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); + qInfo() << "loading segment" << n << "..."; + } + break; } } - const auto &cur_segment = cur->second; enableHttpLogging(!cur_segment->isLoaded()); // merge the previous adjacent segment if it's loaded - auto prev = segments_.find(cur_segment->seg_num - 1); - if (prev != segments_.end() && prev->second && prev->second->isLoaded()) { - begin = prev; + auto begin = segments_.find(cur_segment->seg_num - 1); + if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) { + begin = cur; } mergeSegments(begin, end); @@ -168,9 +178,9 @@ void Replay::queueSegment() { } void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) { - // segments must be merged in sequence. + // merge 3 segments in sequence. std::vector segments_need_merge; - for (auto it = begin; it != end && it->second->isLoaded(); ++it) { + for (auto it = begin; it != end && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) { segments_need_merge.push_back(it->first); } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index 5b5f7dfa8b..57d9b582d3 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -5,13 +5,15 @@ #include "selfdrive/ui/replay/camera.h" #include "selfdrive/ui/replay/route.h" -constexpr int FORWARD_SEGS = 2; +// one segment uses about 100M of memory +constexpr int FORWARD_SEGS = 5; enum REPLAY_FLAGS { REPLAY_FLAG_NONE = 0x0000, REPLAY_FLAG_DCAM = 0x0002, REPLAY_FLAG_ECAM = 0x0004, REPLAY_FLAG_NO_LOOP = 0x0010, + REPLAY_FLAG_NO_FILE_CACHE = 0x0020, }; class Replay : public QObject { @@ -23,6 +25,7 @@ public: ~Replay(); bool load(); void start(int seconds = 0); + void stop(); void pause(bool pause); bool isPaused() const { return paused_; } inline bool hasFlag(REPLAY_FLAGS flag) { return flags_ & flag; }; diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index 6338fd16e6..04ad7b2144 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -1,15 +1,19 @@ #include "selfdrive/ui/replay/route.h" +#include #include #include #include #include +#include #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/qt/api.h" #include "selfdrive/ui/replay/util.h" -Route::Route(const QString &route, const QString &data_dir) : route_(parseRoute(route)), data_dir_(data_dir) {} +Route::Route(const QString &route, const QString &data_dir) : data_dir_(data_dir) { + route_ = parseRoute(route); +} RouteIdentifier Route::parseRoute(const QString &str) { QRegExp rx(R"(^([a-z0-9]{16})([|_/])(\d{4}-\d{2}-\d{2}--\d{2}-\d{2}-\d{2})(?:(--|/)(\d*))?$)"); @@ -41,7 +45,7 @@ bool Route::loadFromServer() { bool Route::loadFromJson(const QString &json) { QRegExp rx(R"(\/(\d+)\/)"); - for (const auto &value : QJsonDocument::fromJson(json.trimmed().toUtf8()).object()) { + for (const auto &value : QJsonDocument::fromJson(json.trimmed().toUtf8()).object()) { for (const auto &url : value.toArray()) { QString url_str = url.toString(); if (rx.indexIn(url_str) != -1) { @@ -86,10 +90,7 @@ void Route::addFileToSegment(int n, const QString &file) { // class Segment -Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) { - static std::once_flag once_flag; - std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); }); - +Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache) : seg_num(n) { // [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog const QString file_list[] = { files.road_cam.isEmpty() ? files.qcamera : files.road_cam, @@ -100,71 +101,34 @@ Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam for (int i = 0; i < std::size(file_list); i++) { if (!file_list[i].isEmpty()) { loading_++; - QThread *t = new QThread(); - QObject::connect(t, &QThread::started, [=]() { loadFile(i, file_list[i].toStdString()); }); - loading_threads_.emplace_back(t)->start(); + synchronizer_.addFuture(QtConcurrent::run([=] { loadFile(i, file_list[i].toStdString(), local_cache); })); } } } Segment::~Segment() { - aborting_ = true; - for (QThread *t : loading_threads_) { - if (t->isRunning()) { - t->quit(); - t->wait(); - } - delete t; - } + disconnect(); + abort_ = true; + synchronizer_.setCancelOnWait(true); + synchronizer_.waitForFinished(); } -void Segment::loadFile(int id, const std::string file) { - const bool is_remote = file.find("https://") == 0; - const std::string local_file = is_remote ? cacheFilePath(file) : file; - bool file_ready = util::file_exists(local_file); - - if (!file_ready && is_remote) { - file_ready = downloadFile(id, file, local_file); - } - - if (!aborting_ && file_ready) { - if (id < MAX_CAMERAS) { - frames[id] = std::make_unique(); - success_ = success_ && frames[id]->load(local_file); - } else { - std::string decompressed = cacheFilePath(local_file + ".decompressed"); - if (!util::file_exists(decompressed)) { - std::ofstream ostrm(decompressed, std::ios::binary); - readBZ2File(local_file, ostrm); - } - log = std::make_unique(); - success_ = success_ && log->load(decompressed); - } +void Segment::loadFile(int id, const std::string file, bool local_cache) { + bool success = false; + if (id < MAX_CAMERAS) { + frames[id] = std::make_unique(local_cache, 20 * 1024 * 1024, 3); + success = frames[id]->load(file, &abort_); + } else { + log = std::make_unique(local_cache, -1, 3); + success = log->load(file, &abort_); } - if (!aborting_ && --loading_ == 0) { - emit loadFinished(success_); + if (!success) { + // abort all loading jobs. + abort_ = true; } -} - -bool Segment::downloadFile(int id, const std::string &url, const std::string local_file) { - bool ret = false; - int retries = 0; - while (!aborting_) { - ret = httpMultiPartDownload(url, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_); - if (ret || aborting_) break; - if (++retries > max_retries_) { - qInfo() << "download failed after retries" << max_retries_; - break; - } - qInfo() << "download failed, retrying" << retries; + if (--loading_ == 0) { + emit loadFinished(!abort_); } - return ret; -} - -std::string Segment::cacheFilePath(const std::string &file) { - QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery); - QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex(); - return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString(); } diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index f0609c5ef3..14289cc4f1 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -1,14 +1,10 @@ #pragma once -#include -#include +#include -#include "selfdrive/common/util.h" #include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/logreader.h" -const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str()); - struct RouteIdentifier { QString dongle_id; QString timestamp; @@ -49,9 +45,9 @@ class Segment : public QObject { Q_OBJECT public: - Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam); + Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache); ~Segment(); - inline bool isLoaded() const { return !loading_ && success_; } + inline bool isLoaded() const { return !loading_ && !abort_; } const int seg_num = 0; std::unique_ptr log; @@ -61,12 +57,9 @@ signals: void loadFinished(bool success); protected: - void loadFile(int id, const std::string file); - bool downloadFile(int id, const std::string &url, const std::string local_file); - std::string cacheFilePath(const std::string &file); + void loadFile(int id, const std::string file, bool local_cache); - std::atomic success_ = true, aborting_ = false; + std::atomic abort_ = false; std::atomic loading_ = 0; - std::vector loading_threads_; - const int max_retries_ = 3; + QFutureSynchronizer synchronizer_; }; diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 4693a9807a..6e2ec15a34 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -1,31 +1,37 @@ -#include #include #include +#include +#include #include "catch2/catch.hpp" +#include "selfdrive/common/util.h" #include "selfdrive/ui/replay/replay.h" #include "selfdrive/ui/replay/util.h" const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36"; -std::string sha_256(const QString &dat) { - return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString(); -} +const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; +const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3"; TEST_CASE("httpMultiPartDownload") { char filename[] = "/tmp/XXXXXX"; close(mkstemp(filename)); - const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2"; - SECTION("5 connections") { - REQUIRE(httpMultiPartDownload(stream_url, filename, 5)); + std::string content; + auto file_size = getRemoteFileSize(TEST_RLOG_URL); + REQUIRE(file_size > 0); + SECTION("5 connections, download to file") { + std::ofstream of(filename, of.binary | of.out); + REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, of, 5, file_size)); + content = util::read_file(filename); } - SECTION("1 connection") { - REQUIRE(httpMultiPartDownload(stream_url, filename, 1)); + SECTION("5 connection, download to buffer") { + std::ostringstream oss; + content.resize(file_size); + oss.rdbuf()->pubsetbuf(content.data(), content.size()); + REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, oss, 5, file_size)); } - std::string content = util::read_file(filename); REQUIRE(content.size() == 9112651); - std::string checksum = sha_256(QString::fromStdString(content)); - REQUIRE(checksum == "e44edfbb545abdddfd17020ced2b18b6ec36506152267f32b6a8e3341f8126d6"); + REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM); } int random_int(int min, int max) { @@ -35,13 +41,28 @@ int random_int(int min, int max) { return dist(rng); } +TEST_CASE("FileReader") { + auto enable_local_cache = GENERATE(true, false); + std::string cache_file = cacheFilePath(TEST_RLOG_URL); + system(("rm " + cache_file + " -f").c_str()); + + FileReader reader(enable_local_cache); + std::string content = reader.read(TEST_RLOG_URL); + REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM); + if (enable_local_cache) { + REQUIRE(sha256(util::read_file(cache_file)) == TEST_RLOG_CHECKSUM); + } else { + REQUIRE(util::file_exists(cache_file) == false); + } +} + TEST_CASE("Segment") { Route demo_route(DEMO_ROUTE); REQUIRE(demo_route.load()); REQUIRE(demo_route.segments().size() == 11); QEventLoop loop; - Segment segment(0, demo_route.at(0), false, false); + Segment segment(0, demo_route.at(0), false, false, false); QObject::connect(&segment, &Segment::loadFinished, [&]() { REQUIRE(segment.isLoaded() == true); REQUIRE(segment.log != nullptr); @@ -68,8 +89,8 @@ TEST_CASE("Segment") { // helper class for unit tests class TestReplay : public Replay { -public: - TestReplay(const QString &route) : Replay(route, {}, {}) {} + public: + TestReplay(const QString &route, uint8_t flags = REPLAY_FLAG_NO_FILE_CACHE) : Replay(route, {}, {}, nullptr, flags) {} void test_seek(); void testSeekTo(int seek_to); }; @@ -113,7 +134,7 @@ void TestReplay::test_seek() { QEventLoop loop; std::thread thread = std::thread([&]() { for (int i = 0; i < 100; ++i) { - testSeekTo(random_int(0, 5 * 60)); + testSeekTo(random_int(0, 3 * 60)); } loop.quit(); }); @@ -122,7 +143,8 @@ void TestReplay::test_seek() { } TEST_CASE("Replay") { - TestReplay replay(DEMO_ROUTE); + auto flag = GENERATE(REPLAY_FLAG_NO_FILE_CACHE, REPLAY_FLAG_NONE); + TestReplay replay(DEMO_ROUTE, flag); REQUIRE(replay.load()); replay.test_seek(); } diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc index d350c144b9..0560571474 100644 --- a/selfdrive/ui/replay/util.cc +++ b/selfdrive/ui/replay/util.cc @@ -1,42 +1,60 @@ #include "selfdrive/ui/replay/util.h" -#include +#include +#include +#include + #include +#include #include #include #include - -#include -#include +#include #include "selfdrive/common/timing.h" #include "selfdrive/common/util.h" +namespace { + +static std::atomic enable_http_logging = false; + struct CURLGlobalInitializer { CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); } ~CURLGlobalInitializer() { curl_global_cleanup(); } }; struct MultiPartWriter { - int64_t offset; - int64_t end; - int64_t written; - FILE *fp; + size_t offset; + size_t end; + size_t written; + std::ostream *os; }; -static size_t write_cb(char *data, size_t size, size_t count, void *userp) { +size_t write_cb(char *data, size_t size, size_t count, void *userp) { MultiPartWriter *w = (MultiPartWriter *)userp; - fseek(w->fp, w->offset, SEEK_SET); - fwrite(data, size, count, w->fp); + w->os->seekp(w->offset); size_t bytes = size * count; + w->os->write(data, bytes); w->offset += bytes; w->written += bytes; return bytes; } -static size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; } +size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; } + +std::string formattedDataSize(size_t size) { + if (size < 1024) { + return std::to_string(size) + " B"; + } else if (size < 1024 * 1024) { + return util::string_format("%.2f KB", (float)size / 1024); + } else { + return util::string_format("%.2f MB", (float)size / (1024 * 1024)); + } +} -int64_t getRemoteFileSize(const std::string &url) { +} // namespace + +size_t getRemoteFileSize(const std::string &url) { CURL *curl = curl_easy_init(); if (!curl) return -1; @@ -52,40 +70,26 @@ int64_t getRemoteFileSize(const std::string &url) { std::cout << "Download failed: error code: " << res << std::endl; } curl_easy_cleanup(curl); - return res == CURLE_OK ? (int64_t)content_length : -1; + return content_length > 0 ? content_length : 0; } -std::string formattedDataSize(size_t size) { - if (size < 1024) { - return std::to_string(size) + " B"; - } else if (size < 1024 * 1024) { - return util::string_format("%.2f KB", (float)size / 1024); - } else { - return util::string_format("%.2f MB", (float)size / (1024 * 1024)); - } +std::string getUrlWithoutQuery(const std::string &url) { + size_t idx = url.find("?"); + return (idx == std::string::npos ? url : url.substr(0, idx)); } -static std::atomic enable_http_logging = false; - void enableHttpLogging(bool enable) { enable_http_logging = enable; } -bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort) { +bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic *abort) { static CURLGlobalInitializer curl_initializer; static std::mutex lock; static uint64_t total_written = 0, prev_total_written = 0; static double last_print_ts = 0; - int64_t content_length = getRemoteFileSize(url); - if (content_length <= 0) return false; - - // create a tmp sparse file - const std::string tmp_file = target_file + ".tmp"; - FILE *fp = fopen(tmp_file.c_str(), "wb"); - assert(fp); - fseek(fp, content_length - 1, SEEK_SET); - fwrite("\0", 1, 1, fp); + os.seekp(content_length - 1); + os.write("\0", 1); CURLM *cm = curl_multi_init(); @@ -94,8 +98,8 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil for (int i = 0; i < parts; ++i) { CURL *eh = curl_easy_init(); writers[eh] = { - .fp = fp, - .offset = i * part_size, + .os = &os, + .offset = (size_t)(i * part_size), .end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1, }; curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb); @@ -126,9 +130,7 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil if (enable_http_logging && last_print_ts > 0) { size_t average = (total_written - prev_total_written) / ((ts - last_print_ts) / 1000.); int progress = std::min(100, 100.0 * (double)written / (double)content_length); - - size_t idx = url.find("?"); - std::cout << "downloading " << (idx == std::string::npos ? url : url.substr(0, idx)) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl; + std::cout << "downloading " << getUrlWithoutQuery(url) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl; } prev_total_written = total_written; last_print_ts = ts; @@ -160,32 +162,34 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil } curl_multi_cleanup(cm); - fclose(fp); - - bool ret = complete == parts; - ret = ret && ::rename(tmp_file.c_str(), target_file.c_str()) == 0; - return ret; + return complete == parts; } -bool readBZ2File(const std::string_view file, std::ostream &stream) { - std::unique_ptr f(fopen(file.data(), "r"), &fclose); - if (!f) return false; +std::string decompressBZ2(const std::string &in) { + if (in.empty()) return {}; - int bzerror = BZ_OK; - BZFILE *bz_file = BZ2_bzReadOpen(&bzerror, f.get(), 0, 0, nullptr, 0); - if (!bz_file) return false; + bz_stream strm = {}; + int bzerror = BZ2_bzDecompressInit(&strm, 0, 0); + assert(bzerror == BZ_OK); - std::array buf; + strm.next_in = (char *)in.data(); + strm.avail_in = in.size(); + std::string out(in.size() * 5, '\0'); do { - int size = BZ2_bzRead(&bzerror, bz_file, buf.data(), buf.size()); - if (bzerror == BZ_OK || bzerror == BZ_STREAM_END) { - stream.write(buf.data(), size); + strm.next_out = (char *)(&out[strm.total_out_lo32]); + strm.avail_out = out.size() - strm.total_out_lo32; + bzerror = BZ2_bzDecompress(&strm); + if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { + out.resize(out.size() * 2); } } while (bzerror == BZ_OK); - bool success = (bzerror == BZ_STREAM_END); - BZ2_bzReadClose(&bzerror, bz_file); - return success; + BZ2_bzDecompressEnd(&strm); + if (bzerror == BZ_STREAM_END) { + out.resize(strm.total_out_lo32); + return out; + } + return {}; } void precise_nano_sleep(long sleep_ns) { @@ -205,3 +209,16 @@ void precise_nano_sleep(long sleep_ns) { } } } + +std::string sha256(const std::string &str) { + unsigned char hash[SHA256_DIGEST_LENGTH]; + SHA256_CTX sha256; + SHA256_Init(&sha256); + SHA256_Update(&sha256, str.c_str(), str.size()); + SHA256_Final(hash, &sha256); + std::stringstream ss; + for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) { + ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash[i]; + } + return ss.str(); +} diff --git a/selfdrive/ui/replay/util.h b/selfdrive/ui/replay/util.h index db7fa8483e..30a26c4314 100644 --- a/selfdrive/ui/replay/util.h +++ b/selfdrive/ui/replay/util.h @@ -4,7 +4,10 @@ #include #include +std::string sha256(const std::string &str); void precise_nano_sleep(long sleep_ns); -bool readBZ2File(const std::string_view file, std::ostream &stream); +std::string decompressBZ2(const std::string &in); void enableHttpLogging(bool enable); -bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort = nullptr); +std::string getUrlWithoutQuery(const std::string &url); +size_t getRemoteFileSize(const std::string &url); +bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic *abort = nullptr);