replay : move utility functions into separate file (#22414)

* move functions into util

* read bz2 into stream

* pre-decompress log in the download thread

* cleanup logreader

* cache sha256 path

* use readBZ2file in test_logger

* Revert "cache sha256 path"

This reverts commit 60459d3ea09a2c80f4560cf95b1ce7d6af59f06d.

* use macro

* use ostringstream

* cleanup readBZ2File

* move precise_nano_sleep into util
old-commit-hash: d28b98c602
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 430e9808b3
commit 66ca3985c9
  1. 2
      selfdrive/loggerd/SConscript
  2. 34
      selfdrive/loggerd/tests/test_logger.cc
  3. 2
      selfdrive/ui/SConscript
  4. 66
      selfdrive/ui/replay/logreader.cc
  5. 36
      selfdrive/ui/replay/logreader.h
  6. 17
      selfdrive/ui/replay/replay.cc
  7. 120
      selfdrive/ui/replay/route.cc
  8. 2
      selfdrive/ui/replay/route.h
  9. 3
      selfdrive/ui/replay/tests/test_replay.cc
  10. 151
      selfdrive/ui/replay/util.cc
  11. 10
      selfdrive/ui/replay/util.h

@ -28,4 +28,4 @@ env.Program(src, LIBS=libs)
env.Program('bootlog.cc', LIBS=libs)
if GetOption('test'):
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc'], LIBS=[libs])
env.Program('tests/test_logger', ['tests/test_runner.cc', 'tests/test_logger.cc', env.Object('logger_util', '#/selfdrive/ui/replay/util.cc')], LIBS=[libs] + ['curl'])

@ -1,38 +1,18 @@
#include <sys/stat.h>
#include <climits>
#include <condition_variable>
#include <sstream>
#include <thread>
#include <climits>
#include "catch2/catch.hpp"
#include "cereal/messaging/messaging.h"
#include "selfdrive/common/util.h"
#include "selfdrive/loggerd/logger.h"
#include "selfdrive/ui/replay/util.h"
typedef cereal::Sentinel::SentinelType SentinelType;
// Decompresses an in-memory bzip2 buffer into `dest`.
//
//   dest                 receives the decompressed bytes; on return it is resized to the
//                        number of bytes the decoder produced
//   srcData / srcSize    the compressed input
//   outputSizeIncrement  number of bytes `dest` grows by each time it fills up
//                        (0 falls back to the default of 0x100000)
//
// Returns true only if the decoder reported BZ_STREAM_END.
bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size_t srcSize, size_t outputSizeIncrement = 0x100000U) {
  bz_stream strm = {};
  int ret = BZ2_bzDecompressInit(&strm, 0, 0);
  assert(ret == BZ_OK);
  if (ret != BZ_OK) {
    // assert() is compiled out with NDEBUG, so fail explicitly as well.
    dest.clear();
    return false;
  }

  // With a zero increment a full output buffer would never grow and the loop
  // below would keep getting BZ_OK without making progress.
  const size_t grow_by = outputSizeIncrement != 0 ? outputSizeIncrement : 0x100000U;

  dest.resize(1024 * 1024);
  strm.next_in = const_cast<char *>(srcData);
  strm.avail_in = srcSize;
  do {
    // Continue writing right after the bytes produced so far.
    strm.next_out = (char *)&dest[strm.total_out_lo32];
    strm.avail_out = dest.size() - strm.total_out_lo32;
    ret = BZ2_bzDecompress(&strm);
    if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
      dest.resize(dest.size() + grow_by);
    }
  } while (ret == BZ_OK && strm.avail_in > 0);

  BZ2_bzDecompressEnd(&strm);
  // Trim the buffer down to what was actually produced.
  dest.resize(strm.total_out_lo32);
  return ret == BZ_STREAM_END;
}
void verify_segment(const std::string &route_path, int segment, int max_segment, int required_event_cnt) {
const std::string segment_path = route_path + "--" + std::to_string(segment);
SentinelType begin_sentinel = segment == 0 ? SentinelType::START_OF_ROUTE : SentinelType::START_OF_SEGMENT;
@ -42,13 +22,11 @@ void verify_segment(const std::string &route_path, int segment, int max_segment,
for (const char *fn : {"/rlog.bz2", "/qlog.bz2"}) {
const std::string log_file = segment_path + fn;
INFO(log_file);
std::string log_bz2 = util::read_file(log_file);
REQUIRE(log_bz2.size() > 0);
std::vector<uint8_t> log;
bool ret = decompressBZ2(log, log_bz2.data(), log_bz2.size());
std::ostringstream stream;
bool ret = readBZ2File(log_file, stream);
REQUIRE(ret);
std::string log = stream.str();
int event_cnt = 0, i = 0;
kj::ArrayPtr<const capnp::word> words((capnp::word *)log.data(), log.size() / sizeof(capnp::word));
while (words.size() > 0) {

@ -108,7 +108,7 @@ if GetOption('setup'):
if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()):
qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"]
replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc"]
replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"]
replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs)
replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs

@ -1,41 +1,57 @@
#include "selfdrive/ui/replay/logreader.h"
#include <cassert>
#include <bzlib.h>
#include <sstream>
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/util.h"
static bool decompressBZ2(std::vector<uint8_t> &dest, const char srcData[], size_t srcSize,
size_t outputSizeIncrement = 0x100000U) {
bz_stream strm = {};
int ret = BZ2_bzDecompressInit(&strm, 0, 0);
assert(ret == BZ_OK);
strm.next_in = const_cast<char *>(srcData);
strm.avail_in = srcSize;
do {
strm.next_out = (char *)&dest[strm.total_out_lo32];
strm.avail_out = dest.size() - strm.total_out_lo32;
ret = BZ2_bzDecompress(&strm);
if (ret == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) {
dest.resize(dest.size() + outputSizeIncrement);
Event::Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame) : reader(amsg), frame(frame) {
words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
event = reader.getRoot<cereal::Event>();
which = event.which();
mono_time = event.getLogMonoTime();
// 1) Send video data at t=timestampEof/timestampSof
// 2) Send encodeIndex packet at t=logMonoTime
if (frame) {
cereal::EncodeIndex::Reader idx;
if (which == cereal::Event::ROAD_ENCODE_IDX) {
idx = event.getRoadEncodeIdx();
} else if (which == cereal::Event::DRIVER_ENCODE_IDX) {
idx = event.getDriverEncodeIdx();
} else if (which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
idx = event.getWideRoadEncodeIdx();
} else {
assert(false);
}
} while (ret == BZ_OK);
BZ2_bzDecompressEnd(&strm);
dest.resize(strm.total_out_lo32);
return ret == BZ_STREAM_END;
// C2 only has eof set, and some older routes have neither
uint64_t sof = idx.getTimestampSof();
uint64_t eof = idx.getTimestampEof();
if (sof > 0) {
mono_time = sof;
} else if (eof > 0) {
mono_time = eof;
}
}
}
// class LogReader
// Frees every Event held in `events`.
LogReader::~LogReader() {
  for (auto it = events.begin(); it != events.end(); ++it) {
    delete *it;
  }
}
bool LogReader::load(const std::string &file) {
raw_.resize(1024 * 1024 * 64);
std::string dat = util::read_file(file);
if (dat.empty() || !decompressBZ2(raw_, dat.data(), dat.size())) {
LOGW("bz2 decompress failed");
return false;
bool LogReader::load(const std::string &file, bool is_bz2file) {
if (is_bz2file) {
std::ostringstream stream;
if (!readBZ2File(file, stream)) {
LOGW("bz2 decompress failed");
return false;
}
raw_ = stream.str();
} else {
raw_ = util::read_file(file);
}
kj::ArrayPtr<const capnp::word> words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word));

@ -1,7 +1,6 @@
#pragma once
#include <unordered_map>
#include <vector>
#include <cassert>
#include <capnp/serialize.h>
@ -18,36 +17,7 @@ public:
this->which = which;
this->mono_time = mono_time;
}
// Builds an Event from a serialized message.
//
//   amsg   words starting at the beginning of the message
//   frame  when true, the event must be one of the *EncodeIdx types and its
//          mono_time is taken from the encode index timestamps instead of logMonoTime
Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame=false) : reader(amsg), frame(frame) {
  // Keep only the words that belong to this message.
  words = kj::ArrayPtr<const capnp::word>(amsg.begin(), reader.getEnd());
  event = reader.getRoot<cereal::Event>();
  which = event.which();
  mono_time = event.getLogMonoTime();

  // 1) Send video data at t=timestampEof/timestampSof
  // 2) Send encodeIndex packet at t=logMonoTime
  if (!frame) {
    return;
  }

  cereal::EncodeIndex::Reader idx;
  if (which == cereal::Event::ROAD_ENCODE_IDX) {
    idx = event.getRoadEncodeIdx();
  } else if (which == cereal::Event::DRIVER_ENCODE_IDX) {
    idx = event.getDriverEncodeIdx();
  } else if (which == cereal::Event::WIDE_ROAD_ENCODE_IDX) {
    idx = event.getWideRoadEncodeIdx();
  } else {
    assert(false);
  }

  // C2 only has eof set, and some older routes have neither
  const uint64_t sof = idx.getTimestampSof();
  const uint64_t eof = idx.getTimestampEof();
  if (sof > 0) {
    mono_time = sof;
  } else if (eof > 0) {
    mono_time = eof;
  }
}
Event(const kj::ArrayPtr<const capnp::word> &amsg, bool frame = false);
// Returns this event's `words` viewed as a byte array.
inline kj::ArrayPtr<const capnp::byte> bytes() const {
  return words.asBytes();
}
struct lessThan {
@ -68,10 +38,10 @@ class LogReader {
public:
LogReader() = default;
~LogReader();
bool load(const std::string &file);
bool load(const std::string &file, bool is_bz2file);
std::vector<Event*> events;
private:
std::vector<uint8_t> raw_;
std::string raw_;
};

@ -7,22 +7,7 @@
#include "selfdrive/camerad/cameras/camera_common.h"
#include "selfdrive/common/timing.h"
#include "selfdrive/hardware/hw.h"
// Sleeps for about `sleep_ns` nanoseconds.
// While more than one step (1ms) remains it sleeps with nanosleep() and subtracts
// the time that actually elapsed; the remainder is waited out in a usleep(0) loop.
inline void precise_nano_sleep(long sleep_ns) {
  const long step_ns = 1 * 1e6;  // 1ms
  struct timespec step = {};
  step.tv_nsec = step_ns;

  uint64_t mark = nanos_since_boot();
  for (; sleep_ns > step_ns;) {
    nanosleep(&step, nullptr);
    const uint64_t now = nanos_since_boot();
    sleep_ns -= (now - mark);
    mark = now;
  }

  if (sleep_ns <= 0) {
    return;
  }
  // spin wait
  for (;;) {
    const uint64_t waited = nanos_since_boot() - mark;
    if (waited > sleep_ns) break;
    usleep(0);
  }
}
#include "selfdrive/ui/replay/util.h"
Replay::Replay(QString route, QStringList allow, QStringList block, SubMaster *sm_, bool dcam, bool ecam, QObject *parent)
: sm(sm_), load_dcam(dcam), load_ecam(ecam), QObject(parent) {

@ -1,7 +1,5 @@
#include "selfdrive/ui/replay/route.h"
#include <curl/curl.h>
#include <QEventLoop>
#include <QFile>
#include <QJsonArray>
@ -12,108 +10,7 @@
#include "selfdrive/hardware/hw.h"
#include "selfdrive/ui/qt/api.h"
// Calls curl_global_init() on construction and curl_global_cleanup() on destruction.
struct CURLGlobalInitializer {
  CURLGlobalInitializer() {
    curl_global_init(CURL_GLOBAL_DEFAULT);
  }

  ~CURLGlobalInitializer() {
    curl_global_cleanup();
  }
};
// Per-connection write state for a ranged download into a shared file.
struct MultiPartWriter {
  int64_t offset;  // file position where the next chunk is written
  int64_t end;     // last byte of the range assigned to this part
  FILE *fp;        // destination file
};

// Write callback for libcurl: stores the received chunk at the writer's current
// offset and advances the offset.
//
//   data   chunk received
//   n, l   the two size factors passed by libcurl; the chunk is n * l bytes long
//   userp  the MultiPartWriter for this transfer
//
// Returns the number of bytes actually written. A failed seek or a short write
// yields a value smaller than n * l instead of being reported as success.
static size_t write_cb(char *data, size_t n, size_t l, void *userp) {
  MultiPartWriter *w = (MultiPartWriter *)userp;
  const size_t bytes = n * l;
  // Reposition on every call: the FILE is written through several writers,
  // each with its own offset.
  if (fseek(w->fp, w->offset, SEEK_SET) != 0) {
    return 0;
  }
  const size_t written = fwrite(data, 1, bytes, w->fp);
  w->offset += written;
  return written;
}
// Write callback that stores nothing: it only reports the whole chunk (n * l bytes) as handled.
static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) {
  (void)data;
  (void)userp;
  const size_t consumed = n * l;
  return consumed;
}
int64_t getDownloadContentLength(const std::string &url) {
CURL *curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb);
curl_easy_setopt(curl, CURLOPT_HEADER, 1);
curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
CURLcode res = curl_easy_perform(curl);
double content_length = -1;
if (res == CURLE_OK) {
res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length);
}
curl_easy_cleanup(curl);
return res == CURLE_OK ? (int64_t)content_length : -1;
}
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
int64_t content_length = getDownloadContentLength(url);
if (content_length == -1) return false;
std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb");
// create a sparse file
fseek(fp, content_length, SEEK_SET);
CURLM *cm = curl_multi_init();
std::map<CURL *, MultiPartWriter> writers;
const int part_size = content_length / parts;
for (int i = 0; i < parts; ++i) {
CURL *eh = curl_easy_init();
writers[eh] = {
.fp = fp,
.offset = i * part_size,
.end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1,
};
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh]));
curl_easy_setopt(eh, CURLOPT_URL, url.c_str());
curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str());
curl_easy_setopt(eh, CURLOPT_HTTPGET, 1);
curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1);
curl_multi_add_handle(cm, eh);
}
int running = 1, success_cnt = 0;
while (!(abort && abort->load())){
CURLMcode ret = curl_multi_perform(cm, &running);
if (!running) {
CURLMsg *msg;
int msgs_left = -1;
while ((msg = curl_multi_info_read(cm, &msgs_left))) {
if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) {
int http_status_code = 0;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code);
success_cnt += (http_status_code == 206);
}
}
break;
}
if (ret == CURLM_OK) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
}
};
fclose(fp);
bool success = success_cnt == parts;
if (success) {
success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
}
// cleanup curl
for (auto &[e, w] : writers) {
curl_multi_remove_handle(cm, e);
curl_easy_cleanup(e);
}
curl_multi_cleanup(cm);
return success;
}
#include "selfdrive/ui/replay/util.h"
Route::Route(const QString &route) : route_(route) {}
@ -172,7 +69,6 @@ bool Route::loadFromJson(const QString &json) {
// class Segment
Segment::Segment(int n, const SegmentFile &segment_files, bool load_dcam, bool load_ecam) : seg_num_(n), files_(segment_files) {
static CURLGlobalInitializer curl_initializer;
static std::once_flag once_flag;
std::call_once(once_flag, [=]() {
if (!CACHE_DIR.exists()) QDir().mkdir(CACHE_DIR.absolutePath());
@ -215,7 +111,13 @@ Segment::~Segment() {
void Segment::downloadFile(const QString &url) {
qDebug() << "download" << url;
download_threads_.emplace_back(QThread::create([=]() {
httpMultiPartDownload(url.toStdString(), localPath(url).toStdString(), connections_per_file, &aborting_);
const std::string local_file = localPath(url).toStdString();
bool ret = httpMultiPartDownload(url.toStdString(), local_file, connections_per_file, &aborting_);
if (ret && url == log_path_) {
// pre-decompress log file.
std::ofstream ostrm(local_file + "_decompressed", std::ios::binary);
readBZ2File(local_file, ostrm);
}
if (--downloading_ == 0 && !aborting_) {
load();
}
@ -225,9 +127,13 @@ void Segment::downloadFile(const QString &url) {
// load concurrency
void Segment::load() {
std::vector<std::future<bool>> futures;
futures.emplace_back(std::async(std::launch::async, [=]() {
const std::string bzip_file = localPath(log_path_).toStdString();
const std::string decompressed_file = bzip_file + "_decompressed";
bool is_bzip = !util::file_exists(decompressed_file);
log = std::make_unique<LogReader>();
return log->load(localPath(log_path_).toStdString());
return log->load(is_bzip ? bzip_file : decompressed_file, is_bzip);
}));
QString camera_files[] = {road_cam_path_, files_.driver_cam, files_.wide_road_cam};

@ -67,5 +67,3 @@ protected:
QString log_path_;
std::vector<QThread*> download_threads_;
};
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);

@ -6,7 +6,10 @@
#include "catch2/catch.hpp"
#include "selfdrive/common/util.h"
#include "selfdrive/ui/replay/framereader.h"
#include "selfdrive/ui/replay/replay.h"
#include "selfdrive/ui/replay/route.h"
#include "selfdrive/ui/replay/util.h"
const char *stream_url = "https://commadataci.blob.core.windows.net/openpilotci/0c94aa1e1296d7c6/2021-05-05--19-48-37/0/fcamera.hevc";

@ -0,0 +1,151 @@
#include "selfdrive/ui/replay/util.h"
#include <cassert>
#include <bzlib.h>
#include <curl/curl.h>
#include "selfdrive/common/timing.h"
#include "selfdrive/common/util.h"
// Calls curl_global_init() on construction and curl_global_cleanup() on destruction.
struct CURLGlobalInitializer {
  CURLGlobalInitializer() {
    curl_global_init(CURL_GLOBAL_DEFAULT);
  }

  ~CURLGlobalInitializer() {
    curl_global_cleanup();
  }
};
// Per-connection write state for a ranged download into a shared file.
struct MultiPartWriter {
  int64_t offset;  // file position where the next chunk is written
  int64_t end;     // last byte of the range assigned to this part
  FILE *fp;        // destination file
};

// Write callback for libcurl: stores the received chunk at the writer's current
// offset and advances the offset.
//
//   data   chunk received
//   n, l   the two size factors passed by libcurl; the chunk is n * l bytes long
//   userp  the MultiPartWriter for this transfer
//
// Returns the number of bytes actually written. A failed seek or a short write
// yields a value smaller than n * l instead of being reported as success.
static size_t write_cb(char *data, size_t n, size_t l, void *userp) {
  MultiPartWriter *w = (MultiPartWriter *)userp;
  const size_t bytes = n * l;
  // Reposition on every call: the FILE is written through several writers,
  // each with its own offset.
  if (fseek(w->fp, w->offset, SEEK_SET) != 0) {
    return 0;
  }
  const size_t written = fwrite(data, 1, bytes, w->fp);
  w->offset += written;
  return written;
}
// Write callback that stores nothing: it only reports the whole chunk (n * l bytes) as handled.
static size_t dumy_write_cb(char *data, size_t n, size_t l, void *userp) {
  (void)data;
  (void)userp;
  const size_t consumed = n * l;
  return consumed;
}
int64_t getDownloadContentLength(const std::string &url) {
CURL *curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, dumy_write_cb);
curl_easy_setopt(curl, CURLOPT_HEADER, 1);
curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
CURLcode res = curl_easy_perform(curl);
double content_length = -1;
if (res == CURLE_OK) {
res = curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &content_length);
}
curl_easy_cleanup(curl);
return res == CURLE_OK ? (int64_t)content_length : -1;
}
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort) {
static CURLGlobalInitializer curl_initializer;
int64_t content_length = getDownloadContentLength(url);
if (content_length == -1) return false;
std::string tmp_file = target_file + ".tmp";
FILE *fp = fopen(tmp_file.c_str(), "wb");
// create a sparse file
fseek(fp, content_length, SEEK_SET);
CURLM *cm = curl_multi_init();
std::map<CURL *, MultiPartWriter> writers;
const int part_size = content_length / parts;
for (int i = 0; i < parts; ++i) {
CURL *eh = curl_easy_init();
writers[eh] = {
.fp = fp,
.offset = i * part_size,
.end = i == parts - 1 ? content_length - 1 : (i + 1) * part_size - 1,
};
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, write_cb);
curl_easy_setopt(eh, CURLOPT_WRITEDATA, (void *)(&writers[eh]));
curl_easy_setopt(eh, CURLOPT_URL, url.c_str());
curl_easy_setopt(eh, CURLOPT_RANGE, util::string_format("%d-%d", writers[eh].offset, writers[eh].end).c_str());
curl_easy_setopt(eh, CURLOPT_HTTPGET, 1);
curl_easy_setopt(eh, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(eh, CURLOPT_FOLLOWLOCATION, 1);
curl_multi_add_handle(cm, eh);
}
int running = 1, success_cnt = 0;
while (!(abort && abort->load())) {
CURLMcode ret = curl_multi_perform(cm, &running);
if (!running) {
CURLMsg *msg;
int msgs_left = -1;
while ((msg = curl_multi_info_read(cm, &msgs_left))) {
if (msg->msg == CURLMSG_DONE && msg->data.result == CURLE_OK) {
int http_status_code = 0;
curl_easy_getinfo(msg->easy_handle, CURLINFO_RESPONSE_CODE, &http_status_code);
success_cnt += (http_status_code == 206);
}
}
break;
}
if (ret == CURLM_OK) {
curl_multi_wait(cm, nullptr, 0, 1000, nullptr);
}
};
fclose(fp);
bool success = success_cnt == parts;
if (success) {
success = ::rename(tmp_file.c_str(), target_file.c_str()) == 0;
}
// cleanup curl
for (auto &[e, w] : writers) {
curl_multi_remove_handle(cm, e);
curl_easy_cleanup(e);
}
curl_multi_cleanup(cm);
return success;
}
// Decompresses the bzip2 file at `file` and writes the decoded bytes to `stream`.
//
// Decoding stops at the first BZ_STREAM_END reported by BZ2_bzRead.
// Returns true only if that end-of-stream marker was reached and `stream` accepted
// all the data; returns false if the file cannot be opened, the bzip2 reader cannot
// be created, decoding fails, or writing to `stream` fails.
bool readBZ2File(const std::string_view file, std::ostream &stream) {
  // A string_view is not guaranteed to be NUL-terminated, but fopen() needs a C string.
  const std::string path(file);
  std::unique_ptr<FILE, decltype(&fclose)> f(fopen(path.c_str(), "rb"), &fclose);
  if (!f) return false;

  int bzerror = BZ_OK;
  BZFILE *bz_file = BZ2_bzReadOpen(&bzerror, f.get(), 0, 0, nullptr, 0);
  if (!bz_file) return false;

  std::array<char, 64 * 1024> buf;
  do {
    int size = BZ2_bzRead(&bzerror, bz_file, buf.data(), buf.size());
    // The final call reports BZ_STREAM_END and may still carry data.
    if (bzerror == BZ_OK || bzerror == BZ_STREAM_END) {
      stream.write(buf.data(), size);
    }
    // Stop early once the output stream is broken; further output would be lost anyway.
  } while (bzerror == BZ_OK && !stream.fail());

  // Evaluate before BZ2_bzReadClose() overwrites bzerror.
  bool success = (bzerror == BZ_STREAM_END) && !stream.fail();
  BZ2_bzReadClose(&bzerror, bz_file);
  return success;
}
// Sleeps for about `sleep_ns` nanoseconds.
// While more than one step (1ms) remains it sleeps with nanosleep() and subtracts
// the time that actually elapsed; the remainder is waited out in a usleep(0) loop.
void precise_nano_sleep(long sleep_ns) {
  const long step_ns = 1 * 1e6;  // 1ms
  struct timespec step = {};
  step.tv_nsec = step_ns;

  uint64_t mark = nanos_since_boot();
  for (; sleep_ns > step_ns;) {
    nanosleep(&step, nullptr);
    const uint64_t now = nanos_since_boot();
    sleep_ns -= (now - mark);
    mark = now;
  }

  if (sleep_ns <= 0) {
    return;
  }
  // spin wait
  for (;;) {
    const uint64_t waited = nanos_since_boot() - mark;
    if (waited > sleep_ns) break;
    usleep(0);
  }
}

@ -0,0 +1,10 @@
#pragma once
#include <atomic>
#include <ostream>
#include <string>
#include <vector>
// Sleeps for about `sleep_ns` nanoseconds: nanosleep() in 1ms steps while more than
// 1ms remains, then a usleep(0) loop for the rest.
void precise_nano_sleep(long sleep_ns);
// Decompresses the bzip2 file `file` and writes the decoded bytes to `stream`.
// Returns true if the end of the bzip2 stream was reached; false if the file could
// not be opened or decoding failed.
bool readBZ2File(const std::string_view file, std::ostream &stream);
// Downloads `url` with `parts` simultaneous HTTP range requests into "<target_file>.tmp"
// and renames it to `target_file` when every part finished with HTTP 206.
// `abort` is optional; when it reads true the download stops. Returns true on success.
bool httpMultiPartDownload(const std::string &url, const std::string &target_file, int parts, std::atomic<bool> *abort = nullptr);
Loading…
Cancel
Save