replay: improve segment download and merge (#22654)

* no-cache mode

* fix test cases build error

* space

* don't create cache dir in no-cache mode

* fix errors in test cases

* no_local_cache_

* set the number of connections by chunk_size

* use size_t instead of int64_t

* add test case for no-cache mode

* rename variables

* fix SIGSEGV

* cleanup

* faster decompressBZ2

* always decompress bz2

* add test cases

* prepare for python interface

* fix test cases build error

* continue

* camera_replay: cache remote file

* protected inheritance

* single option name

* TODO

* test_case for LogReader&FrameReader

* fix wrong require

* test case for FileReader

* cleanup test

* test:fix wrong filename

* check cached file's checksum

* fix mkdir permissions err

cleanup filereader

* remove initialize libav network libraries.

dd

* abort all loading if one failed

* cleanup tests

* use threadpool to limit concurrent downloads

* cache more segments

* merge 3 segments for replay

* one segment uses about 100M of memory

* use segments_need_merge.size()

* shutdown

* fix stuck if exit replay  before keyboard thread started

* load one segment at a time

* small cleanup

* cleanup filereader

* space

* tiny cleanup

* merge master

* cleanup test cases

* use util:create_directories

* cleanup framereader
old-commit-hash: 2b4a477fbc
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 3a8fdaba2f
commit a031b938b0
  1. 8
      selfdrive/camerad/SConscript
  2. 3
      selfdrive/camerad/cameras/camera_replay.cc
  3. 2
      selfdrive/loggerd/SConscript
  4. 8
      selfdrive/loggerd/tests/test_logger.cc
  5. 2
      selfdrive/ui/SConscript
  6. 62
      selfdrive/ui/replay/filereader.cc
  7. 20
      selfdrive/ui/replay/filereader.h
  8. 61
      selfdrive/ui/replay/framereader.cc
  9. 8
      selfdrive/ui/replay/framereader.h
  10. 20
      selfdrive/ui/replay/logreader.cc
  11. 7
      selfdrive/ui/replay/logreader.h
  12. 19
      selfdrive/ui/replay/main.cc
  13. 48
      selfdrive/ui/replay/replay.cc
  14. 5
      selfdrive/ui/replay/replay.h
  15. 80
      selfdrive/ui/replay/route.cc
  16. 19
      selfdrive/ui/replay/route.h
  17. 56
      selfdrive/ui/replay/tests/test_replay.cc
  18. 131
      selfdrive/ui/replay/util.cc
  19. 7
      selfdrive/ui/replay/util.h

@ -21,8 +21,12 @@ else:
if USE_FRAME_STREAM:
cameras = ['cameras/camera_frame_stream.cc']
else:
libs += ['avutil', 'avcodec', 'avformat', 'swscale']
cameras = ['cameras/camera_replay.cc', env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc')]
libs += ['avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'ssl', 'curl', 'crypto']
# TODO: import replay_lib from root SConstruct
cameras = ['cameras/camera_replay.cc',
env.Object('camera-util', '#/selfdrive/ui/replay/util.cc'),
env.Object('camera-framereader', '#/selfdrive/ui/replay/framereader.cc'),
env.Object('camera-filereader', '#/selfdrive/ui/replay/filereader.cc')]
if arch == "Darwin":
del libs[libs.index('OpenCL')]

@ -23,8 +23,7 @@ std::string get_url(std::string route_name, const std::string &camera, int segme
}
void camera_init(VisionIpcServer *v, CameraState *s, int camera_id, unsigned int fps, cl_device_id device_id, cl_context ctx, VisionStreamType rgb_type, VisionStreamType yuv_type, const std::string &url) {
// TODO: cache url file
s->frame = new FrameReader();
s->frame = new FrameReader(true);
if (!s->frame->load(url)) {
printf("failed to load stream from %s", url.c_str());
assert(0);

@ -28,4 +28,4 @@ env.Program(src, LIBS=libs)
env.Program('bootlog.cc', LIBS=libs)
if GetOption('test'):
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl'])
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl', 'crypto'])

@ -21,12 +21,8 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
REQUIRE(!util::file_exists(segment_path + "/rlog.bz2.lock"));
for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
const std::string log_file = segment_path + fn;
INFO(log_file);
std::ostringstream stream;
bool ret = readBZ2File(log_file, stream);
REQUIRE(ret);
std::string log = stream.str();
std::string log = decompressBZ2(util::read_file(log_file));
REQUIRE(!log.empty());
int event_cnt = 0, i = 0;
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
while (words.size() > 0) {

@ -112,7 +112,7 @@ if GetOption('extras'):
if arch in ['x86_64', 'Darwin'] or GetOption('extras'):
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/filereader.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'bz2', 'curl', 'swscale'] + qt_libs

@ -0,0 +1,62 @@
#include "selfdrive/ui/replay/filereader.h"
#include <sys/stat.h>
#include <cassert>
#include <cmath>
#include <fstream>
#include <iostream>
#include <sstream>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/util.h"
std::string cacheFilePath(const std::string &url) {
static std::string cache_path = [] {
const std::string comma_cache = util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/");
util::create_directories(comma_cache, 0755);
return comma_cache.back() == '/' ? comma_cache : comma_cache + "/";
}();
return cache_path + sha256(getUrlWithoutQuery(url));;
}
std::string FileReader::read(const std::string &file, std::atomic<bool> *abort) {
const bool is_remote = file.find("https://") == 0;
const std::string local_file = is_remote ? cacheFilePath(file) : file;
std::string result;
if ((!is_remote || cache_to_local_) && util::file_exists(local_file)) {
result = util::read_file(local_file);
} else if (is_remote) {
result = download(file, abort);
if (cache_to_local_ && !result.empty()) {
std::ofstream fs(local_file, fs.binary | fs.out);
fs.write(result.data(), result.size());
}
}
return result;
}
std::string FileReader::download(const std::string &url, std::atomic<bool> *abort) {
std::string result;
size_t remote_file_size = 0;
for (int i = 0; i <= max_retries_ && !(abort && *abort); ++i) {
if (i > 0) {
std::cout << "download failed, retrying" << i << std::endl;
}
if (remote_file_size <= 0) {
remote_file_size = getRemoteFileSize(url);
}
if (remote_file_size > 0 && !(abort && *abort)) {
std::ostringstream oss;
result.resize(remote_file_size);
oss.rdbuf()->pubsetbuf(result.data(), result.size());
int chunks = chunk_size_ > 0 ? std::min(1, (int)std::nearbyint(remote_file_size / (float)chunk_size_)) : 1;
if (httpMultiPartDownload(url, oss, chunks, remote_file_size, abort)) {
return result;
}
}
}
return {};
}

@ -0,0 +1,20 @@
#pragma once
#include <atomic>
#include <string>
class FileReader {
public:
FileReader(bool cache_to_local, int chunk_size = -1, int max_retries = 3)
: cache_to_local_(cache_to_local), chunk_size_(chunk_size), max_retries_(max_retries) {}
virtual ~FileReader() {}
std::string read(const std::string &file, std::atomic<bool> *abort = nullptr);
private:
std::string download(const std::string &url, std::atomic<bool> *abort);
int chunk_size_;
int max_retries_;
bool cache_to_local_;
};
std::string cacheFilePath(const std::string &url);

@ -3,8 +3,11 @@
#include <unistd.h>
#include <cassert>
#include <mutex>
#include <sstream>
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
namespace {
int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
std::mutex *mutex = (std::mutex *)*arg;
switch (op) {
case AV_LOCK_CREATE:
@ -22,38 +25,56 @@ static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
return 0;
}
struct AVInitializer {
AVInitializer() {
int ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
assert(ret >= 0);
int readFunction(void *opaque, uint8_t *buf, int buf_size) {
auto &iss = *reinterpret_cast<std::istringstream *>(opaque);
iss.read(reinterpret_cast<char *>(buf), buf_size);
return iss.gcount() ? iss.gcount() : AVERROR_EOF;
}
} // namespace
FrameReader::FrameReader(bool local_cache, int chunk_size, int retries) : FileReader(local_cache, chunk_size, retries) {
static std::once_flag once_flag;
std::call_once(once_flag, [] {
av_lockmgr_register(ffmpeg_lockmgr_cb);
av_register_all();
avformat_network_init();
}
~AVInitializer() { avformat_network_deinit(); }
};
});
pFormatCtx_ = avformat_alloc_context();
av_frame_ = av_frame_alloc();
rgb_frame_ = av_frame_alloc();
yuv_frame_ = av_frame_alloc();;
FrameReader::FrameReader() {
static AVInitializer av_initializer;
}
FrameReader::~FrameReader() {
for (auto &f : frames_) {
av_free_packet(&f.pkt);
}
if (pCodecCtx_) {
avcodec_close(pCodecCtx_);
avcodec_free_context(&pCodecCtx_);
}
if (pCodecCtx_) avcodec_free_context(&pCodecCtx_);
if (pFormatCtx_) avformat_close_input(&pFormatCtx_);
if (av_frame_) av_frame_free(&av_frame_);
if (rgb_frame_) av_frame_free(&rgb_frame_);
if (yuv_frame_) av_frame_free(&yuv_frame_);
if (rgb_sws_ctx_) sws_freeContext(rgb_sws_ctx_);
if (yuv_sws_ctx_) sws_freeContext(yuv_sws_ctx_);
if (avio_ctx_) {
av_freep(&avio_ctx_->buffer);
av_freep(&avio_ctx_);
}
}
bool FrameReader::load(const std::string &url) {
pFormatCtx_ = avformat_alloc_context();
bool FrameReader::load(const std::string &url, std::atomic<bool> *abort) {
std::string content = read(url, abort);
if (content.empty()) return false;
std::istringstream iss(content);
const int avio_ctx_buffer_size = 64 * 1024;
unsigned char *avio_ctx_buffer = (unsigned char *)av_malloc(avio_ctx_buffer_size);
avio_ctx_ = avio_alloc_context(avio_ctx_buffer, avio_ctx_buffer_size, 0, &iss, readFunction, nullptr, nullptr);
pFormatCtx_->pb = avio_ctx_;
pFormatCtx_->probesize = 10 * 1024 * 1024; // 10MB
if (avformat_open_input(&pFormatCtx_, url.c_str(), NULL, NULL) != 0) {
printf("error loading %s\n", url.c_str());
@ -75,10 +96,6 @@ bool FrameReader::load(const std::string &url) {
ret = avcodec_open2(pCodecCtx_, pCodec, NULL);
if (ret < 0) return false;
av_frame_ = av_frame_alloc();
rgb_frame_ = av_frame_alloc();
yuv_frame_ = av_frame_alloc();;
width = (pCodecCtxOrig->width + 3) & ~3;
height = pCodecCtxOrig->height;
rgb_sws_ctx_ = sws_getContext(pCodecCtxOrig->width, pCodecCtxOrig->height, AV_PIX_FMT_YUV420P,
@ -92,7 +109,7 @@ bool FrameReader::load(const std::string &url) {
if (!yuv_sws_ctx_) return false;
frames_.reserve(60 * 20); // 20fps, one minute
while (true) {
while (!(abort && *abort)) {
Frame &frame = frames_.emplace_back();
int err = av_read_frame(pFormatCtx_, &frame.pkt);
if (err < 0) {

@ -2,6 +2,7 @@
#include <string>
#include <vector>
#include "selfdrive/ui/replay/filereader.h"
extern "C" {
#include <libavcodec/avcodec.h>
@ -10,11 +11,11 @@ extern "C" {
#include <libavutil/imgutils.h>
}
class FrameReader {
class FrameReader : protected FileReader {
public:
FrameReader();
FrameReader(bool local_cache = false, int chunk_size = -1, int retries = 0);
~FrameReader();
bool load(const std::string &url);
bool load(const std::string &url, std::atomic<bool> *abort = nullptr);
bool get(int idx, uint8_t *rgb, uint8_t *yuv);
int getRGBSize() const { return width * height * 3; }
int getYUVSize() const { return width * height * 3 / 2; }
@ -39,4 +40,5 @@ private:
AVCodecContext *pCodecCtx_ = nullptr;
int key_frames_count_ = 0;
bool valid_ = false;
AVIOContext *avio_ctx_ = nullptr;
};

@ -1,7 +1,6 @@
#include "selfdrive/ui/replay/logreader.h"
#include <sstream>
#include "selfdrive/common/util.h"
#include <algorithm>
#include "selfdrive/ui/replay/util.h"
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
@ -27,7 +26,7 @@ Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(a
// class LogReader
LogReader::LogReader(size_t memory_pool_block_size) {
LogReader::LogReader(bool local_cache, int chunk_size, int retries, size_t memory_pool_block_size) : FileReader(local_cache, chunk_size, retries) {
#ifdef HAS_MEMORY_RESOURCE
const size_t buf_size = sizeof(Event) * memory_pool_block_size;
pool_buffer_ = ::operator new(buf_size);
@ -47,18 +46,9 @@ LogReader::~LogReader() {
#endif
}
bool LogReader::load(const std::string &file) {
bool is_bz2 = file.rfind(".bz2") == file.length() - 4;
if (is_bz2) {
std::ostringstream stream;
if (!readBZ2File(file, stream)) {
LOGW("bz2 decompress failed");
return false;
}
raw_ = stream.str();
} else {
raw_ = util::read_file(file);
}
bool LogReader::load(const std::string &file, std::atomic<bool> *abort) {
raw_ = decompressBZ2(read(file, abort));
if (raw_.empty()) return false;
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));
while (words.size() > 0) {

@ -7,6 +7,7 @@
#include "cereal/gen/cpp/log.capnp.h"
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/ui/replay/filereader.h"
const CameraType ALL_CAMERAS[] = {RoadCam, DriverCam, WideRoadCam};
const int MAX_CAMERAS = std::size(ALL_CAMERAS);
@ -45,11 +46,11 @@ public:
bool frame;
};
class LogReader {
class LogReader : protected FileReader {
public:
LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
LogReader(bool local_cache = false, int chunk_size = -1, int retries = 0, size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE);
~LogReader();
bool load(const std::string &file);
bool load(const std::string &file, std::atomic<bool> *abort = nullptr);
std::vector<Event*> events;

@ -1,20 +1,26 @@
#include "selfdrive/ui/replay/replay.h"
#include <csignal>
#include <iostream>
#include <termios.h>
#include <QApplication>
#include <QCommandLineParser>
#include <QDebug>
#include <QThread>
#include <csignal>
#include <iostream>
#include "selfdrive/ui/replay/replay.h"
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
struct termios oldt = {};
Replay *replay = nullptr;
void sigHandler(int s) {
std::signal(s, SIG_DFL);
if (oldt.c_lflag) {
tcsetattr(STDIN_FILENO, TCSANOW, &oldt);
}
if (replay) {
replay->stop();
}
qApp->quit();
}
@ -69,7 +75,7 @@ void keyboardThread(Replay *replay) {
}
}
int main(int argc, char *argv[]){
int main(int argc, char *argv[]) {
QApplication app(argc, argv);
std::signal(SIGINT, sigHandler);
std::signal(SIGTERM, sigHandler);
@ -78,6 +84,7 @@ int main(int argc, char *argv[]){
{"dcam", REPLAY_FLAG_DCAM, "load driver camera"},
{"ecam", REPLAY_FLAG_ECAM, "load wide road camera"},
{"no-loop", REPLAY_FLAG_NO_LOOP, "stop at the end of the route"},
{"no-cache", REPLAY_FLAG_NO_FILE_CACHE, "turn off local cache"},
};
QCommandLineParser parser;
@ -109,7 +116,7 @@ int main(int argc, char *argv[]){
replay_flags |= flag;
}
}
Replay *replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
replay = new Replay(route, allow, block, nullptr, replay_flags, parser.value("data_dir"), &app);
if (!replay->load()) {
return 0;
}

@ -5,8 +5,8 @@
#include <capnp/dynamic.h>
#include "cereal/services.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/common/params.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/replay/util.h"
@ -35,16 +35,21 @@ Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *s
}
Replay::~Replay() {
stop();
delete pm;
delete events_;
}
void Replay::stop() {
if (stream_thread_ == nullptr) return;
qDebug() << "shutdown: in progress...";
exit_ = updating_events_ = true;
if (stream_thread_) {
stream_cv_.notify_one();
stream_thread_->quit();
stream_thread_->wait();
}
stream_thread_ = nullptr;
delete pm;
delete events_;
segments_.clear();
camera_server_.reset(nullptr);
qDebug() << "shutdown: done";
@ -91,6 +96,7 @@ void Replay::doSeek(int seconds, bool relative) {
if (relative) {
seconds += currentSeconds();
}
seconds = std::max(0, seconds);
int seg = seconds / 60;
if (segments_.find(seg) == segments_.end()) {
qInfo() << "can't seek to" << seconds << "s, segment" << seg << "is invalid";
@ -134,26 +140,30 @@ void Replay::segmentLoadFinished(bool success) {
void Replay::queueSegment() {
if (segments_.empty()) return;
SegmentMap::iterator begin, cur, end;
begin = cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
// set fwd to 0 to just load the current segment when seeking to a new window.
const int fwd = cur->second == nullptr ? 0 : FORWARD_SEGS;
for (int i = 0; end != segments_.end() && i <= fwd; ++end, ++i) {
auto &[n, seg] = *end;
if (!seg) {
seg = std::make_unique<Segment>(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM));
SegmentMap::iterator cur, end;
cur = end = segments_.lower_bound(std::min(current_segment_.load(), segments_.rbegin()->first));
for (int i = 0; end != segments_.end() && i <= FORWARD_SEGS; ++i) {
++end;
}
// load one segment at a time
for (auto it = cur; it != end; ++it) {
if (!it->second) {
if (it == cur || std::prev(it)->second->isLoaded()) {
auto &[n, seg] = *it;
seg = std::make_unique<Segment>(n, route_->at(n), hasFlag(REPLAY_FLAG_DCAM), hasFlag(REPLAY_FLAG_ECAM), !hasFlag(REPLAY_FLAG_NO_FILE_CACHE));
QObject::connect(seg.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished);
qInfo() << "loading segment" << n << "...";
}
break;
}
}
const auto &cur_segment = cur->second;
enableHttpLogging(!cur_segment->isLoaded());
// merge the previous adjacent segment if it's loaded
auto prev = segments_.find(cur_segment->seg_num - 1);
if (prev != segments_.end() && prev->second && prev->second->isLoaded()) {
begin = prev;
auto begin = segments_.find(cur_segment->seg_num - 1);
if (begin == segments_.end() || !(begin->second && begin->second->isLoaded())) {
begin = cur;
}
mergeSegments(begin, end);
@ -168,9 +178,9 @@ void Replay::queueSegment() {
}
void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap::iterator &end) {
// segments must be merged in sequence.
// merge 3 segments in sequence.
std::vector<int> segments_need_merge;
for (auto it = begin; it != end && it->second->isLoaded(); ++it) {
for (auto it = begin; it != end && it->second->isLoaded() && segments_need_merge.size() < 3; ++it) {
segments_need_merge.push_back(it->first);
}

@ -5,13 +5,15 @@
#include "selfdrive/ui/replay/camera.h"
#include "selfdrive/ui/replay/route.h"
constexpr int FORWARD_SEGS = 2;
// one segment uses about 100M of memory
constexpr int FORWARD_SEGS = 5;
enum REPLAY_FLAGS {
REPLAY_FLAG_NONE = 0x0000,
REPLAY_FLAG_DCAM = 0x0002,
REPLAY_FLAG_ECAM = 0x0004,
REPLAY_FLAG_NO_LOOP = 0x0010,
REPLAY_FLAG_NO_FILE_CACHE = 0x0020,
};
class Replay : public QObject {
@ -23,6 +25,7 @@ public:
~Replay();
bool load();
void start(int seconds = 0);
void stop();
void pause(bool pause);
bool isPaused() const { return paused_; }
inline bool hasFlag(REPLAY_FLAGS flag) { return flags_ & flag; };

@ -1,15 +1,19 @@
#include "selfdrive/ui/replay/route.h"
#include <QDir>
#include <QEventLoop>
#include <QJsonArray>
#include <QJsonDocument>
#include <QRegExp>
#include <QtConcurrent>
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h"
#include "selfdrive/ui/replay/util.h"
Route::Route(const QString &route, const QString &data_dir) : route_(parseRoute(route)), data_dir_(data_dir) {}
Route::Route(const QString &route, const QString &data_dir) : data_dir_(data_dir) {
route_ = parseRoute(route);
}
RouteIdentifier Route::parseRoute(const QString &str) {
QRegExp rx(R"(^([a-z0-9]{16})([|_/])(\d{4}-\d{2}-\d{2}--\d{2}-\d{2}-\d{2})(?:(--|/)(\d*))?$)");
@ -86,10 +90,7 @@ void Route::addFileToSegment(int n, const QString &file) {
// class Segment
Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam) : seg_num(n) {
static std::once_flag once_flag;
std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); });
Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache) : seg_num(n) {
// [RoadCam, DriverCam, WideRoadCam, log]. fallback to qcamera/qlog
const QString file_list[] = {
files.road_cam.isEmpty() ? files.qcamera : files.road_cam,
@ -100,71 +101,34 @@ Segment::Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam
for (int i = 0; i < std::size(file_list); i++) {
if (!file_list[i].isEmpty()) {
loading_++;
QThread *t = new QThread();
QObject::connect(t, &QThread::started, [=]() { loadFile(i, file_list[i].toStdString()); });
loading_threads_.emplace_back(t)->start();
synchronizer_.addFuture(QtConcurrent::run([=] { loadFile(i, file_list[i].toStdString(), local_cache); }));
}
}
}
Segment::~Segment() {
aborting_ = true;
for (QThread *t : loading_threads_) {
if (t->isRunning()) {
t->quit();
t->wait();
}
delete t;
}
disconnect();
abort_ = true;
synchronizer_.setCancelOnWait(true);
synchronizer_.waitForFinished();
}
void Segment::loadFile(int id, const std::string file) {
const bool is_remote = file.find("https://") == 0;
const std::string local_file = is_remote ? cacheFilePath(file) : file;
bool file_ready = util::file_exists(local_file);
if (!file_ready && is_remote) {
file_ready = downloadFile(id, file, local_file);
}
if (!aborting_ && file_ready) {
void Segment::loadFile(int id, const std::string file, bool local_cache) {
bool success = false;
if (id < MAX_CAMERAS) {
frames[id] = std::make_unique<FrameReader>();
success_ = success_ && frames[id]->load(local_file);
frames[id] = std::make_unique<FrameReader>(local_cache, 20 * 1024 * 1024, 3);
success = frames[id]->load(file, &abort_);
} else {
std::string decompressed = cacheFilePath(local_file + ".decompressed");
if (!util::file_exists(decompressed)) {
std::ofstream ostrm(decompressed, std::ios::binary);
readBZ2File(local_file, ostrm);
}
log = std::make_unique<LogReader>();
success_ = success_ && log->load(decompressed);
}
log = std::make_unique<LogReader>(local_cache, -1, 3);
success = log->load(file, &abort_);
}
if (!aborting_ && --loading_ == 0) {
emit loadFinished(success_);
if (!success) {
// abort all loading jobs.
abort_ = true;
}
}
bool Segment::downloadFile(int id, const std::string &url, const std::string local_file) {
bool ret = false;
int retries = 0;
while (!aborting_) {
ret = httpMultiPartDownload(url, local_file, id < MAX_CAMERAS ? 3 : 1, &aborting_);
if (ret || aborting_) break;
if (++retries > max_retries_) {
qInfo() << "download failed after retries" << max_retries_;
break;
if (--loading_ == 0) {
emit loadFinished(!abort_);
}
qInfo() << "download failed, retrying" << retries;
}
return ret;
}
std::string Segment::cacheFilePath(const std::string &file) {
QString url_no_query = QUrl(file.c_str()).toString(QUrl::RemoveQuery);
QString sha256 = QCryptographicHash::hash(url_no_query.toUtf8(), QCryptographicHash::Sha256).toHex();
return CACHE_DIR.filePath(sha256 + "." + QFileInfo(url_no_query).suffix()).toStdString();
}

@ -1,14 +1,10 @@
#pragma once
#include <QDir>
#include <QThread>
#include <QFutureSynchronizer>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/logreader.h"
const QDir CACHE_DIR(util::getenv("COMMA_CACHE", "/tmp/comma_download_cache/").c_str());
struct RouteIdentifier {
QString dongle_id;
QString timestamp;
@ -49,9 +45,9 @@ class Segment : public QObject {
Q_OBJECT
public:
Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam);
Segment(int n, const SegmentFile &files, bool load_dcam, bool load_ecam, bool local_cache);
~Segment();
inline bool isLoaded() const { return !loading_ && success_; }
inline bool isLoaded() const { return !loading_ && !abort_; }
const int seg_num = 0;
std::unique_ptr<LogReader> log;
@ -61,12 +57,9 @@ signals:
void loadFinished(bool success);
protected:
void loadFile(int id, const std::string file);
bool downloadFile(int id, const std::string &url, const std::string local_file);
std::string cacheFilePath(const std::string &file);
void loadFile(int id, const std::string file, bool local_cache);
std::atomic<bool> success_ = true, aborting_ = false;
std::atomic<bool> abort_ = false;
std::atomic<int> loading_ = 0;
std::vector<QThread*> loading_threads_;
const int max_retries_ = 3;
QFutureSynchronizer<void> synchronizer_;
};

@ -1,31 +1,37 @@
#include <QCryptographicHash>
#include <QDebug>
#include <QEventLoop>
#include <fstream>
#include <sstream>
#include "catch2/catch.hpp"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/replay.h"
#include "selfdrive/ui/replay/util.h"
const QString DEMO_ROUTE = "4cf7a6ad03080c90|2021-09-29--13-46-36";
std::string sha_256(const QString &dat) {
return QString(QCryptographicHash::hash(dat.toUtf8(), QCryptographicHash::Sha256).toHex()).toStdString();
}
const std::string TEST_RLOG_URL = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
const std::string TEST_RLOG_CHECKSUM = "5b966d4bb21a100a8c4e59195faeb741b975ccbe268211765efd1763d892bfb3";
TEST_CASE("httpMultiPartDownload") {
char filename[] = "/tmp/XXXXXX";
close(mkstemp(filename));
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/rlog.bz2";
SECTION("5 connections") {
REQUIRE(httpMultiPartDownload(stream_url, filename, 5));
std::string content;
auto file_size = getRemoteFileSize(TEST_RLOG_URL);
REQUIRE(file_size > 0);
SECTION("5 connections, download to file") {
std::ofstream of(filename, of.binary | of.out);
REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, of, 5, file_size));
content = util::read_file(filename);
}
SECTION("1 connection") {
REQUIRE(httpMultiPartDownload(stream_url, filename, 1));
SECTION("5 connection, download to buffer") {
std::ostringstream oss;
content.resize(file_size);
oss.rdbuf()->pubsetbuf(content.data(), content.size());
REQUIRE(httpMultiPartDownload(TEST_RLOG_URL, oss, 5, file_size));
}
std::string content = util::read_file(filename);
REQUIRE(content.size() == 9112651);
std::string checksum = sha_256(QString::fromStdString(content));
REQUIRE(checksum == "e44edfbb545abdddfd17020ced2b18b6ec36506152267f32b6a8e3341f8126d6");
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
}
int random_int(int min, int max) {
@ -35,13 +41,28 @@ int random_int(int min, int max) {
return dist(rng);
}
TEST_CASE("FileReader") {
auto enable_local_cache = GENERATE(true, false);
std::string cache_file = cacheFilePath(TEST_RLOG_URL);
system(("rm " + cache_file + " -f").c_str());
FileReader reader(enable_local_cache);
std::string content = reader.read(TEST_RLOG_URL);
REQUIRE(sha256(content) == TEST_RLOG_CHECKSUM);
if (enable_local_cache) {
REQUIRE(sha256(util::read_file(cache_file)) == TEST_RLOG_CHECKSUM);
} else {
REQUIRE(util::file_exists(cache_file) == false);
}
}
TEST_CASE("Segment") {
Route demo_route(DEMO_ROUTE);
REQUIRE(demo_route.load());
REQUIRE(demo_route.segments().size() == 11);
QEventLoop loop;
Segment segment(0, demo_route.at(0), false, false);
Segment segment(0, demo_route.at(0), false, false, false);
QObject::connect(&segment, &Segment::loadFinished, [&]() {
REQUIRE(segment.isLoaded() == true);
REQUIRE(segment.log != nullptr);
@ -68,8 +89,8 @@ TEST_CASE("Segment") {
// helper class for unit tests
class TestReplay : public Replay {
public:
TestReplay(const QString &route) : Replay(route, {}, {}) {}
public:
TestReplay(const QString &route, uint8_t flags = REPLAY_FLAG_NO_FILE_CACHE) : Replay(route, {}, {}, nullptr, flags) {}
void test_seek();
void testSeekTo(int seek_to);
};
@ -113,7 +134,7 @@ void TestReplay::test_seek() {
QEventLoop loop;
std::thread thread = std::thread([&]() {
for (int i = 0; i < 100; ++i) {
testSeekTo(random_int(0, 5 * 60));
testSeekTo(random_int(0, 3 * 60));
}
loop.quit();
});
@ -122,7 +143,8 @@ void TestReplay::test_seek() {
}
TEST_CASE("Replay") {
TestReplay replay(DEMO_ROUTE);
auto flag = GENERATE(REPLAY_FLAG_NO_FILE_CACHE, REPLAY_FLAG_NONE);
TestReplay replay(DEMO_ROUTE, flag);
REQUIRE(replay.load());
replay.test_seek();
}

@ -1,42 +1,60 @@
#include "selfdrive/ui/replay/util.h"
#include <array>
#include <bzlib.h>
#include <curl/curl.h>
#include <openssl/sha.h>
#include <cassert>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <bzlib.h>
#include <curl/curl.h>
#include <sstream>
#include "selfdrive/common/timing.h"
#include "selfdrive/common/util.h"
namespace {
static std::atomic<bool> enable_http_logging = false;
struct CURLGlobalInitializer {
CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); }
~CURLGlobalInitializer() { curl_global_cleanup(); }
};
struct MultiPartWriter {
int64_t offset;
int64_t end;
int64_t written;
FILE *fp;
size_t offset;
size_t end;
size_t written;
std::ostream *os;
};
static size_t write_cb(char *data, size_t size, size_t count, void *userp) {
size_t write_cb(char *data, size_t size, size_t count, void *userp) {
MultiPartWriter *w = (MultiPartWriter *)userp;
fseek(w->fp, w->offset, SEEK_SET);
fwrite(data, size, count, w->fp);
w->os->seekp(w->offset);
size_t bytes = size * count;
w->os->write(data, bytes);
w->offset += bytes;
w->written += bytes;
return bytes;
}
static size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; }
size_t dumy_write_cb(char *data, size_t size, size_t count, void *userp) { return size * count; }
std::string formattedDataSize(size_t size) {
if (size < 1024) {
return std::to_string(size) + " B";
} else if (size < 1024 * 1024) {
return util::string_format("%.2f KB", (float)size / 1024);
} else {
return util::string_format("%.2f MB", (float)size / (1024 * 1024));
}
}
} // namespace
int64_t getRemoteFileSize(const std::string &url) {
size_t getRemoteFileSize(const std::string &url) {
CURL *curl = curl_easy_init();
if (!curl) return -1;
@ -52,40 +70,26 @@ int64_t getRemoteFileSize(const std::string &url) {
std::cout << "Download failed: error code: " << res << std::endl;
}
curl_easy_cleanup(curl);
return res == CURLE_OK ? (int64_t)content_length : -1;
return content_length > 0 ? content_length : 0;
}
std::string formattedDataSize(size_t size) {
if (size < 1024) {
return std::to_string(size) + " B";
} else if (size < 1024 * 1024) {
return util::string_format("%.2f KB", (float)size / 1024);
} else {
return util::string_format("%.2f MB", (float)size / (1024 * 1024));
}
std::string getUrlWithoutQuery(const std::string &url) {
size_t idx = url.find("?");
return (idx == std::string::npos ? url : url.substr(0, idx));
}
static std::atomic<bool> enable_http_logging = false;
void enableHttpLogging(bool enable) {
enable_http_logging = enable;
}
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic<bool> *abort) {
static CURLGlobalInitializer curl_initializer;
static std::mutex lock;
static uint64_t total_written = 0, prev_total_written = 0;
static double last_print_ts = 0;
int64_t content_length = getRemoteFileSize(url);
if (content_length <= 0) return false;
// create a tmp sparse file
const std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb");
assert(fp);
fseek(fp, content_length - 1, SEEK_SET);
fwrite("\0", 1, 1, fp);
os.seekp(content_length - 1);
os.write("\0", 1);
CURLM *cm = curl_multi_init();
@ -94,8 +98,8 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
for (int i = 0; i < parts; ++i) {
CURL *eh = curl_easy_init();
writers[eh] = {
.fp = fp,
.offset = i * part_size,
.os = &os,
.offset = (size_t)(i * part_size),
.end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1,
};
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
@ -126,9 +130,7 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
if (enable_http_logging && last_print_ts > 0) {
size_t average = (total_written - prev_total_written) / ((ts - last_print_ts) / 1000.);
int progress = std::min<int>(100, 100.0 * (double)written / (double)content_length);
size_t idx = url.find("?");
std::cout << "downloading " << (idx == std::string::npos ? url : url.substr(0, idx)) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl;
std::cout << "downloading " << getUrlWithoutQuery(url) << " - " << progress << "% (" << formattedDataSize(average) << "/s)" << std::endl;
}
prev_total_written = total_written;
last_print_ts = ts;
@ -160,32 +162,34 @@ bool httpMultiPartDownload(const std::string &url, const std::string &target_fil
}
curl_multi_cleanup(cm);
fclose(fp);
bool ret = complete == parts;
ret = ret && ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
return ret;
return complete == parts;
}
bool readBZ2File(const std::string_view file, std::ostream &stream) {
std::unique_ptr<FILE, decltype(&fclose)> f(fopen(file.data(), "r"), &fclose);
if (!f) return false;
std::string decompressBZ2(const std::string &in) {
if (in.empty()) return {};
int bzerror = BZ_OK;
BZFILE *bz_file = BZ2_bzReadOpen(&bzerror, f.get(), 0, 0, nullptr, 0);
if (!bz_file) return false;
bz_stream strm = {};
int bzerror = BZ2_bzDecompressInit(&strm, 0, 0);
assert(bzerror == BZ_OK);
std::array<char, 64 * 1024> buf;
strm.next_in = (char *)in.data();
strm.avail_in = in.size();
std::string out(in.size() * 5, '\0');
do {
int size = BZ2_bzRead(&bzerror, bz_file, buf.data(), buf.size());
if (bzerror == BZ_OK || bzerror == BZ_STREAM_END) {
stream.write(buf.data(), size);
strm.next_out = (char *)(&out[strm.total_out_lo32]);
strm.avail_out = out.size() - strm.total_out_lo32;
bzerror = BZ2_bzDecompress(&strm);
if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
out.resize(out.size() * 2);
}
} while (bzerror == BZ_OK);
bool success = (bzerror == BZ_STREAM_END);
BZ2_bzReadClose(&bzerror, bz_file);
return success;
BZ2_bzDecompressEnd(&strm);
if (bzerror == BZ_STREAM_END) {
out.resize(strm.total_out_lo32);
return out;
}
return {};
}
void precise_nano_sleep(long sleep_ns) {
@ -205,3 +209,16 @@ void precise_nano_sleep(long sleep_ns) {
}
}
}
std::string sha256(const std::string &str) {
unsigned char hash[SHA256_DIGEST_LENGTH];
SHA256_CTX sha256;
SHA256_Init(&sha256);
SHA256_Update(&sha256, str.c_str(), str.size());
SHA256_Final(hash, &sha256);
std::stringstream ss;
for (int i = 0; i < SHA256_DIGEST_LENGTH; i++) {
ss << std::hex << std::setw(2) << std::setfill('0') << (int)hash[i];
}
return ss.str();
}

@ -4,7 +4,10 @@
#include <ostream>
#include <string>
std::string sha256(const std::string &str);
void precise_nano_sleep(long sleep_ns);
bool readBZ2File(const std::string_view file, std::ostream &stream);
std::string decompressBZ2(const std::string &in);
void enableHttpLogging(bool enable);
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);
std::string getUrlWithoutQuery(const std::string &url);
size_t getRemoteFileSize(const std::string &url);
bool httpMultiPartDownload(const std::string &url, std::ostream &os, int parts, size_t content_length, std::atomic<bool> *abort = nullptr);

Loading…
Cancel
Save