From 67fe3feb09f7b800f197da328a5e5d837c30a141 Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Tue, 8 Jun 2021 14:31:59 +0800 Subject: [PATCH] refactor FrameReader (#21141) * refactor FrameReader * continue * move that * small cleanup * little more Co-authored-by: Adeeb Shihadeh --- selfdrive/ui/replay/framereader.cc | 214 ++++++++++++++++++----------- selfdrive/ui/replay/framereader.h | 52 ++++--- selfdrive/ui/replay/replay.cc | 25 ++-- 3 files changed, 175 insertions(+), 116 deletions(-) diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index f921041c92..e896e686aa 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -3,6 +3,9 @@ #include #include +#include + + static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { std::mutex *mutex = (std::mutex *)*arg; switch (op) { @@ -21,122 +24,173 @@ static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { return 0; } -FrameReader::FrameReader(const std::string &fn) : url(fn) { - int ret = av_lockmgr_register(ffmpeg_lockmgr_cb); - assert(ret >= 0); +class AVInitializer { +public: + AVInitializer() { + int ret = av_lockmgr_register(ffmpeg_lockmgr_cb); + assert(ret >= 0); + av_register_all(); + avformat_network_init(); + } - avformat_network_init(); - av_register_all(); + ~AVInitializer() { avformat_network_deinit(); } +}; + +static AVInitializer av_initializer; + +FrameReader::FrameReader(const std::string &url, QObject *parent) : url_(url), QObject(parent) { + process_thread_ = QThread::create(&FrameReader::process, this); + connect(process_thread_, &QThread::finished, process_thread_, &QThread::deleteLater); + process_thread_->start(); } FrameReader::~FrameReader() { + // wait until thread is finished. exit_ = true; - thread.join(); - for (auto &f : frames) { - delete f->pkt; - if (f->picture) { - av_frame_free(&f->picture); + process_thread_->wait(); + cv_decode_.notify_all(); + cv_frame_.notify_all(); + if (decode_thread_.joinable()) { + decode_thread_.join(); + } + + // free all. + for (auto &f : frames_) { + av_free_packet(&f.pkt); + if (f.data) { + delete[] f.data; } - delete f; } - avcodec_free_context(&pCodecCtx); - avformat_free_context(pFormatCtx); - sws_freeContext(sws_ctx); - avformat_network_deinit(); + while (!buffer_pool.empty()) { + delete[] buffer_pool.front(); + buffer_pool.pop(); + } + av_frame_free(&frmRgb_); + avcodec_close(pCodecCtx_); + avcodec_free_context(&pCodecCtx_); + avformat_close_input(&pFormatCtx_); + sws_freeContext(sws_ctx_); } void FrameReader::process() { - if (avformat_open_input(&pFormatCtx, url.c_str(), NULL, NULL) != 0) { - fprintf(stderr, "error loading %s\n", url.c_str()); - valid = false; - return; + if (processFrames()) { + decode_thread_ = std::thread(&FrameReader::decodeThread, this); } - avformat_find_stream_info(pFormatCtx, NULL); - av_dump_format(pFormatCtx, 0, url.c_str(), 0); + if (!exit_) { + emit finished(); + } +} - auto pCodecCtxOrig = pFormatCtx->streams[0]->codec; +bool FrameReader::processFrames() { + if (avformat_open_input(&pFormatCtx_, url_.c_str(), NULL, NULL) != 0) { + qDebug() << "error loading " << url_.c_str(); + return false; + } + avformat_find_stream_info(pFormatCtx_, NULL); + av_dump_format(pFormatCtx_, 0, url_.c_str(), 0); + + auto pCodecCtxOrig = pFormatCtx_->streams[0]->codec; auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id); - assert(pCodec != NULL); + assert(pCodec); - pCodecCtx = avcodec_alloc_context3(pCodec); - int ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig); + pCodecCtx_ = avcodec_alloc_context3(pCodec); + int ret = avcodec_copy_context(pCodecCtx_, pCodecCtxOrig); assert(ret == 0); - ret = avcodec_open2(pCodecCtx, pCodec, NULL); + ret = avcodec_open2(pCodecCtx_, pCodec, NULL); assert(ret >= 0); width = pCodecCtxOrig->width; height = pCodecCtxOrig->height; - sws_ctx = sws_getContext(width, height, AV_PIX_FMT_YUV420P, - width, height, AV_PIX_FMT_BGR24, - SWS_BILINEAR, NULL, NULL, NULL); - assert(sws_ctx != NULL); + sws_ctx_ = sws_getContext(width, height, AV_PIX_FMT_YUV420P, + width, height, AV_PIX_FMT_BGR24, + SWS_BILINEAR, NULL, NULL, NULL); + assert(sws_ctx_); + + frmRgb_ = av_frame_alloc(); + assert(frmRgb_); + frames_.reserve(60 * 20); // 20fps, one minute do { - AVPacket *pkt = new AVPacket; - if (av_read_frame(pFormatCtx, pkt) < 0) { - delete pkt; + Frame &frame = frames_.emplace_back(); + if (av_read_frame(pFormatCtx_, &frame.pkt) < 0) { + frames_.pop_back(); break; } - Frame *frame = new Frame; - frame->pkt = pkt; - frames.push_back(frame); - } while (true); + } while (!exit_); - printf("framereader download done\n"); + valid_ = !exit_; + return valid_; +} + +uint8_t *FrameReader::get(int idx) { + if (!valid_ || idx < 0 || idx >= frames_.size()) { + return nullptr; + } - thread = std::thread(&FrameReader::decodeThread, this); + { + std::unique_lock lk(mutex_); + decode_idx_ = idx; + cv_decode_.notify_one(); + cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].data || frames_[idx].failed; }); + } + + return frames_[idx].data; } void FrameReader::decodeThread() { + int idx = 0; while (!exit_) { - int gop = 0; - { - std::unique_lock lk(mutex); - cv_decode.wait(lk, [=] { return exit_ || decode_idx != -1; }); - if (exit_) break; - - gop = std::max(decode_idx - decode_idx % 15, 0); - decode_idx = -1; + const int from = std::max(idx, 0); + const int to = std::min(idx + 20, (int)frames_.size()); + for (int i = 0; i < frames_.size() && !exit_; ++i) { + Frame &frame = frames_[i]; + if (i >= from && i < to) { + if (frame.data || frame.failed) continue; + + uint8_t *dat = decodeFrame(&frame.pkt); + std::unique_lock lk(mutex_); + frame.data = dat; + frame.failed = !dat; + cv_frame_.notify_all(); + } else if (frame.data) { + buffer_pool.push(frame.data); + frame.data = nullptr; + frame.failed = false; + } } - for (int i = gop; i < std::min(gop + 15, (int)frames.size()); ++i) { - if (frames[i]->picture != nullptr) continue; - - int frameFinished; - AVFrame *pFrame = av_frame_alloc(); - avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, frames[i]->pkt); - AVFrame *picture = toRGB(pFrame); - av_frame_free(&pFrame); - - std::unique_lock lk(mutex); - frames[i]->picture = picture; - cv_frame.notify_all(); - } + // sleep & wait + std::unique_lock lk(mutex_); + cv_decode_.wait(lk, [=] { return exit_ || decode_idx_ != -1; }); + idx = decode_idx_; + decode_idx_ = -1; } } -AVFrame *FrameReader::toRGB(AVFrame *pFrame) { - AVFrame *pFrameRGB = av_frame_alloc(); - int numBytes = avpicture_get_size(AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); - uint8_t *buffer = (uint8_t *)av_malloc(numBytes * sizeof(uint8_t)); - avpicture_fill((AVPicture *)pFrameRGB, buffer, AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); - sws_scale(sws_ctx, (uint8_t const *const *)pFrame->data, - pFrame->linesize, 0, pFrame->height, - pFrameRGB->data, pFrameRGB->linesize); - return pFrameRGB; -} +uint8_t *FrameReader::decodeFrame(AVPacket *pkt) { + int gotFrame; + AVFrame *f = av_frame_alloc(); + avcodec_decode_video2(pCodecCtx_, f, &gotFrame, pkt); + + uint8_t *dat = nullptr; + if (gotFrame) { + if (!buffer_pool.empty()) { + dat = buffer_pool.front(); + buffer_pool.pop(); + } else { + dat = new uint8_t[getRGBSize()]; + } -uint8_t *FrameReader::get(int idx) { - if (!valid || idx < 0 || idx >= frames.size()) return nullptr; - - std::unique_lock lk(mutex); - decode_idx = idx; - cv_decode.notify_one(); - Frame *frame = frames[idx]; - if (!frame->picture) { - cv_frame.wait(lk, [=] { return exit_ || frame->picture != nullptr; }); + int ret = avpicture_fill((AVPicture *)frmRgb_, dat, AV_PIX_FMT_BGR24, f->width, f->height); + assert(ret > 0); + if (sws_scale(sws_ctx_, (const uint8_t **)f->data, f->linesize, 0, + f->height, frmRgb_->data, frmRgb_->linesize) <= 0) { + delete[] dat; + dat = nullptr; + } } - return frame->picture ? frame->picture->data[0] : nullptr; + av_frame_free(&f); + return dat; } diff --git a/selfdrive/ui/replay/framereader.h b/selfdrive/ui/replay/framereader.h index dd39416310..30160f5b36 100644 --- a/selfdrive/ui/replay/framereader.h +++ b/selfdrive/ui/replay/framereader.h @@ -5,10 +5,13 @@ #include #include #include +#include #include #include #include +#include + // independent of QT, needs ffmpeg extern "C" { #include @@ -16,38 +19,47 @@ extern "C" { #include } +class FrameReader : public QObject { + Q_OBJECT -class FrameReader { public: - FrameReader(const std::string &fn); + FrameReader(const std::string &url, QObject *parent = nullptr); ~FrameReader(); uint8_t *get(int idx); - AVFrame *toRGB(AVFrame *); - int getRGBSize() { return width*height*3; } - void process(); + int getRGBSize() { return width * height * 3; } + bool valid() const { return valid_; } int width = 0, height = 0; +signals: + void finished(); + private: + void process(); + bool processFrames(); void decodeThread(); + uint8_t *decodeFrame(AVPacket *pkt); - struct Frame{ - AVPacket *pkt; - AVFrame *picture; + struct Frame { + AVPacket pkt = {}; + uint8_t *data = nullptr; + bool failed = false; }; - std::vector frames; + std::vector frames_; - AVFormatContext *pFormatCtx = NULL; - AVCodecContext *pCodecCtx = NULL; - struct SwsContext *sws_ctx = NULL; + AVFormatContext *pFormatCtx_ = NULL; + AVCodecContext *pCodecCtx_ = NULL; + AVFrame *frmRgb_ = nullptr; + std::queue buffer_pool; + struct SwsContext *sws_ctx_ = NULL; - std::mutex mutex; - std::condition_variable cv_decode; - std::condition_variable cv_frame; - int decode_idx = -1; + std::mutex mutex_; + std::condition_variable cv_decode_; + std::condition_variable cv_frame_; + int decode_idx_ = 0; std::atomic exit_ = false; - std::thread thread; - - bool valid = true; - std::string url; + bool valid_ = false; + std::string url_; + QThread *process_thread_; + std::thread decode_thread_; }; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index f4fd778dc8..95fad13522 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -76,19 +76,11 @@ void Replay::addSegment(int n) { QObject::connect(t, &QThread::started, lrs[n], &LogReader::process); t->start(); - QThread *frame_thread = QThread::create([=]{ - FrameReader *frame_reader = new FrameReader(qPrintable(camera_paths.at(n).toString())); - frame_reader->process(); - frs.insert(n, frame_reader); - }); - QObject::connect(frame_thread, &QThread::finished, frame_thread, &QThread::deleteLater); - frame_thread->start(); - - + frs[n] = new FrameReader(qPrintable(camera_paths.at(n).toString()), this); } void Replay::removeSegment(int n) { - // TODO: fix FrameReader and LogReader destructors + // TODO: fix LogReader destructors /* if (lrs.contains(n)) { auto lr = lrs.take(n); @@ -255,8 +247,6 @@ void Replay::stream() { auto pp = *it_; if (frs.find(pp.first) != frs.end()) { auto frm = frs[pp.first]; - auto data = frm->get(pp.second); - if (vipc_server == nullptr) { cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); @@ -267,10 +257,13 @@ void Replay::stream() { vipc_server->start_listener(); } - VisionIpcBufExtra extra = {}; - VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); - memcpy(buf->addr, data, frm->getRGBSize()); - vipc_server->send(buf, &extra, false); + uint8_t *dat = frm->get(pp.second); + if (dat) { + VisionIpcBufExtra extra = {}; + VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); + memcpy(buf->addr, dat, frm->getRGBSize()); + vipc_server->send(buf, &extra, false); + } } } }