loggerd: add qcam broadcast support, don't recreate pubmaster (#24226)

* add qcam broadcast support, don't recreate pubmaster

* fixed

Co-authored-by: Comma Device <device@comma.ai>
pull/24231/head
George Hotz 3 years ago committed by GitHub
parent d9fbec6bf5
commit 35e776d2fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      cereal
  2. 31
      selfdrive/loggerd/omx_encoder.cc
  3. 4
      selfdrive/loggerd/omx_encoder.h

@ -1 +1 @@
Subproject commit 932c44c6bda17d7e19e1f58cd26442560a562696 Subproject commit 3d70d855801837c0e5a39f04e6d46791530a537b

@ -326,6 +326,11 @@ OmxEncoder::OmxEncoder(const char* filename, CameraType type, int width, int hei
} }
LOGE("omx initialized - in: %d - out %d", this->in_buf_headers.size(), this->out_buf_headers.size()); LOGE("omx initialized - in: %d - out %d", this->in_buf_headers.size(), this->out_buf_headers.size());
service_name = this->type == DriverCam ? "driverEncodeData" :
(this->type == WideRoadCam ? "wideRoadEncodeData" :
(this->remuxing ? "qRoadEncodeData" : "roadEncodeData"));
pm = new PubMaster({service_name});
} }
void OmxEncoder::callback_handler(OmxEncoder *e) { void OmxEncoder::callback_handler(OmxEncoder *e) {
@ -366,20 +371,21 @@ void OmxEncoder::callback_handler(OmxEncoder *e) {
void OmxEncoder::write_and_broadcast_handler(OmxEncoder *e){ void OmxEncoder::write_and_broadcast_handler(OmxEncoder *e){
bool exit = false; bool exit = false;
const char *service_name = e->type == DriverCam ? "driverEncodeData" : (e->type == WideRoadCam ? "wideRoadEncodeData" : "roadEncodeData");
PubMaster pm({service_name});
e->segment_num++;
uint32_t idx = 0; uint32_t idx = 0;
while (!exit) { while (!exit) {
OmxBuffer *out_buf = e->to_write.pop(); OmxBuffer *out_buf = e->to_write.pop();
MessageBuilder msg; MessageBuilder msg;
auto edata = (e->type == DriverCam) ? msg.initEvent(true).initDriverEncodeData() : auto edata = (e->type == DriverCam) ? msg.initEvent(true).initDriverEncodeData() :
((e->type == WideRoadCam) ? msg.initEvent(true).initWideRoadEncodeData() : msg.initEvent(true).initRoadEncodeData()); ((e->type == WideRoadCam) ? msg.initEvent(true).initWideRoadEncodeData() :
(e->remuxing ? msg.initEvent(true).initQRoadEncodeData() : msg.initEvent(true).initRoadEncodeData()));
edata.setData(kj::heapArray<capnp::byte>(out_buf->data, out_buf->header.nFilledLen)); edata.setData(kj::heapArray<capnp::byte>(out_buf->data, out_buf->header.nFilledLen));
edata.setTimestampEof(out_buf->header.nTimeStamp); edata.setTimestampEof(out_buf->header.nTimeStamp);
edata.setIdx(idx++); edata.setIdx(idx++);
pm.send(service_name, msg); edata.setSegmentNum(e->segment_num);
e->pm->send(e->service_name, msg);
OmxEncoder::handle_out_buf(e, out_buf); OmxEncoder::handle_out_buf(e, out_buf);
if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) { if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) {
@ -391,21 +397,6 @@ void OmxEncoder::write_and_broadcast_handler(OmxEncoder *e){
} }
void OmxEncoder::write_handler(OmxEncoder *e){
bool exit = false;
while (!exit) {
OmxBuffer *out_buf = e->to_write.pop();
OmxEncoder::handle_out_buf(e, out_buf);
if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) {
exit = true;
}
free(out_buf);
}
}
void OmxEncoder::handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf) { void OmxEncoder::handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf) {
int err; int err;
@ -589,7 +580,7 @@ void OmxEncoder::encoder_open(const char* path) {
// start writer threads // start writer threads
callback_handler_thread = std::thread(OmxEncoder::callback_handler, this); callback_handler_thread = std::thread(OmxEncoder::callback_handler, this);
write_handler_thread = std::thread(this->remuxing ? OmxEncoder::write_handler : OmxEncoder::write_and_broadcast_handler, this); write_handler_thread = std::thread(OmxEncoder::write_and_broadcast_handler, this);
this->is_open = true; this->is_open = true;
this->counter = 0; this->counter = 0;

@ -40,7 +40,6 @@ public:
private: private:
void wait_for_state(OMX_STATETYPE state); void wait_for_state(OMX_STATETYPE state);
static void callback_handler(OmxEncoder *e); static void callback_handler(OmxEncoder *e);
static void write_handler(OmxEncoder *e);
static void write_and_broadcast_handler(OmxEncoder *e); static void write_and_broadcast_handler(OmxEncoder *e);
static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf); static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf);
@ -53,6 +52,9 @@ private:
int counter = 0; int counter = 0;
std::thread callback_handler_thread; std::thread callback_handler_thread;
std::thread write_handler_thread; std::thread write_handler_thread;
int segment_num = -1;
PubMaster *pm;
const char *service_name;
const char* filename; const char* filename;
FILE *of = nullptr; FILE *of = nullptr;

Loading…
Cancel
Save