Refactor FrameReader (#21002)

* Refactor FrameReader

* decodeThread

* delete frame

* remove joined

* continue

* less diff

* robust cv wait

* cache fist 15 frames

* notify_all

* rename variables

* need call avformat_find_stream_info before dump_format

* get width&height from codec

* use std::string

* delete in removeSegment

* use std::mutex in lockmgr_cb

* fix wrong min/max

* no get in process

* cleanup

* always notify decodeThread to do prefetch
pull/21006/head
Dean Lee 4 years ago committed by GitHub
parent aa0c2a1e7f
commit 5540dcae78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      selfdrive/ui/replay/replay.cc
  2. 2
      selfdrive/ui/replay/replay.h
  3. 192
      tools/clib/framereader.cc
  4. 59
      tools/clib/framereader.h

@ -76,7 +76,15 @@ void Replay::addSegment(int n) {
QObject::connect(t, &QThread::started, lrs[n], &LogReader::process); QObject::connect(t, &QThread::started, lrs[n], &LogReader::process);
t->start(); t->start();
frs.insert(n, new FrameReader(qPrintable(camera_paths.at(n).toString()))); QThread *frame_thread = QThread::create([=]{
FrameReader *frame_reader = new FrameReader(qPrintable(camera_paths.at(n).toString()));
frame_reader->process();
frs.insert(n, frame_reader);
});
QObject::connect(frame_thread, &QThread::finished, frame_thread, &QThread::deleteLater);
frame_thread->start();
} }
void Replay::removeSegment(int n) { void Replay::removeSegment(int n) {
@ -86,10 +94,6 @@ void Replay::removeSegment(int n) {
auto lr = lrs.take(n); auto lr = lrs.take(n);
delete lr; delete lr;
} }
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
}
events_lock.lockForWrite(); events_lock.lockForWrite();
auto eit = events.begin(); auto eit = events.begin();
@ -102,6 +106,10 @@ void Replay::removeSegment(int n) {
} }
events_lock.unlock(); events_lock.unlock();
*/ */
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
}
} }
void Replay::start(){ void Replay::start(){

@ -41,7 +41,7 @@ private:
uint64_t route_start_ts; uint64_t route_start_ts;
std::atomic<int> seek_ts = 0; std::atomic<int> seek_ts = 0;
std::atomic<int> current_ts = 0; std::atomic<int> current_ts = 0;
std::atomic<int> current_segment; std::atomic<int> current_segment = 0;
QThread *thread; QThread *thread;
QThread *kb_thread; QThread *kb_thread;

@ -4,125 +4,114 @@
#include <unistd.h> #include <unistd.h>
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
pthread_mutex_t *mutex = (pthread_mutex_t *)*arg; std::mutex *mutex = (std::mutex *)*arg;
int err;
switch (op) { switch (op) {
case AV_LOCK_CREATE: case AV_LOCK_CREATE:
mutex = (pthread_mutex_t *)malloc(sizeof(*mutex)); mutex = new std::mutex();
if (!mutex) break;
return AVERROR(ENOMEM);
if ((err = pthread_mutex_init(mutex, NULL))) {
free(mutex);
return AVERROR(err);
}
*arg = mutex;
return 0;
case AV_LOCK_OBTAIN: case AV_LOCK_OBTAIN:
if ((err = pthread_mutex_lock(mutex))) mutex->lock();
return AVERROR(err); break;
return 0;
case AV_LOCK_RELEASE: case AV_LOCK_RELEASE:
if ((err = pthread_mutex_unlock(mutex))) mutex->unlock();
return AVERROR(err);
return 0;
case AV_LOCK_DESTROY: case AV_LOCK_DESTROY:
if (mutex) delete mutex;
pthread_mutex_destroy(mutex); break;
free(mutex);
*arg = NULL;
return 0;
} }
return 1; return 0;
} }
FrameReader::FrameReader(const char *fn) { FrameReader::FrameReader(const std::string &fn) : url(fn) {
int ret; int ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
assert(ret >= 0); assert(ret >= 0);
avformat_network_init(); avformat_network_init();
av_register_all(); av_register_all();
snprintf(url, sizeof(url)-1,"%s",fn);
t = new std::thread([&]() { this->loaderThread(); });
} }
void FrameReader::loaderThread() { FrameReader::~FrameReader() {
int ret; exit_ = true;
thread.join();
for (auto &f : frames) {
delete f->pkt;
if (f->picture) {
av_frame_free(&f->picture);
}
delete f;
}
avcodec_free_context(&pCodecCtx);
avformat_free_context(pFormatCtx);
sws_freeContext(sws_ctx);
avformat_network_deinit();
}
if (avformat_open_input(&pFormatCtx, url, NULL, NULL) != 0) { void FrameReader::process() {
fprintf(stderr, "error loading %s\n", url); if (avformat_open_input(&pFormatCtx, url.c_str(), NULL, NULL) != 0) {
fprintf(stderr, "error loading %s\n", url.c_str());
valid = false; valid = false;
return; return;
} }
av_dump_format(pFormatCtx, 0, url, 0); avformat_find_stream_info(pFormatCtx, NULL);
av_dump_format(pFormatCtx, 0, url.c_str(), 0);
auto pCodecCtxOrig = pFormatCtx->streams[0]->codec; auto pCodecCtxOrig = pFormatCtx->streams[0]->codec;
auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id); auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id);
assert(pCodec != NULL); assert(pCodec != NULL);
pCodecCtx = avcodec_alloc_context3(pCodec); pCodecCtx = avcodec_alloc_context3(pCodec);
ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig); int ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig);
assert(ret == 0); assert(ret == 0);
ret = avcodec_open2(pCodecCtx, pCodec, NULL); ret = avcodec_open2(pCodecCtx, pCodec, NULL);
assert(ret >= 0); assert(ret >= 0);
width = pCodecCtxOrig->width;
height = pCodecCtxOrig->height;
sws_ctx = sws_getContext(width, height, AV_PIX_FMT_YUV420P, sws_ctx = sws_getContext(width, height, AV_PIX_FMT_YUV420P,
width, height, AV_PIX_FMT_BGR24, width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, NULL, NULL, NULL); SWS_BILINEAR, NULL, NULL, NULL);
assert(sws_ctx != NULL); assert(sws_ctx != NULL);
AVPacket *pkt = (AVPacket *)malloc(sizeof(AVPacket)); do {
assert(pkt != NULL); AVPacket *pkt = new AVPacket;
bool first = true; if (av_read_frame(pFormatCtx, pkt) < 0) {
while (av_read_frame(pFormatCtx, pkt)>=0) { delete pkt;
//printf("%d pkt %d %d\n", pkts.size(), pkt->size, pkt->pos); break;
if (first) {
AVFrame *pFrame = av_frame_alloc();
int frameFinished;
avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkt);
first = false;
} }
pkts.push_back(pkt); Frame *frame = new Frame{.pkt = pkt};
pkt = (AVPacket *)malloc(sizeof(AVPacket)); frames.push_back(frame);
assert(pkt != NULL); } while (true);
}
free(pkt);
printf("framereader download done\n"); printf("framereader download done\n");
joined = true;
// cache thread = std::thread(&FrameReader::decodeThread, this);
while (1) {
GOPCache(to_cache.get());
}
} }
void FrameReader::decodeThread() {
while (!exit_) {
int gop = 0;
{
std::unique_lock lk(mutex);
cv_decode.wait(lk, [=] { return exit_ || decode_idx != -1; });
if (exit_) break;
void FrameReader::GOPCache(int idx) { gop = std::max(decode_idx - decode_idx % 15, 0);
AVFrame *pFrame; decode_idx = -1;
int gop = idx - idx%15; }
mcache.lock(); for (int i = gop; i < std::min(gop + 15, (int)frames.size()); ++i) {
bool has_gop = cache.find(gop) != cache.end(); if (frames[i]->picture != nullptr) continue;
mcache.unlock();
if (!has_gop) {
//printf("caching %d\n", gop);
for (int i = gop; i < gop+15; i++) {
if (i >= pkts.size()) break;
//printf("decode %d\n", i);
int frameFinished; int frameFinished;
pFrame = av_frame_alloc(); AVFrame *pFrame = av_frame_alloc();
avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkts[i]); avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, frames[i]->pkt);
uint8_t *dat = toRGB(pFrame)->data[0]; AVFrame *picture = toRGB(pFrame);
mcache.lock(); av_frame_free(&pFrame);
cache.insert(std::make_pair(i, dat));
mcache.unlock(); std::unique_lock lk(mutex);
frames[i]->picture = picture;
cv_frame.notify_all();
} }
} }
} }
@ -130,48 +119,23 @@ void FrameReader::GOPCache(int idx) {
AVFrame *FrameReader::toRGB(AVFrame *pFrame) { AVFrame *FrameReader::toRGB(AVFrame *pFrame) {
AVFrame *pFrameRGB = av_frame_alloc(); AVFrame *pFrameRGB = av_frame_alloc();
int numBytes = avpicture_get_size(AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); int numBytes = avpicture_get_size(AV_PIX_FMT_BGR24, pFrame->width, pFrame->height);
uint8_t *buffer = (uint8_t *)av_malloc(numBytes*sizeof(uint8_t)); uint8_t *buffer = (uint8_t *)av_malloc(numBytes * sizeof(uint8_t));
avpicture_fill((AVPicture *)pFrameRGB, buffer, AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); avpicture_fill((AVPicture *)pFrameRGB, buffer, AV_PIX_FMT_BGR24, pFrame->width, pFrame->height);
sws_scale(sws_ctx, (uint8_t const * const *)pFrame->data, sws_scale(sws_ctx, (uint8_t const *const *)pFrame->data,
pFrame->linesize, 0, pFrame->height, pFrame->linesize, 0, pFrame->height,
pFrameRGB->data, pFrameRGB->linesize); pFrameRGB->data, pFrameRGB->linesize);
return pFrameRGB; return pFrameRGB;
} }
uint8_t *FrameReader::get(int idx) { uint8_t *FrameReader::get(int idx) {
if (!valid) return NULL; if (!valid || idx < 0 || idx >= frames.size()) return nullptr;
waitForReady();
// TODO: one line? std::unique_lock lk(mutex);
uint8_t *dat = NULL; decode_idx = idx;
cv_decode.notify_one();
// lookahead Frame *frame = frames[idx];
to_cache.put(idx); if (!frame->picture) {
to_cache.put(idx+15); cv_frame.wait(lk, [=] { return exit_ || frame->picture != nullptr; });
mcache.lock();
auto it = cache.find(idx);
if (it != cache.end()) {
dat = it->second;
} }
mcache.unlock(); return frame->picture ? frame->picture->data[0] : nullptr;
if (dat == NULL) {
to_cache.put_front(idx);
// lookahead
while (dat == NULL) {
// wait for frame
usleep(50*1000);
// check for frame
mcache.lock();
auto it = cache.find(idx);
if (it != cache.end()) dat = it->second;
mcache.unlock();
if (dat == NULL) {
printf(".");
fflush(stdout);
}
}
}
return dat;
} }

@ -1,15 +1,13 @@
#ifndef FRAMEREADER_HPP #pragma once
#define FRAMEREADER_HPP
#include <unistd.h> #include <unistd.h>
#include <vector>
#include <map>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include "tools/clib/channel.h" #include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
// independent of QT, needs ffmpeg // independent of QT, needs ffmpeg
extern "C" { extern "C" {
@ -21,40 +19,35 @@ extern "C" {
class FrameReader { class FrameReader {
public: public:
FrameReader(const char *fn); FrameReader(const std::string &fn);
~FrameReader();
uint8_t *get(int idx); uint8_t *get(int idx);
AVFrame *toRGB(AVFrame *); AVFrame *toRGB(AVFrame *);
void waitForReady() {
while (!joined) usleep(10*1000);
}
int getRGBSize() { return width*height*3; } int getRGBSize() { return width*height*3; }
void loaderThread(); void process();
void cacherThread();
//TODO: get this from the actual frame int width = 0, height = 0;
int width = 1164;
int height = 874;
private: private:
void decodeThread();
struct Frame{
AVPacket *pkt;
AVFrame *picture;
};
std::vector<Frame*> frames;
AVFormatContext *pFormatCtx = NULL; AVFormatContext *pFormatCtx = NULL;
AVCodecContext *pCodecCtx = NULL; AVCodecContext *pCodecCtx = NULL;
struct SwsContext *sws_ctx = NULL; struct SwsContext *sws_ctx = NULL;
std::vector<AVPacket *> pkts; std::mutex mutex;
std::condition_variable cv_decode;
std::thread *t; std::condition_variable cv_frame;
bool joined = false; int decode_idx = -1;
std::atomic<bool> exit_ = false;
std::map<int, uint8_t *> cache; std::thread thread;
std::mutex mcache;
void GOPCache(int idx);
channel<int> to_cache;
bool valid = true; bool valid = true;
char url[0x400]; std::string url;
}; };
#endif

Loading…
Cancel
Save