diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index ec1caf7b16..f4fd778dc8 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -76,7 +76,15 @@ void Replay::addSegment(int n) { QObject::connect(t, &QThread::started, lrs[n], &LogReader::process); t->start(); - frs.insert(n, new FrameReader(qPrintable(camera_paths.at(n).toString()))); + QThread *frame_thread = QThread::create([=]{ + FrameReader *frame_reader = new FrameReader(qPrintable(camera_paths.at(n).toString())); + frame_reader->process(); + frs.insert(n, frame_reader); + }); + QObject::connect(frame_thread, &QThread::finished, frame_thread, &QThread::deleteLater); + frame_thread->start(); + + } void Replay::removeSegment(int n) { @@ -86,10 +94,6 @@ void Replay::removeSegment(int n) { auto lr = lrs.take(n); delete lr; } - if (frs.contains(n)) { - auto fr = frs.take(n); - delete fr; - } events_lock.lockForWrite(); auto eit = events.begin(); @@ -102,6 +106,10 @@ void Replay::removeSegment(int n) { } events_lock.unlock(); */ + if (frs.contains(n)) { + auto fr = frs.take(n); + delete fr; + } } void Replay::start(){ diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index e7c81cbdc6..7b86ee2ece 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -41,7 +41,7 @@ private: uint64_t route_start_ts; std::atomic seek_ts = 0; std::atomic current_ts = 0; - std::atomic current_segment; + std::atomic current_segment = 0; QThread *thread; QThread *kb_thread; diff --git a/tools/clib/framereader.cc b/tools/clib/framereader.cc index 4b5af4466d..f4bc63701b 100644 --- a/tools/clib/framereader.cc +++ b/tools/clib/framereader.cc @@ -4,125 +4,114 @@ #include static int ffmpeg_lockmgr_cb(void **arg, enum AVLockOp op) { - pthread_mutex_t *mutex = (pthread_mutex_t *)*arg; - int err; - + std::mutex *mutex = (std::mutex *)*arg; switch (op) { case AV_LOCK_CREATE: - mutex = (pthread_mutex_t *)malloc(sizeof(*mutex)); - if (!mutex) - return AVERROR(ENOMEM); - if ((err = pthread_mutex_init(mutex, NULL))) { - free(mutex); - return AVERROR(err); - } - *arg = mutex; - return 0; + mutex = new std::mutex(); + break; case AV_LOCK_OBTAIN: - if ((err = pthread_mutex_lock(mutex))) - return AVERROR(err); - - return 0; + mutex->lock(); + break; case AV_LOCK_RELEASE: - if ((err = pthread_mutex_unlock(mutex))) - return AVERROR(err); - - return 0; + mutex->unlock(); case AV_LOCK_DESTROY: - if (mutex) - pthread_mutex_destroy(mutex); - free(mutex); - *arg = NULL; - return 0; + delete mutex; + break; } - return 1; + return 0; } -FrameReader::FrameReader(const char *fn) { - int ret; - - ret = av_lockmgr_register(ffmpeg_lockmgr_cb); +FrameReader::FrameReader(const std::string &fn) : url(fn) { + int ret = av_lockmgr_register(ffmpeg_lockmgr_cb); assert(ret >= 0); avformat_network_init(); av_register_all(); - - snprintf(url, sizeof(url)-1,"%s",fn); - t = new std::thread([&]() { this->loaderThread(); }); } -void FrameReader::loaderThread() { - int ret; +FrameReader::~FrameReader() { + exit_ = true; + thread.join(); + for (auto &f : frames) { + delete f->pkt; + if (f->picture) { + av_frame_free(&f->picture); + } + delete f; + } + avcodec_free_context(&pCodecCtx); + avformat_free_context(pFormatCtx); + sws_freeContext(sws_ctx); + avformat_network_deinit(); +} - if (avformat_open_input(&pFormatCtx, url, NULL, NULL) != 0) { - fprintf(stderr, "error loading %s\n", url); +void FrameReader::process() { + if (avformat_open_input(&pFormatCtx, url.c_str(), NULL, NULL) != 0) { + fprintf(stderr, "error loading %s\n", url.c_str()); valid = false; return; } - av_dump_format(pFormatCtx, 0, url, 0); + avformat_find_stream_info(pFormatCtx, NULL); + av_dump_format(pFormatCtx, 0, url.c_str(), 0); auto pCodecCtxOrig = pFormatCtx->streams[0]->codec; auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id); assert(pCodec != NULL); pCodecCtx = avcodec_alloc_context3(pCodec); - ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig); + int ret = avcodec_copy_context(pCodecCtx, pCodecCtxOrig); assert(ret == 0); ret = avcodec_open2(pCodecCtx, pCodec, NULL); assert(ret >= 0); + width = pCodecCtxOrig->width; + height = pCodecCtxOrig->height; + sws_ctx = sws_getContext(width, height, AV_PIX_FMT_YUV420P, width, height, AV_PIX_FMT_BGR24, SWS_BILINEAR, NULL, NULL, NULL); assert(sws_ctx != NULL); - AVPacket *pkt = (AVPacket *)malloc(sizeof(AVPacket)); - assert(pkt != NULL); - bool first = true; - while (av_read_frame(pFormatCtx, pkt)>=0) { - //printf("%d pkt %d %d\n", pkts.size(), pkt->size, pkt->pos); - if (first) { - AVFrame *pFrame = av_frame_alloc(); - int frameFinished; - avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkt); - first = false; + do { + AVPacket *pkt = new AVPacket; + if (av_read_frame(pFormatCtx, pkt) < 0) { + delete pkt; + break; } - pkts.push_back(pkt); - pkt = (AVPacket *)malloc(sizeof(AVPacket)); - assert(pkt != NULL); - } - free(pkt); + Frame *frame = new Frame{.pkt = pkt}; + frames.push_back(frame); + } while (true); + printf("framereader download done\n"); - joined = true; - // cache - while (1) { - GOPCache(to_cache.get()); - } + thread = std::thread(&FrameReader::decodeThread, this); } +void FrameReader::decodeThread() { + while (!exit_) { + int gop = 0; + { + std::unique_lock lk(mutex); + cv_decode.wait(lk, [=] { return exit_ || decode_idx != -1; }); + if (exit_) break; -void FrameReader::GOPCache(int idx) { - AVFrame *pFrame; - int gop = idx - idx%15; + gop = std::max(decode_idx - decode_idx % 15, 0); + decode_idx = -1; + } - mcache.lock(); - bool has_gop = cache.find(gop) != cache.end(); - mcache.unlock(); + for (int i = gop; i < std::min(gop + 15, (int)frames.size()); ++i) { + if (frames[i]->picture != nullptr) continue; - if (!has_gop) { - //printf("caching %d\n", gop); - for (int i = gop; i < gop+15; i++) { - if (i >= pkts.size()) break; - //printf("decode %d\n", i); int frameFinished; - pFrame = av_frame_alloc(); - avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, pkts[i]); - uint8_t *dat = toRGB(pFrame)->data[0]; - mcache.lock(); - cache.insert(std::make_pair(i, dat)); - mcache.unlock(); + AVFrame *pFrame = av_frame_alloc(); + avcodec_decode_video2(pCodecCtx, pFrame, &frameFinished, frames[i]->pkt); + AVFrame *picture = toRGB(pFrame); + av_frame_free(&pFrame); + + std::unique_lock lk(mutex); + frames[i]->picture = picture; + cv_frame.notify_all(); } } } @@ -130,48 +119,23 @@ void FrameReader::GOPCache(int idx) { AVFrame *FrameReader::toRGB(AVFrame *pFrame) { AVFrame *pFrameRGB = av_frame_alloc(); int numBytes = avpicture_get_size(AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); - uint8_t *buffer = (uint8_t *)av_malloc(numBytes*sizeof(uint8_t)); + uint8_t *buffer = (uint8_t *)av_malloc(numBytes * sizeof(uint8_t)); avpicture_fill((AVPicture *)pFrameRGB, buffer, AV_PIX_FMT_BGR24, pFrame->width, pFrame->height); - sws_scale(sws_ctx, (uint8_t const * const *)pFrame->data, - pFrame->linesize, 0, pFrame->height, - pFrameRGB->data, pFrameRGB->linesize); + sws_scale(sws_ctx, (uint8_t const *const *)pFrame->data, + pFrame->linesize, 0, pFrame->height, + pFrameRGB->data, pFrameRGB->linesize); return pFrameRGB; } uint8_t *FrameReader::get(int idx) { - if (!valid) return NULL; - waitForReady(); - // TODO: one line? - uint8_t *dat = NULL; - - // lookahead - to_cache.put(idx); - to_cache.put(idx+15); - - mcache.lock(); - auto it = cache.find(idx); - if (it != cache.end()) { - dat = it->second; + if (!valid || idx < 0 || idx >= frames.size()) return nullptr; + + std::unique_lock lk(mutex); + decode_idx = idx; + cv_decode.notify_one(); + Frame *frame = frames[idx]; + if (!frame->picture) { + cv_frame.wait(lk, [=] { return exit_ || frame->picture != nullptr; }); } - mcache.unlock(); - - if (dat == NULL) { - to_cache.put_front(idx); - // lookahead - while (dat == NULL) { - // wait for frame - usleep(50*1000); - // check for frame - mcache.lock(); - auto it = cache.find(idx); - if (it != cache.end()) dat = it->second; - mcache.unlock(); - if (dat == NULL) { - printf("."); - fflush(stdout); - } - } - } - return dat; + return frame->picture ? frame->picture->data[0] : nullptr; } - diff --git a/tools/clib/framereader.h b/tools/clib/framereader.h index e5870c25e1..dd39416310 100644 --- a/tools/clib/framereader.h +++ b/tools/clib/framereader.h @@ -1,15 +1,13 @@ -#ifndef FRAMEREADER_HPP -#define FRAMEREADER_HPP +#pragma once #include -#include -#include -#include -#include -#include -#include -#include "tools/clib/channel.h" +#include +#include +#include +#include +#include +#include // independent of QT, needs ffmpeg extern "C" { @@ -21,40 +19,35 @@ extern "C" { class FrameReader { public: - FrameReader(const char *fn); + FrameReader(const std::string &fn); + ~FrameReader(); uint8_t *get(int idx); AVFrame *toRGB(AVFrame *); - void waitForReady() { - while (!joined) usleep(10*1000); - } int getRGBSize() { return width*height*3; } - void loaderThread(); - void cacherThread(); + void process(); - //TODO: get this from the actual frame - int width = 1164; - int height = 874; + int width = 0, height = 0; private: + void decodeThread(); + + struct Frame{ + AVPacket *pkt; + AVFrame *picture; + }; + std::vector frames; + AVFormatContext *pFormatCtx = NULL; AVCodecContext *pCodecCtx = NULL; - struct SwsContext *sws_ctx = NULL; - std::vector pkts; - - std::thread *t; - bool joined = false; - - std::map cache; - std::mutex mcache; - - void GOPCache(int idx); - channel to_cache; + std::mutex mutex; + std::condition_variable cv_decode; + std::condition_variable cv_frame; + int decode_idx = -1; + std::atomic exit_ = false; + std::thread thread; bool valid = true; - char url[0x400]; + std::string url; }; - -#endif -