omx encoder: move writing into thread (#23771)

* omx encoder: move writing to separate thread

* fix include

* pop

* log buffers sizes

* split copy and write
pull/23823/head
Willem Melching 3 years ago committed by GitHub
parent a6214ff3b1
commit 2f00271ce6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 132
      selfdrive/loggerd/omx_encoder.cc
  2. 14
      selfdrive/loggerd/omx_encoder.h

@ -66,7 +66,6 @@ OMX_ERRORTYPE OmxEncoder::event_handler(OMX_HANDLETYPE component, OMX_PTR app_da
OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data, OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data,
OMX_BUFFERHEADERTYPE *buffer) { OMX_BUFFERHEADERTYPE *buffer) {
// printf("empty_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data; OmxEncoder *e = (OmxEncoder*)app_data;
e->free_in.push(buffer); e->free_in.push(buffer);
return OMX_ErrorNone; return OMX_ErrorNone;
@ -74,7 +73,6 @@ OMX_ERRORTYPE OmxEncoder::empty_buffer_done(OMX_HANDLETYPE component, OMX_PTR ap
OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data, OMX_ERRORTYPE OmxEncoder::fill_buffer_done(OMX_HANDLETYPE component, OMX_PTR app_data,
OMX_BUFFERHEADERTYPE *buffer) { OMX_BUFFERHEADERTYPE *buffer) {
// printf("fill_buffer_done\n");
OmxEncoder *e = (OmxEncoder*)app_data; OmxEncoder *e = (OmxEncoder*)app_data;
e->done_out.push(buffer); e->done_out.push(buffer);
return OMX_ErrorNone; return OMX_ErrorNone;
@ -155,7 +153,6 @@ static const char* omx_color_fomat_name(uint32_t format) {
// ***** encoder functions ***** // ***** encoder functions *****
OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) { OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) {
this->filename = filename; this->filename = filename;
this->write = write; this->write = write;
@ -325,27 +322,82 @@ OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int
for (auto &buf : this->in_buf_headers) { for (auto &buf : this->in_buf_headers) {
this->free_in.push(buf); this->free_in.push(buf);
} }
LOGE("omx initialized - in: %d - out %d", this->in_buf_headers.size(), this->out_buf_headers.size());
}
void OmxEncoder::callback_handler(OmxEncoder *e) {
// OMX documentation specifies to not empty the buffer from the callback function
// so we use this intermediate handler to copy the buffer for further writing
// and give it back to OMX. We could also send the data over msgq from here.
bool exit = false;
while (!exit) {
OMX_BUFFERHEADERTYPE *buffer = e->done_out.pop();
OmxBuffer *new_buffer = (OmxBuffer*)malloc(sizeof(OmxBuffer) + buffer->nFilledLen);
assert(new_buffer);
new_buffer->header = *buffer;
memcpy(new_buffer->data, buffer->pBuffer + buffer->nOffset, buffer->nFilledLen);
e->to_write.push(new_buffer);
#ifdef QCOM2
if (buffer->nFlags & OMX_BUFFERFLAG_CODECCONFIG) {
buffer->nTimeStamp = 0;
}
if (buffer->nFlags & OMX_BUFFERFLAG_EOS) {
buffer->nTimeStamp = 0;
}
#endif
if (buffer->nFlags & OMX_BUFFERFLAG_EOS) {
exit = true;
}
// give omx back the buffer
// TOOD: fails when shutting down
OMX_CHECK(OMX_FillThisBuffer(e->handle, buffer));
}
} }
void OmxEncoder::handle_out_buf(OmxEncoder *e, OMX_BUFFERHEADERTYPE *out_buf) {
void OmxEncoder::write_handler(OmxEncoder *e){
bool exit = false;
while (!exit) {
OmxBuffer *out_buf = e->to_write.pop();
OmxEncoder::handle_out_buf(e, out_buf);
if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) {
exit = true;
}
free(out_buf);
}
}
void OmxEncoder::handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf) {
int err; int err;
uint8_t *buf_data = out_buf->pBuffer + out_buf->nOffset;
if (out_buf->nFlags & OMX_BUFFERFLAG_CODECCONFIG) { if (out_buf->header.nFlags & OMX_BUFFERFLAG_CODECCONFIG) {
if (e->codec_config_len < out_buf->nFilledLen) { if (e->codec_config_len < out_buf->header.nFilledLen) {
e->codec_config = (uint8_t *)realloc(e->codec_config, out_buf->nFilledLen); e->codec_config = (uint8_t *)realloc(e->codec_config, out_buf->header.nFilledLen);
} }
e->codec_config_len = out_buf->nFilledLen; e->codec_config_len = out_buf->header.nFilledLen;
memcpy(e->codec_config, buf_data, out_buf->nFilledLen); memcpy(e->codec_config, out_buf->data, out_buf->header.nFilledLen);
// TODO: is still needed?
#ifdef QCOM2 #ifdef QCOM2
out_buf->nTimeStamp = 0; out_buf->header.nTimeStamp = 0;
#endif #endif
} }
if (e->of) { if (e->of) {
//printf("write %d flags 0x%x\n", out_buf->nFilledLen, out_buf->nFlags); //printf("write %d flags 0x%x\n", out_buf->nFilledLen, out_buf->nFlags);
size_t written = util::safe_fwrite(buf_data, 1, out_buf->nFilledLen, e->of); size_t written = util::safe_fwrite(out_buf->data, 1, out_buf->header.nFilledLen, e->of);
if (written != out_buf->nFilledLen) { if (written != out_buf->header.nFilledLen) {
LOGE("failed to write file.errno=%d", errno); LOGE("failed to write file.errno=%d", errno);
} }
} }
@ -365,20 +417,20 @@ void OmxEncoder::handle_out_buf(OmxEncoder *e, OMX_BUFFERHEADERTYPE *out_buf) {
e->wrote_codec_config = true; e->wrote_codec_config = true;
} }
if (out_buf->nTimeStamp > 0) { if (out_buf->header.nTimeStamp > 0) {
// input timestamps are in microseconds // input timestamps are in microseconds
AVRational in_timebase = {1, 1000000}; AVRational in_timebase = {1, 1000000};
AVPacket pkt; AVPacket pkt;
av_init_packet(&pkt); av_init_packet(&pkt);
pkt.data = buf_data; pkt.data = out_buf->data;
pkt.size = out_buf->nFilledLen; pkt.size = out_buf->header.nFilledLen;
enum AVRounding rnd = static_cast<enum AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX); enum AVRounding rnd = static_cast<enum AVRounding>(AV_ROUND_NEAR_INF|AV_ROUND_PASS_MINMAX);
pkt.pts = pkt.dts = av_rescale_q_rnd(out_buf->nTimeStamp, in_timebase, e->ofmt_ctx->streams[0]->time_base, rnd); pkt.pts = pkt.dts = av_rescale_q_rnd(out_buf->header.nTimeStamp, in_timebase, e->ofmt_ctx->streams[0]->time_base, rnd);
pkt.duration = av_rescale_q(50*1000, in_timebase, e->ofmt_ctx->streams[0]->time_base); pkt.duration = av_rescale_q(50*1000, in_timebase, e->ofmt_ctx->streams[0]->time_base);
if (out_buf->nFlags & OMX_BUFFERFLAG_SYNCFRAME) { if (out_buf->header.nFlags & OMX_BUFFERFLAG_SYNCFRAME) {
pkt.flags |= AV_PKT_FLAG_KEY; pkt.flags |= AV_PKT_FLAG_KEY;
} }
@ -388,14 +440,6 @@ void OmxEncoder::handle_out_buf(OmxEncoder *e, OMX_BUFFERHEADERTYPE *out_buf) {
av_free_packet(&pkt); av_free_packet(&pkt);
} }
} }
// give omx back the buffer
#ifdef QCOM2
if (out_buf->nFlags & OMX_BUFFERFLAG_EOS) {
out_buf->nTimeStamp = 0;
}
#endif
OMX_CHECK(OMX_FillThisBuffer(e->handle, out_buf));
} }
int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr, int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,
@ -459,15 +503,6 @@ int OmxEncoder::encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const u
OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf)); OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf));
// pump output
while (true) {
OMX_BUFFERHEADERTYPE *out_buf;
if (!this->done_out.try_pop(out_buf)) {
break;
}
handle_out_buf(this, out_buf);
}
this->dirty = true; this->dirty = true;
this->counter++; this->counter++;
@ -524,6 +559,10 @@ void OmxEncoder::encoder_open(const char* path) {
assert(lock_fd >= 0); assert(lock_fd >= 0);
close(lock_fd); close(lock_fd);
// start writer threads
callback_handler_thread = std::thread(OmxEncoder::callback_handler, this);
write_handler_thread = std::thread(OmxEncoder::write_handler, this);
this->is_open = true; this->is_open = true;
this->counter = 0; this->counter = 0;
} }
@ -541,18 +580,12 @@ void OmxEncoder::encoder_close() {
OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf)); OMX_CHECK(OMX_EmptyThisBuffer(this->handle, in_buf));
while (true) {
OMX_BUFFERHEADERTYPE *out_buf = this->done_out.pop();
handle_out_buf(this, out_buf);
if (out_buf->nFlags & OMX_BUFFERFLAG_EOS) {
break;
}
}
this->dirty = false; this->dirty = false;
} }
callback_handler_thread.join();
write_handler_thread.join();
if (this->remuxing) { if (this->remuxing) {
av_write_trailer(this->ofmt_ctx); av_write_trailer(this->ofmt_ctx);
avcodec_free_context(&this->codec_ctx); avcodec_free_context(&this->codec_ctx);
@ -591,9 +624,14 @@ OmxEncoder::~OmxEncoder() {
OMX_CHECK(OMX_FreeHandle(this->handle)); OMX_CHECK(OMX_FreeHandle(this->handle));
OMX_BUFFERHEADERTYPE *out_buf; OMX_BUFFERHEADERTYPE *buf;
while (this->free_in.try_pop(out_buf)); while (this->free_in.try_pop(buf));
while (this->done_out.try_pop(out_buf)); while (this->done_out.try_pop(buf));
OmxBuffer *write_buf;
while (this->to_write.try_pop(write_buf)) {
free(write_buf);
};
if (this->codec_config) { if (this->codec_config) {
free(this->codec_config); free(this->codec_config);

@ -3,6 +3,7 @@
#include <cstdint> #include <cstdint>
#include <cstdio> #include <cstdio>
#include <vector> #include <vector>
#include <thread>
#include <OMX_Component.h> #include <OMX_Component.h>
extern "C" { extern "C" {
@ -12,6 +13,12 @@ extern "C" {
#include "selfdrive/common/queue.h" #include "selfdrive/common/queue.h"
#include "selfdrive/loggerd/encoder.h" #include "selfdrive/loggerd/encoder.h"
struct OmxBuffer {
OMX_BUFFERHEADERTYPE header;
OMX_U8 data[];
};
// OmxEncoder, lossey codec using hardware hevc // OmxEncoder, lossey codec using hardware hevc
class OmxEncoder : public VideoEncoder { class OmxEncoder : public VideoEncoder {
public: public:
@ -32,7 +39,9 @@ public:
private: private:
void wait_for_state(OMX_STATETYPE state); void wait_for_state(OMX_STATETYPE state);
static void handle_out_buf(OmxEncoder *e, OMX_BUFFERHEADERTYPE *out_buf); static void callback_handler(OmxEncoder *e);
static void write_handler(OmxEncoder *e);
static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf);
int width, height, fps; int width, height, fps;
char vid_path[1024]; char vid_path[1024];
@ -41,6 +50,8 @@ private:
bool dirty = false; bool dirty = false;
bool write = false; bool write = false;
int counter = 0; int counter = 0;
std::thread callback_handler_thread;
std::thread write_handler_thread;
const char* filename; const char* filename;
FILE *of = nullptr; FILE *of = nullptr;
@ -62,6 +73,7 @@ private:
SafeQueue<OMX_BUFFERHEADERTYPE *> free_in; SafeQueue<OMX_BUFFERHEADERTYPE *> free_in;
SafeQueue<OMX_BUFFERHEADERTYPE *> done_out; SafeQueue<OMX_BUFFERHEADERTYPE *> done_out;
SafeQueue<OmxBuffer *> to_write;
AVFormatContext *ofmt_ctx; AVFormatContext *ofmt_ctx;
AVCodecContext *codec_ctx; AVCodecContext *codec_ctx;

Loading…
Cancel
Save