diff --git a/selfdrive/camerad/cameras/camera_replay.cc b/selfdrive/camerad/cameras/camera_replay.cc index 6bc53d8c94..731aab2a29 100644 --- a/selfdrive/camerad/cameras/camera_replay.cc +++ b/selfdrive/camerad/cameras/camera_replay.cc @@ -23,7 +23,7 @@ std::string get_url(std::string route_name, const std::string &camera, int segme } void camera_init(VisionIpcServer *v, CameraState *s, int camera_id, unsigned int fps, cl_device_id device_id, cl_context ctx, VisionStreamType rgb_type, VisionStreamType yuv_type, const std::string &url) { - s->frame = new FrameReader(true); + s->frame = new FrameReader(); if (!s->frame->load(url)) { printf("failed to load stream from %s", url.c_str()); assert(0); diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index e42859fd4f..32af922f1f 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -34,8 +34,7 @@ enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat * } // namespace -FrameReader::FrameReader(bool local_cache, int chunk_size, int retries) : FileReader(local_cache, chunk_size, retries) { -} +FrameReader::FrameReader() {} FrameReader::~FrameReader() { for (AVPacket *pkt : packets) { @@ -52,17 +51,22 @@ FrameReader::~FrameReader() { } } -bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic *abort) { - std::string content = read(url, abort); - if (content.empty()) return false; +bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic *abort, bool local_cache, int chunk_size, int retries) { + FileReader f(local_cache, chunk_size, retries); + std::string data = f.read(url, abort); + if (data.empty()) return false; + + return load((std::byte *)data.data(), data.size(), no_cuda, abort); +} +bool FrameReader::load(const std::byte *data, size_t size, bool no_cuda, std::atomic *abort) { input_ctx = avformat_alloc_context(); if (!input_ctx) return false; struct buffer_data bd = { - .data = (uint8_t *)content.data(), + .data = (const uint8_t*)data, .offset = 0, - .size = content.size(), + .size = size, }; const int avio_ctx_buffer_size = 64 * 1024; unsigned char *avio_ctx_buffer = (unsigned char *)av_malloc(avio_ctx_buffer_size); @@ -70,11 +74,11 @@ bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic * input_ctx->pb = avio_ctx_; input_ctx->probesize = 10 * 1024 * 1024; // 10MB - int ret = avformat_open_input(&input_ctx, url.c_str(), NULL, NULL); + int ret = avformat_open_input(&input_ctx, nullptr, nullptr, nullptr); if (ret != 0) { char err_str[1024] = {0}; av_strerror(ret, err_str, std::size(err_str)); - printf("Error loading video - %s - %s\n", err_str, url.c_str()); + printf("Error loading video - %s\n", err_str); return false; } @@ -103,7 +107,7 @@ bool FrameReader::load(const std::string &url, bool no_cuda, std::atomic * } } - ret = avcodec_open2(decoder_ctx, decoder, NULL); + ret = avcodec_open2(decoder_ctx, decoder, nullptr); if (ret < 0) return false; packets.reserve(60 * 20); // 20fps, one minute diff --git a/selfdrive/ui/replay/framereader.h b/selfdrive/ui/replay/framereader.h index 8a3f404158..d572b727e5 100644 --- a/selfdrive/ui/replay/framereader.h +++ b/selfdrive/ui/replay/framereader.h @@ -15,11 +15,12 @@ struct AVFrameDeleter { void operator()(AVFrame* frame) const { av_frame_free(&frame); } }; -class FrameReader : protected FileReader { +class FrameReader { public: - FrameReader(bool local_cache = false, int chunk_size = -1, int retries = 0); + FrameReader(); ~FrameReader(); - bool load(const std::string &url, bool no_cuda = false, std::atomic *abort = nullptr); + bool load(const std::string &url, bool no_cuda = false, std::atomic *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0); + bool load(const std::byte *data, size_t size, bool no_cuda = false, std::atomic *abort = nullptr); bool get(int idx, uint8_t *rgb, uint8_t *yuv); int getRGBSize() const { return width * height * 3; } int getYUVSize() const { return width * height * 3 / 2; } diff --git a/selfdrive/ui/replay/logreader.cc b/selfdrive/ui/replay/logreader.cc index 7e7a943f88..8e2836a4ff 100644 --- a/selfdrive/ui/replay/logreader.cc +++ b/selfdrive/ui/replay/logreader.cc @@ -1,6 +1,7 @@ #include "selfdrive/ui/replay/logreader.h" #include +#include #include "selfdrive/ui/replay/util.h" Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(amsg), frame(frame) { @@ -26,7 +27,7 @@ Event::Event(const kj::ArrayPtr &amsg, bool frame) : reader(a // class LogReader -LogReader::LogReader(bool local_cache, int chunk_size, int retries, size_t memory_pool_block_size) : FileReader(local_cache, chunk_size, retries) { +LogReader::LogReader(size_t memory_pool_block_size) { #ifdef HAS_MEMORY_RESOURCE const size_t buf_size = sizeof(Event) * memory_pool_block_size; pool_buffer_ = ::operator new(buf_size); @@ -39,19 +40,32 @@ LogReader::~LogReader() { for (Event *e : events) { delete e; } + #ifdef HAS_MEMORY_RESOURCE delete mbr_; ::operator delete(pool_buffer_); #endif } -bool LogReader::load(const std::string &file, std::atomic *abort) { - raw_ = decompressBZ2(read(file, abort)); - if (raw_.empty()) return false; +bool LogReader::load(const std::string &url, std::atomic *abort, bool local_cache, int chunk_size, int retries) { + FileReader f(local_cache, chunk_size, retries); + std::string data = f.read(url, abort); + if (data.empty()) return false; + + return load((std::byte*)data.data(), data.size(), abort); +} + +bool LogReader::load(const std::byte *data, size_t size, std::atomic *abort) { + raw_ = decompressBZ2(data, size); + if (raw_.empty()) { + std::cout << "failed to decompress log" << std::endl; + return false; + } + + try { + kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); + while (words.size() > 0) { - kj::ArrayPtr words((const capnp::word *)raw_.data(), raw_.size() / sizeof(capnp::word)); - while (words.size() > 0) { - try { #ifdef HAS_MEMORY_RESOURCE Event *evt = new (mbr_) Event(words); #else @@ -62,20 +76,26 @@ bool LogReader::load(const std::string &file, std::atomic *abort) { if (evt->which == cereal::Event::ROAD_ENCODE_IDX || evt->which == cereal::Event::DRIVER_ENCODE_IDX || evt->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { + #ifdef HAS_MEMORY_RESOURCE Event *frame_evt = new (mbr_) Event(words, true); #else Event *frame_evt = new Event(words, true); #endif + events.push_back(frame_evt); } words = kj::arrayPtr(evt->reader.getEnd(), words.end()); events.push_back(evt); - } catch (const kj::Exception &e) { - return false; } + } catch (const kj::Exception &e) { + std::cout << "failed to parse log : " << e.getDescription().cStr() << std::endl; + if (events.empty()) return false; + + std::cout << "read " << events.size() << " events from corrupt log" << std::endl; } + std::sort(events.begin(), events.end(), Event::lessThan()); return true; } diff --git a/selfdrive/ui/replay/logreader.h b/selfdrive/ui/replay/logreader.h index 5bb613d9de..33d7ea82f2 100644 --- a/selfdrive/ui/replay/logreader.h +++ b/selfdrive/ui/replay/logreader.h @@ -46,11 +46,12 @@ public: bool frame; }; -class LogReader : protected FileReader { +class LogReader { public: - LogReader(bool local_cache = false, int chunk_size = -1, int retries = 0, size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); + LogReader(size_t memory_pool_block_size = DEFAULT_EVENT_MEMORY_POOL_BLOCK_SIZE); ~LogReader(); - bool load(const std::string &file, std::atomic *abort = nullptr); + bool load(const std::string &url, std::atomic *abort = nullptr, bool local_cache = false, int chunk_size = -1, int retries = 0); + bool load(const std::byte *data, size_t size, std::atomic *abort = nullptr); std::vector events; diff --git a/selfdrive/ui/replay/route.cc b/selfdrive/ui/replay/route.cc index b3736fa75f..23c27073cb 100644 --- a/selfdrive/ui/replay/route.cc +++ b/selfdrive/ui/replay/route.cc @@ -116,11 +116,11 @@ void Segment::loadFile(int id, const std::string file) { const bool local_cache = !(flags & REPLAY_FLAG_NO_FILE_CACHE); bool success = false; if (id < MAX_CAMERAS) { - frames[id] = std::make_unique(local_cache, 20 * 1024 * 1024, 3); - success = frames[id]->load(file, flags & REPLAY_FLAG_NO_CUDA, &abort_); + frames[id] = std::make_unique(); + success = frames[id]->load(file, flags & REPLAY_FLAG_NO_CUDA, &abort_, local_cache, 20 * 1024 * 1024, 3); } else { - log = std::make_unique(local_cache, 0, 3); - success = log->load(file, &abort_); + log = std::make_unique(); + success = log->load(file, &abort_, local_cache, 0, 3); } if (!success) { diff --git a/selfdrive/ui/replay/tests/test_replay.cc b/selfdrive/ui/replay/tests/test_replay.cc index 9efc55a610..5b9b7cdeb7 100644 --- a/selfdrive/ui/replay/tests/test_replay.cc +++ b/selfdrive/ui/replay/tests/test_replay.cc @@ -50,6 +50,17 @@ TEST_CASE("FileReader") { } } +TEST_CASE("LogReader") { + SECTION("corrupt log") { + FileReader reader(true); + std::string corrupt_content = reader.read(TEST_RLOG_URL); + corrupt_content.resize(corrupt_content.length() / 2); + LogReader log; + REQUIRE(log.load((std::byte *)corrupt_content.data(), corrupt_content.size())); + REQUIRE(log.events.size() > 0); + } +} + TEST_CASE("Segment") { auto flags = GENERATE(REPLAY_FLAG_DCAM | REPLAY_FLAG_ECAM, REPLAY_FLAG_QCAMERA); Route demo_route(DEMO_ROUTE); diff --git a/selfdrive/ui/replay/util.cc b/selfdrive/ui/replay/util.cc index 27f0bd9cfd..d6791465f2 100644 --- a/selfdrive/ui/replay/util.cc +++ b/selfdrive/ui/replay/util.cc @@ -194,19 +194,32 @@ bool httpDownload(const std::string &url, const std::string &file, size_t chunk_ } std::string decompressBZ2(const std::string &in) { - if (in.empty()) return {}; + return decompressBZ2((std::byte *)in.data(), in.size()); +} + +std::string decompressBZ2(const std::byte *in, size_t in_size) { + if (in_size == 0) return {}; bz_stream strm = {}; int bzerror = BZ2_bzDecompressInit(&strm, 0, 0); assert(bzerror == BZ_OK); - strm.next_in = (char *)in.data(); - strm.avail_in = in.size(); - std::string out(in.size() * 5, '\0'); + strm.next_in = (char *)in; + strm.avail_in = in_size; + std::string out(in_size * 5, '\0'); do { strm.next_out = (char *)(&out[strm.total_out_lo32]); strm.avail_out = out.size() - strm.total_out_lo32; + + const char *prev_write_pos = strm.next_out; bzerror = BZ2_bzDecompress(&strm); + if (bzerror == BZ_OK && prev_write_pos == strm.next_out) { + // content is corrupt + bzerror = BZ_STREAM_END; + std::cout << "decompressBZ2 error : content is corrupt" << std::endl; + break; + } + if (bzerror == BZ_OK && strm.avail_in > 0 && strm.avail_out == 0) { out.resize(out.size() * 2); } diff --git a/selfdrive/ui/replay/util.h b/selfdrive/ui/replay/util.h index 85d7af0125..726e65cb94 100644 --- a/selfdrive/ui/replay/util.h +++ b/selfdrive/ui/replay/util.h @@ -6,6 +6,7 @@ std::string sha256(const std::string &str); void precise_nano_sleep(long sleep_ns); std::string decompressBZ2(const std::string &in); +std::string decompressBZ2(const std::byte *in, size_t in_size); void enableHttpLogging(bool enable); std::string getUrlWithoutQuery(const std::string &url); size_t getRemoteFileSize(const std::string &url);