diff --git a/selfdrive/loggerd/SConscript b/selfdrive/loggerd/SConscript index ca8dc8285b..dfa3dca714 100644 --- a/selfdrive/loggerd/SConscript +++ b/selfdrive/loggerd/SConscript @@ -28,4 +28,4 @@ env.Program(src, LIBS=libs) env.Program('bootlog.cc', LIBS=libs) if GetOption('test'): - env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=[libs]) + env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl']) diff --git a/selfdrive/loggerd/tests/test_logger.cc b/selfdrive/loggerd/tests/test_logger.cc index c621d00c42..e59c2f0153 100644 --- a/selfdrive/loggerd/tests/test_logger.cc +++ b/selfdrive/loggerd/tests/test_logger.cc @@ -1,38 +1,18 @@ #include +#include #include +#include #include -#include - #include "catch2/catch.hpp" #include "cereal/messaging/messaging.h" #include "selfdrive/common/util.h" #include "selfdrive/loggerd/logger.h" +#include "selfdrive/ui/replay/util.h" typedef cereal::Sentinel::SentinelType SentinelType; -bool decompressBZ2(std::vector &dest, const char srcData[], size_t srcSize, size_t outputSizeIncrement = 0x100000U) { - bz_stream strm = {}; - int ret = BZ2_bzDecompressInit(&strm, 0, 0); - assert(ret == BZ_OK); - dest.resize(1024 * 1024); - strm.next_in = const_cast(srcData); - strm.avail_in = srcSize; - do { - strm.next_out = (char *)&dest[strm.total_out_lo32]; - strm.avail_out = dest.size() - strm.total_out_lo32; - ret = BZ2_bzDecompress(&strm); - if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { - dest.resize(dest.size() + outputSizeIncrement); - } - } while (ret == BZ_OK && strm.avail_in > 0); - - BZ2_bzDecompressEnd(&strm); - dest.resize(strm.total_out_lo32); - return ret == BZ_STREAM_END; -} - void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) { const std::string segment_path = route_path + "--" + std::to_string(segment); SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT; @@ -42,13 +22,11 @@ void verify_segment(const std::string &route_path, int segment, int max_segment, for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) { const std::string log_file = segment_path + fn; INFO(log_file); - std::string log_bz2 = util::read_file(log_file); - REQUIRE(log_bz2.size() > 0); - - std::vector log; - bool ret = decompressBZ2(log, log_bz2.data(), log_bz2.size()); + + std::ostringstream stream; + bool ret = readBZ2File(log_file, stream); REQUIRE(ret); - + std::string log = stream.str(); int event_cnt = 0, i = 0; kj::ArrayPtr words((capnp::word *)log.data(), log.size() / sizeof(capnp::word)); while (words.size() > 0) { diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index b077f6de2e..137b6e47f7 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -108,7 +108,7 @@ if GetOption('setup'): if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()): qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc"] + replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs diff --git a/selfdrive/ui/replay/logreader.cc b/selfdrive/ui/replay/logreader.cc index 233c35a257..3980f5f209 100644 --- a/selfdrive/ui/replay/logreader.cc +++ b/selfdrive/ui/replay/logreader.cc @@ -1,41 +1,57 @@ #include "selfdrive/ui/replay/logreader.h" #include -#include +#include #include "selfdrive/common/util.h" +#include "selfdrive/ui/replay/util.h" -static bool decompressBZ2(std::vector &dest, const char srcData[], size_t srcSize, - size_t outputSizeIncrement = 0x100000U) { - bz_stream strm = {}; - int ret = BZ2_bzDecompressInit(&strm, 0, 0); - assert(ret == BZ_OK); - - strm.next_in = const_cast(srcData); - strm.avail_in = srcSize; - do { - strm.next_out = (char *)&dest[strm.total_out_lo32]; - strm.avail_out = dest.size() - strm.total_out_lo32; - ret = BZ2_bzDecompress(&strm); - if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { - dest.resize(dest.size() + outputSizeIncrement); +Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { + words = kj::ArrayPtr(amsg.begin(), reader.getEnd()); + event = reader.getRoot(); + which = event.which(); + mono_time = event.getLogMonoTime(); + + // 1) Send video data at t=timestampEof/timestampSof + // 2) Send encodeIndex packet at t=logMonoTime + if (frame) { + cereal::EncodeIndex::Reader idx; + if (which == cereal::Event::ROAD_ENCODE_IDX) { + idx = event.getRoadEncodeIdx(); + } else if (which == cereal::Event::DRIVER_ENCODE_IDX) { + idx = event.getDriverEncodeIdx(); + } else if (which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { + idx = event.getWideRoadEncodeIdx(); + } else { + assert(false); } - } while (ret == BZ_OK); - BZ2_bzDecompressEnd(&strm); - dest.resize(strm.total_out_lo32); - return ret == BZ_STREAM_END; + // C2 only has eof set, and some older routes have neither + uint64_t sof = idx.getTimestampSof(); + uint64_t eof = idx.getTimestampEof(); + if (sof > 0) { + mono_time = sof; + } else if (eof > 0) { + mono_time = eof; + } + } } +// class LogReader + LogReader::~LogReader() { for (auto e : events) delete e; } -bool LogReader::load(const std::string &file) { - raw_.resize(1024 * 1024 * 64); - std::string dat = util::read_file(file); - if (dat.empty() || !decompressBZ2(raw_, dat.data(), dat.size())) { - LOGW("bz2 decompress failed"); - return false; +bool LogReader::load(const std::string &file, bool is_bz2file) { + if (is_bz2file) { + std::ostringstream stream; + if (!readBZ2File(file, stream)) { + LOGW("bz2 decompress failed"); + return false; + } + raw_ = stream.str(); + } else { + raw_ = util::read_file(file); } kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); diff --git a/selfdrive/ui/replay/logreader.h b/selfdrive/ui/replay/logreader.h index 86962f6ee0..d57ac8c51d 100644 --- a/selfdrive/ui/replay/logreader.h +++ b/selfdrive/ui/replay/logreader.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -18,36 +17,7 @@ public: this->which = which; this->mono_time = mono_time; } - Event(const kj::ArrayPtr &amsg, bool frame=false) : reader(amsg), frame(frame) { - words = kj::ArrayPtr(amsg.begin(), reader.getEnd()); - event = reader.getRoot(); - which = event.which(); - mono_time = event.getLogMonoTime(); - - // 1) Send video data at t=timestampEof/timestampSof - // 2) Send encodeIndex packet at t=logMonoTime - if (frame) { - cereal::EncodeIndex::Reader idx; - if (which == cereal::Event::ROAD_ENCODE_IDX) { - idx = event.getRoadEncodeIdx(); - } else if (which == cereal::Event::DRIVER_ENCODE_IDX) { - idx = event.getDriverEncodeIdx(); - } else if (which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { - idx = event.getWideRoadEncodeIdx(); - } else { - assert(false); - } - - // C2 only has eof set, and some older routes have neither - uint64_t sof = idx.getTimestampSof(); - uint64_t eof = idx.getTimestampEof(); - if (sof > 0) { - mono_time = sof; - } else if (eof > 0) { - mono_time = eof; - } - } - } + Event(const kj::ArrayPtr &amsg, bool frame = false); inline kj::ArrayPtr bytes() const { return words.asBytes(); } struct lessThan { @@ -68,10 +38,10 @@ class LogReader { public: LogReader() = default; ~LogReader(); - bool load(const std::string &file); + bool load(const std::string &file, bool is_bz2file); std::vector events; private: - std::vector raw_; + std::string raw_; }; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index 313299e520..f16abcdc3d 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -7,22 +7,7 @@ #include "selfdrive/camerad/cameras/camera_common.h" #include "selfdrive/common/timing.h" #include "selfdrive/hardware/hw.h" - -inline void precise_nano_sleep(long sleep_ns) { - const long estimate_ns = 1 * 1e6; // 1ms - struct timespec req = {.tv_nsec = estimate_ns}; - uint64_t start_sleep = nanos_since_boot(); - while (sleep_ns > estimate_ns) { - nanosleep(&req, nullptr); - uint64_t end_sleep = nanos_since_boot(); - sleep_ns -= (end_sleep - start_sleep); - start_sleep = end_sleep; - } - // spin wait - if (sleep_ns > 0) { - while ((nanos_since_boot() - start_sleep) <= sleep_ns) { usleep(0); } - } -} +#include "selfdrive/ui/replay/util.h" Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, bool dcam, bool ecam, QObject *parent) : sm(sm_), load_dcam(dcam), load_ecam(ecam), QObject(parent) { diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index c996bb4bdd..0a332f1348 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -1,7 +1,5 @@ #include "selfdrive/ui/replay/route.h" -#include - #include #include #include @@ -12,108 +10,7 @@ #include "selfdrive/hardware/hw.h" #include "selfdrive/ui/qt/api.h" - -struct CURLGlobalInitializer { - CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); } - ~CURLGlobalInitializer() { curl_global_cleanup(); } -}; - -struct MultiPartWriter { - int64_t offset; - int64_t end; - FILE *fp; -}; - -static size_t write_cb(char *data, size_t n, size_t l, void *userp) { - MultiPartWriter *w = (MultiPartWriter *)userp; - fseek(w->fp, w->offset, SEEK_SET); - fwrite(data, l, n, w->fp); - w->offset += n * l; - return n * l; -} - -static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) { return n * l; } - -int64_t getDownloadContentLength(const std::string &url) { - CURL *curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb); - curl_easy_setopt(curl, CURLOPT_HEADER, 1); - curl_easy_setopt(curl, CURLOPT_NOBODY, 1); - CURLcode res = curl_easy_perform(curl); - double content_length = -1; - if (res == CURLE_OK) { - res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length); - } - curl_easy_cleanup(curl); - return res == CURLE_OK ? (int64_t)content_length : -1; -} - -bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort) { - int64_t content_length = getDownloadContentLength(url); - if (content_length == -1) return false; - - std::string tmp_file = target_file + ".tmp"; - FILE *fp = fopen(tmp_file.c_str(), "wb"); - // create a sparse file - fseek(fp, content_length, SEEK_SET); - - CURLM *cm = curl_multi_init(); - std::map writers; - const int part_size = content_length / parts; - for (int i = 0; i < parts; ++i) { - CURL *eh = curl_easy_init(); - writers[eh] = { - .fp = fp, - .offset = i * part_size, - .end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1, - }; - curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb); - curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh])); - curl_easy_setopt(eh, CURLOPT_URL, url.c_str()); - curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str()); - curl_easy_setopt(eh, CURLOPT_HTTPGET, 1); - curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1); - curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1); - curl_multi_add_handle(cm, eh); - } - - int running = 1, success_cnt = 0; - while (!(abort && abort->load())){ - CURLMcode ret = curl_multi_perform(cm, &running); - - if (!running) { - CURLMsg *msg; - int msgs_left = -1; - while ((msg = curl_multi_info_read(cm, &msgs_left))) { - if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) { - int http_status_code = 0; - curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code); - success_cnt += (http_status_code == 206); - } - } - break; - } - - if (ret == CURLM_OK) { - curl_multi_wait(cm, nullptr, 0, 1000, nullptr); - } - }; - - fclose(fp); - bool success = success_cnt == parts; - if (success) { - success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0; - } - - // cleanup curl - for (auto &[e, w] : writers) { - curl_multi_remove_handle(cm, e); - curl_easy_cleanup(e); - } - curl_multi_cleanup(cm); - return success; -} +#include "selfdrive/ui/replay/util.h" Route::Route(const QString &route) : route_(route) {} @@ -172,7 +69,6 @@ bool Route::loadFromJson(const QString &json) { // class Segment Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) { - static CURLGlobalInitializer curl_initializer; static std::once_flag once_flag; std::call_once(once_flag, [=]() { if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath()); @@ -215,7 +111,13 @@ Segment::~Segment() { void Segment::downloadFile(const QString &url) { qDebug() << "download" << url; download_threads_.emplace_back(QThread::create([=]() { - httpMultiPartDownload(url.toStdString(), localPath(url).toStdString(), connections_per_file, &aborting_); + const std::string local_file = localPath(url).toStdString(); + bool ret = httpMultiPartDownload(url.toStdString(), local_file, connections_per_file, &aborting_); + if (ret && url == log_path_) { + // pre-decompress log file. + std::ofstream ostrm(local_file + "_decompressed", std::ios::binary); + readBZ2File(local_file, ostrm); + } if (--downloading_ == 0 && !aborting_) { load(); } @@ -225,9 +127,13 @@ void Segment::downloadFile(const QString &url) { // load concurrency void Segment::load() { std::vector> futures; + futures.emplace_back(std::async(std::launch::async, [=]() { + const std::string bzip_file = localPath(log_path_).toStdString(); + const std::string decompressed_file = bzip_file + "_decompressed"; + bool is_bzip = !util::file_exists(decompressed_file); log = std::make_unique(); - return log->load(localPath(log_path_).toStdString()); + return log->load(is_bzip ? bzip_file : decompressed_file, is_bzip); })); QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam}; diff --git a/selfdrive/ui/replay/route.h b/selfdrive/ui/replay/route.h index fae2d45a47..0cce753414 100644 --- a/selfdrive/ui/replay/route.h +++ b/selfdrive/ui/replay/route.h @@ -67,5 +67,3 @@ protected: QString log_path_; std::vector download_threads_; }; - -bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort = nullptr); diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 329c3df43e..af2a8cae6c 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -6,7 +6,10 @@ #include "catch2/catch.hpp" #include "selfdrive/common/util.h" +#include "selfdrive/ui/replay/framereader.h" #include "selfdrive/ui/replay/replay.h" +#include "selfdrive/ui/replay/route.h" +#include "selfdrive/ui/replay/util.h" const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc"; diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc new file mode 100644 index 0000000000..e48891cde6 --- /dev/null +++ b/selfdrive/ui/replay/util.cc @@ -0,0 +1,151 @@ +#include "selfdrive/ui/replay/util.h" + +#include +#include +#include + +#include "selfdrive/common/timing.h" +#include "selfdrive/common/util.h" + +struct CURLGlobalInitializer { + CURLGlobalInitializer() { curl_global_init(CURL_GLOBAL_DEFAULT); } + ~CURLGlobalInitializer() { curl_global_cleanup(); } +}; + +struct MultiPartWriter { + int64_t offset; + int64_t end; + FILE *fp; +}; + +static size_t write_cb(char *data, size_t n, size_t l, void *userp) { + MultiPartWriter *w = (MultiPartWriter *)userp; + fseek(w->fp, w->offset, SEEK_SET); + fwrite(data, l, n, w->fp); + w->offset += n * l; + return n * l; +} + +static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) { return n * l; } + +int64_t getDownloadContentLength(const std::string &url) { + CURL *curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb); + curl_easy_setopt(curl, CURLOPT_HEADER, 1); + curl_easy_setopt(curl, CURLOPT_NOBODY, 1); + CURLcode res = curl_easy_perform(curl); + double content_length = -1; + if (res == CURLE_OK) { + res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length); + } + curl_easy_cleanup(curl); + return res == CURLE_OK ? (int64_t)content_length : -1; +} + +bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort) { + static CURLGlobalInitializer curl_initializer; + + int64_t content_length = getDownloadContentLength(url); + if (content_length == -1) return false; + + std::string tmp_file = target_file + ".tmp"; + FILE *fp = fopen(tmp_file.c_str(), "wb"); + // create a sparse file + fseek(fp, content_length, SEEK_SET); + + CURLM *cm = curl_multi_init(); + std::map writers; + const int part_size = content_length / parts; + for (int i = 0; i < parts; ++i) { + CURL *eh = curl_easy_init(); + writers[eh] = { + .fp = fp, + .offset = i * part_size, + .end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1, + }; + curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh])); + curl_easy_setopt(eh, CURLOPT_URL, url.c_str()); + curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str()); + curl_easy_setopt(eh, CURLOPT_HTTPGET, 1); + curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1); + curl_multi_add_handle(cm, eh); + } + + int running = 1, success_cnt = 0; + while (!(abort && abort->load())) { + CURLMcode ret = curl_multi_perform(cm, &running); + + if (!running) { + CURLMsg *msg; + int msgs_left = -1; + while ((msg = curl_multi_info_read(cm, &msgs_left))) { + if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) { + int http_status_code = 0; + curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code); + success_cnt += (http_status_code == 206); + } + } + break; + } + + if (ret == CURLM_OK) { + curl_multi_wait(cm, nullptr, 0, 1000, nullptr); + } + }; + + fclose(fp); + bool success = success_cnt == parts; + if (success) { + success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0; + } + + // cleanup curl + for (auto &[e, w] : writers) { + curl_multi_remove_handle(cm, e); + curl_easy_cleanup(e); + } + curl_multi_cleanup(cm); + return success; +} + +bool readBZ2File(const std::string_view file, std::ostream &stream) { + std::unique_ptr f(fopen(file.data(), "r"), &fclose); + if (!f) return false; + + int bzerror = BZ_OK; + BZFILE *bz_file = BZ2_bzReadOpen(&bzerror, f.get(), 0, 0, nullptr, 0); + if (!bz_file) return false; + + std::array buf; + do { + int size = BZ2_bzRead(&bzerror, bz_file, buf.data(), buf.size()); + if (bzerror == BZ_OK || bzerror == BZ_STREAM_END) { + stream.write(buf.data(), size); + } + } while (bzerror == BZ_OK); + + bool success = (bzerror == BZ_STREAM_END); + BZ2_bzReadClose(&bzerror, bz_file); + return success; +} + +void precise_nano_sleep(long sleep_ns) { + const long estimate_ns = 1 * 1e6; // 1ms + struct timespec req = {.tv_nsec = estimate_ns}; + uint64_t start_sleep = nanos_since_boot(); + while (sleep_ns > estimate_ns) { + nanosleep(&req, nullptr); + uint64_t end_sleep = nanos_since_boot(); + sleep_ns -= (end_sleep - start_sleep); + start_sleep = end_sleep; + } + // spin wait + if (sleep_ns > 0) { + while ((nanos_since_boot() - start_sleep) <= sleep_ns) { + usleep(0); + } + } +} diff --git a/selfdrive/ui/replay/util.h b/selfdrive/ui/replay/util.h new file mode 100644 index 0000000000..70095bef44 --- /dev/null +++ b/selfdrive/ui/replay/util.h @@ -0,0 +1,10 @@ +#pragma once + +#include +#include +#include +#include + +void precise_nano_sleep(long sleep_ns); +bool readBZ2File(const std::string_view file, std::ostream &stream); +bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic *abort = nullptr);