Refactor FrameReader (#21002)

* Refactor FrameReader

* decodeThread

* delete frame

* remove joined

* continue

* less diff

* robust cv wait

* cache fist 15 frames

* notify_all

* rename variables

* need call avformat_find_stream_info before dump_format

* get width&height from codec

* use std::string

* delete in removeSegment

* use std::mutex in lockmgr_cb

* fix wrong min/max

* no get in process

* cleanup

* always notify decodeThread to do prefetch
old-commit-hash: 5540dcae78
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent c2a95c9d2c
commit 7b253433f2
  1. 18
      selfdrive/ui/replay/replay.cc
  2. 2
      selfdrive/ui/replay/replay.h
  3. 192
      tools/clib/framereader.cc
  4. 59
      tools/clib/framereader.h

@ -76,7 +76,15 @@ void Replay::addSegment(int n) {
QObject::connect(t, &QThread::started, lrs[n], &LogReader::process);
t->start();
frs.insert(n, new FrameReader(qPrintable(camera_paths.at(n).toString())));
QThread *frame_thread = QThread::create([=]{
FrameReader *frame_reader = new FrameReader(qPrintable(camera_paths.at(n).toString()));
frame_reader->process();
frs.insert(n, frame_reader);
});
QObject::connect(frame_thread, &QThread::finished, frame_thread, &QThread::deleteLater);
frame_thread->start();
}
void Replay::removeSegment(int n) {
@ -86,10 +94,6 @@ void Replay::removeSegment(int n) {
auto lr = lrs.take(n);
delete lr;
}
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
}
events_lock.lockForWrite();
auto eit = events.begin();
@ -102,6 +106,10 @@ void Replay::removeSegment(int n) {
}
events_lock.unlock();
*/
if (frs.contains(n)) {
auto fr = frs.take(n);
delete fr;
}
}
void Replay::start(){

@ -41,7 +41,7 @@ private:
uint64_t route_start_ts;
std::atomic<int> seek_ts = 0;
std::atomic<int> current_ts = 0;
std::atomic<int> current_segment;
std::atomic<int> current_segment = 0;
QThread *thread;
QThread *kb_thread;

@ -4,125 +4,114 @@
#include <unistd.h>
static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) {
pthread_mutex_t *mutex = (pthread_mutex_t *)*arg;
int err;
std::mutex *mutex = (std::mutex *)*arg;
switch (op) {
case AV_LOCK_CREATE:
mutex = (pthread_mutex_t *)malloc(sizeof(*mutex));
if (!mutex)
return AVERROR(ENOMEM);
if ((err = pthread_mutex_init(mutex, NULL))) {
free(mutex);
return AVERROR(err);
}
*arg = mutex;
return 0;
mutex = new std::mutex();
break;
case AV_LOCK_OBTAIN:
if ((err = pthread_mutex_lock(mutex)))
return AVERROR(err);
return 0;
mutex->lock();
break;
case AV_LOCK_RELEASE:
if ((err = pthread_mutex_unlock(mutex)))
return AVERROR(err);
return 0;
mutex->unlock();
case AV_LOCK_DESTROY:
if (mutex)
pthread_mutex_destroy(mutex);
free(mutex);
*arg = NULL;
return 0;
delete mutex;
break;
}
return 1;
return 0;
}
FrameReader::FrameReader(const char *fn) {
int ret;
ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
FrameReader::FrameReader(const std::string &fn) : url(fn) {
int ret = av_lockmgr_register(ffmpeg_lockmgr_cb);
assert(ret >= 0);
avformat_network_init();
av_register_all();
snprintf(url, sizeof(url)-1,"%s",fn);
t = new std::thread([&]() { this->loaderThread(); });
}
void FrameReader::loaderThread() {
int ret;
FrameReader::~FrameReader() {
exit_ = true;
thread.join();
for (auto &f : frames) {
delete f->pkt;
if (f->picture) {
av_frame_free(&f->picture);
}
delete f;
}
avcodec_free_context(&pCodecCtx);
avformat_free_context(pFormatCtx);
sws_freeContext(sws_ctx);
avformat_network_deinit();
}
if (avformat_open_input(&pFormatCtx, url, NULL, NULL) != 0) {
fprintf(stderr, "error loading %s\n", url);
void FrameReader::process() {
if (avformat_open_input(&pFormatCtx, url.c_str(), NULL, NULL) != 0) {
fprintf(stderr, "error loading %s\n", url.c_str());
valid = false;
return;
}
av_dump_format(pFormatCtx, 0, url, 0);
avformat_find_stream_info(pFormatCtx, NULL);
av_dump_format(pFormatCtx, 0, url.c_str(), 0);
auto pCodecCtxOrig = pFormatCtx->streams[0]->codec;
auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id);
assert(pCodec != NULL);
pCodecCtx = avcodec_alloc_context3(pCodec);
ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig);
int ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig);
assert(ret == 0);
ret = avcodec_open2(pCodecCtx, pCodec, NULL);
assert(ret >= 0);
width = pCodecCtxOrig->width;
height = pCodecCtxOrig->height;
sws_ctx = sws_getContext(width, height, AV_PIX_FMT_YUV420P,
width, height, AV_PIX_FMT_BGR24,
SWS_BILINEAR, NULL, NULL, NULL);
assert(sws_ctx != NULL);
AVPacket *pkt = (AVPacket *)malloc(sizeof(AVPacket));
assert(pkt != NULL);
bool first = true;
while (av_read_frame(pFormatCtx, pkt)>=0) {
//printf("%d pkt %d %d\n", pkts.size(), pkt->size, pkt->pos);
if (first) {
AVFrame *pFrame = av_frame_alloc();
int frameFinished;
avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkt);
first = false;
do {
AVPacket *pkt = new AVPacket;
if (av_read_frame(pFormatCtx, pkt) < 0) {
delete pkt;
break;
}
pkts.push_back(pkt);
pkt = (AVPacket *)malloc(sizeof(AVPacket));
assert(pkt != NULL);
}
free(pkt);
Frame *frame = new Frame{.pkt = pkt};
frames.push_back(frame);
} while (true);
printf("framereader download done\n");
joined = true;
// cache
while (1) {
GOPCache(to_cache.get());
}
thread = std::thread(&FrameReader::decodeThread, this);
}
void FrameReader::decodeThread() {
while (!exit_) {
int gop = 0;
{
std::unique_lock lk(mutex);
cv_decode.wait(lk, [=] { return exit_ || decode_idx != -1; });
if (exit_) break;
void FrameReader::GOPCache(int idx) {
AVFrame *pFrame;
int gop = idx - idx%15;
gop = std::max(decode_idx - decode_idx % 15, 0);
decode_idx = -1;
}
mcache.lock();
bool has_gop = cache.find(gop) != cache.end();
mcache.unlock();
for (int i = gop; i < std::min(gop + 15, (int)frames.size()); ++i) {
if (frames[i]->picture != nullptr) continue;
if (!has_gop) {
//printf("caching %d\n", gop);
for (int i = gop; i < gop+15; i++) {
if (i >= pkts.size()) break;
//printf("decode %d\n", i);
int frameFinished;
pFrame = av_frame_alloc();
avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkts[i]);
uint8_t *dat = toRGB(pFrame)->data[0];
mcache.lock();
cache.insert(std::make_pair(i, dat));
mcache.unlock();
AVFrame *pFrame = av_frame_alloc();
avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, frames[i]->pkt);
AVFrame *picture = toRGB(pFrame);
av_frame_free(&pFrame);
std::unique_lock lk(mutex);
frames[i]->picture = picture;
cv_frame.notify_all();
}
}
}
@ -130,48 +119,23 @@ void FrameReader::GOPCache(int idx) {
AVFrame *FrameReader::toRGB(AVFrame *pFrame) {
AVFrame *pFrameRGB = av_frame_alloc();
int numBytes = avpicture_get_size(AV_PIX_FMT_BGR24, pFrame->width, pFrame->height);
uint8_t *buffer = (uint8_t *)av_malloc(numBytes*sizeof(uint8_t));
uint8_t *buffer = (uint8_t *)av_malloc(numBytes * sizeof(uint8_t));
avpicture_fill((AVPicture *)pFrameRGB, buffer, AV_PIX_FMT_BGR24, pFrame->width, pFrame->height);
sws_scale(sws_ctx, (uint8_t const * const *)pFrame->data,
pFrame->linesize, 0, pFrame->height,
pFrameRGB->data, pFrameRGB->linesize);
sws_scale(sws_ctx, (uint8_t const *const *)pFrame->data,
pFrame->linesize, 0, pFrame->height,
pFrameRGB->data, pFrameRGB->linesize);
return pFrameRGB;
}
uint8_t *FrameReader::get(int idx) {
if (!valid) return NULL;
waitForReady();
// TODO: one line?
uint8_t *dat = NULL;
// lookahead
to_cache.put(idx);
to_cache.put(idx+15);
mcache.lock();
auto it = cache.find(idx);
if (it != cache.end()) {
dat = it->second;
if (!valid || idx < 0 || idx >= frames.size()) return nullptr;
std::unique_lock lk(mutex);
decode_idx = idx;
cv_decode.notify_one();
Frame *frame = frames[idx];
if (!frame->picture) {
cv_frame.wait(lk, [=] { return exit_ || frame->picture != nullptr; });
}
mcache.unlock();
if (dat == NULL) {
to_cache.put_front(idx);
// lookahead
while (dat == NULL) {
// wait for frame
usleep(50*1000);
// check for frame
mcache.lock();
auto it = cache.find(idx);
if (it != cache.end()) dat = it->second;
mcache.unlock();
if (dat == NULL) {
printf(".");
fflush(stdout);
}
}
}
return dat;
return frame->picture ? frame->picture->data[0] : nullptr;
}

@ -1,15 +1,13 @@
#ifndef FRAMEREADER_HPP
#define FRAMEREADER_HPP
#pragma once
#include <unistd.h>
#include <vector>
#include <map>
#include <thread>
#include <mutex>
#include <list>
#include <condition_variable>
#include "tools/clib/channel.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
// independent of QT, needs ffmpeg
extern "C" {
@ -21,40 +19,35 @@ extern "C" {
class FrameReader {
public:
FrameReader(const char *fn);
FrameReader(const std::string &fn);
~FrameReader();
uint8_t *get(int idx);
AVFrame *toRGB(AVFrame *);
void waitForReady() {
while (!joined) usleep(10*1000);
}
int getRGBSize() { return width*height*3; }
void loaderThread();
void cacherThread();
void process();
//TODO: get this from the actual frame
int width = 1164;
int height = 874;
int width = 0, height = 0;
private:
void decodeThread();
struct Frame{
AVPacket *pkt;
AVFrame *picture;
};
std::vector<Frame*> frames;
AVFormatContext *pFormatCtx = NULL;
AVCodecContext *pCodecCtx = NULL;
struct SwsContext *sws_ctx = NULL;
std::vector<AVPacket *> pkts;
std::thread *t;
bool joined = false;
std::map<int, uint8_t *> cache;
std::mutex mcache;
void GOPCache(int idx);
channel<int> to_cache;
std::mutex mutex;
std::condition_variable cv_decode;
std::condition_variable cv_frame;
int decode_idx = -1;
std::atomic<bool> exit_ = false;
std::thread thread;
bool valid = true;
char url[0x400];
std::string url;
};
#endif

Loading…
Cancel
Save