loggerd: broadcast encoder data (#24177)

* encode data try 1

* fix pc build

* low quality compressed streamer with pyav

* nvidia streamer

* timestamp support

* fix latency measurement

* refactor

* camerastream updates

* fix linter

* compressed to vipc, clean

* remove print

* touchups

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: dab978083b
taco
George Hotz 3 years ago committed by GitHub
parent 052b799262
commit 9720bb37b5
  1. 2
      cereal
  2. 1
      selfdrive/loggerd/encoder.h
  3. 4
      selfdrive/loggerd/loggerd.cc
  4. 32
      selfdrive/loggerd/omx_encoder.cc
  5. 4
      selfdrive/loggerd/omx_encoder.h
  6. 2
      selfdrive/loggerd/raw_logger.cc
  7. 2
      selfdrive/loggerd/raw_logger.h
  8. 97
      tools/camerastream/compressed_vipc.py

@ -1 +1 @@
Subproject commit 6c5d7784db838427b24e434345712ba7d0d2714d Subproject commit d6c3cf6b33e699f82e5d78ae22c74cad978830b6

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <cstdint> #include <cstdint>
#include "selfdrive/loggerd/loggerd.h"
class VideoEncoder { class VideoEncoder {
public: public:

@ -62,12 +62,12 @@ void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) {
LOGD("encoder init %dx%d", buf_info.width, buf_info.height); LOGD("encoder init %dx%d", buf_info.width, buf_info.height);
// main encoder // main encoder
encoders.push_back(new Encoder(cam_info.filename, buf_info.width, buf_info.height, encoders.push_back(new Encoder(cam_info.filename, cam_info.type, buf_info.width, buf_info.height,
cam_info.fps, cam_info.bitrate, cam_info.is_h265, cam_info.fps, cam_info.bitrate, cam_info.is_h265,
cam_info.downscale, cam_info.record)); cam_info.downscale, cam_info.record));
// qcamera encoder // qcamera encoder
if (cam_info.has_qcamera) { if (cam_info.has_qcamera) {
encoders.push_back(new Encoder(qcam_info.filename, qcam_info.frame_width, qcam_info.frame_height, encoders.push_back(new Encoder(qcam_info.filename, cam_info.type, qcam_info.frame_width, qcam_info.frame_height,
qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale)); qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale));
} }
} }

@ -1,6 +1,7 @@
#pragma clang diagnostic ignored "-Wdeprecated-declarations" #pragma clang diagnostic ignored "-Wdeprecated-declarations"
#include "selfdrive/loggerd/omx_encoder.h" #include "selfdrive/loggerd/omx_encoder.h"
#include "cereal/messaging/messaging.h"
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
@ -153,8 +154,9 @@ static const char* omx_color_fomat_name(uint32_t format) {
// ***** encoder functions ***** // ***** encoder functions *****
OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) { OmxEncoder::OmxEncoder(const char* filename, CameraType type, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) {
this->filename = filename; this->filename = filename;
this->type = type;
this->write = write; this->write = write;
this->width = width; this->width = width;
this->height = height; this->height = height;
@ -362,6 +364,32 @@ void OmxEncoder::callback_handler(OmxEncoder *e) {
} }
} }
// Writer-thread entry point used when the encoder output is NOT being remuxed:
// pops encoded OMX buffers from e->to_write, publishes each one on the
// camera-appropriate *EncodeData service, then passes the buffer on to
// handle_out_buf, until a buffer carrying the EOS flag is seen.
void OmxEncoder::write_and_broadcast_handler(OmxEncoder *e){
  bool exit = false;
  // pick the pub service matching this encoder's camera type
  const char *service_name = e->type == DriverCam ? "driverEncodeData" : (e->type == WideRoadCam ? "wideRoadEncodeData" : "roadEncodeData");
  PubMaster pm({service_name});
  uint32_t idx = 0;  // per-thread frame counter, starts at 0 each time this thread is started
  while (!exit) {
    OmxBuffer *out_buf = e->to_write.pop();  // presumably a blocking pop — confirm queue semantics
    MessageBuilder msg;
    // initialize the event union member matching the camera type
    auto edata = (e->type == DriverCam) ? msg.initEvent(true).initDriverEncodeData() :
      ((e->type == WideRoadCam) ? msg.initEvent(true).initWideRoadEncodeData() : msg.initEvent(true).initRoadEncodeData());
    // copy the nFilledLen encoded payload bytes into the capnp message
    edata.setData(kj::heapArray<capnp::byte>(out_buf->data, out_buf->header.nFilledLen));
    edata.setTimestampEof(out_buf->header.nTimeStamp);
    edata.setIdx(idx++);
    pm.send(service_name, msg);
    OmxEncoder::handle_out_buf(e, out_buf);
    // NOTE(review): out_buf->header is read after handle_out_buf and the buffer is
    // freed below — confirm handle_out_buf neither frees nor recycles out_buf.
    if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) {
      exit = true;
    }
    free(out_buf);
  }
}
void OmxEncoder::write_handler(OmxEncoder *e){ void OmxEncoder::write_handler(OmxEncoder *e){
bool exit = false; bool exit = false;
@ -561,7 +589,7 @@ void OmxEncoder::encoder_open(const char* path) {
// start writer threads // start writer threads
callback_handler_thread = std::thread(OmxEncoder::callback_handler, this); callback_handler_thread = std::thread(OmxEncoder::callback_handler, this);
write_handler_thread = std::thread(OmxEncoder::write_handler, this); write_handler_thread = std::thread(this->remuxing ? OmxEncoder::write_handler : OmxEncoder::write_and_broadcast_handler, this);
this->is_open = true; this->is_open = true;
this->counter = 0; this->counter = 0;

@ -22,7 +22,7 @@ struct OmxBuffer {
// OmxEncoder, lossey codec using hardware hevc // OmxEncoder, lossey codec using hardware hevc
class OmxEncoder : public VideoEncoder { class OmxEncoder : public VideoEncoder {
public: public:
OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write = true); OmxEncoder(const char* filename, CameraType type, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write = true);
~OmxEncoder(); ~OmxEncoder();
int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr, int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,
int in_width, int in_height, uint64_t ts); int in_width, int in_height, uint64_t ts);
@ -41,6 +41,7 @@ private:
void wait_for_state(OMX_STATETYPE state); void wait_for_state(OMX_STATETYPE state);
static void callback_handler(OmxEncoder *e); static void callback_handler(OmxEncoder *e);
static void write_handler(OmxEncoder *e); static void write_handler(OmxEncoder *e);
static void write_and_broadcast_handler(OmxEncoder *e);
static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf); static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf);
int width, height, fps; int width, height, fps;
@ -55,6 +56,7 @@ private:
const char* filename; const char* filename;
FILE *of = nullptr; FILE *of = nullptr;
CameraType type;
size_t codec_config_len; size_t codec_config_len;
uint8_t *codec_config = NULL; uint8_t *codec_config = NULL;

@ -22,7 +22,7 @@ extern "C" {
#include "selfdrive/common/swaglog.h" #include "selfdrive/common/swaglog.h"
#include "selfdrive/common/util.h" #include "selfdrive/common/util.h"
RawLogger::RawLogger(const char* filename, int width, int height, int fps, RawLogger::RawLogger(const char* filename, CameraType type, int width, int height, int fps,
int bitrate, bool h265, bool downscale, bool write) int bitrate, bool h265, bool downscale, bool write)
: filename(filename), fps(fps) { : filename(filename), fps(fps) {

@ -15,7 +15,7 @@ extern "C" {
class RawLogger : public VideoEncoder { class RawLogger : public VideoEncoder {
public: public:
RawLogger(const char* filename, int width, int height, int fps, RawLogger(const char* filename, CameraType type, int width, int height, int fps,
int bitrate, bool h265, bool downscale, bool write = true); int bitrate, bool h265, bool downscale, bool write = true);
~RawLogger(); ~RawLogger();
int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr, int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,

@ -0,0 +1,97 @@
#!/usr/bin/env python3
import os
import sys
import numpy as np
import multiprocessing
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
W, H = 1928, 1208  # decoded frame size; used for the VisionIPC buffers and the BGR download buffer
def writer(fn, addr, sock_name):
  """Subscribe to one *EncodeData service over ZMQ and pipe the raw bitstream into fn.

  fn        -- path of the fifo/file receiving the elementary stream
  addr      -- address of the device publishing the encoder data
  sock_name -- service to subscribe to (e.g. "roadEncodeData")
  """
  import cereal.messaging as messaging
  # Pre-baked stream headers, written once up front so the downstream decoder
  # can initialize before the first keyframe payload arrives.
  HEADER = b"\x00\x00\x00\x01\x40\x01\x0c\x01\xff\xff\x01\x60\x00\x00\x03\x00\xb0\x00\x00\x03\x00\x00\x03\x00\x96\xac\x09\x00\x00\x00\x01\x42\x01\x01\x01\x60\x00\x00\x03\x00\xb0\x00\x00\x03\x00\x00\x03\x00\x96\xa0\x03\xd0\x80\x13\x07\x1b\x2e\x5a\xee\x4c\x92\xea\x00\xbb\x42\x84\xa0\x00\x00\x00\x01\x44\x01\xc0\xe2\x4f\x09\xc1\x80\xc6\x08\x40\x00"
  fifo = open(fn, "wb")
  fifo.write(HEADER)
  fifo.flush()

  # force the ZMQ transport for cross-machine messaging
  os.environ["ZMQ"] = "1"
  messaging.context = messaging.Context()
  sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False)

  last_idx = -1
  seen_iframe = False
  while True:
    msgs = messaging.drain_sock(sock, wait_for_one=True)
    for evt in msgs:
      evta = getattr(evt, evt.which())
      log_t = evt.logMonoTime/1e9
      eof_t = evta.timestampEof/1e6
      # receive latency: local log time vs on-device end-of-frame timestamp
      lat = (log_t - eof_t)*1000
      print("%2d %4d %.3f %.3f latency %.2f ms" % (len(msgs), evta.idx, log_t, eof_t, lat), len(evta.data), sock_name)
      if evta.idx != 0 and evta.idx != (last_idx+1):
        print("DROP!")
      last_idx = evta.idx
      # 0x26 is presumably the first NAL-header byte of an HEVC IDR slice — don't
      # start feeding the decoder until a keyframe has been observed
      if len(evta.data) > 4 and evta.data[4] == 0x26:
        seen_iframe = True
      if not seen_iframe:
        print("waiting for iframe")
        continue
      fifo.write(evta.data)
      fifo.flush()
def decoder_nvidia(fn, vipc_server, vst):
  # Decode the stream in fn on an NVIDIA GPU via PyNvCodec and publish
  # raw BGR frames on the given VisionIPC stream `vst`.
  sys.path.append("/raid.dell2/PyNvCodec")  # NOTE(review): hardcoded dev-machine path — confirm portability
  import PyNvCodec as nvc # pylint: disable=import-error
  decoder = nvc.PyNvDecoder(fn, 0, {"probesize": "32"})  # small probesize to cut startup latency
  # GPU-side NV12 -> BGR conversion, then download to host memory
  conv = nvc.PySurfaceConverter(W, H, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, 0)
  cc1 = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
  nvDwn = nvc.PySurfaceDownloader(W, H, nvc.PixelFormat.BGR, 0)
  img = np.ndarray((H,W,3), dtype=np.uint8)  # reused host-side destination buffer
  cnt = 0
  while 1:
    rawSurface = decoder.DecodeSingleSurface()
    if rawSurface.Empty():
      # decoder produced no frame yet; presumably also hit at EOF, so this busy-waits — confirm
      continue
    convSurface = conv.Execute(rawSurface, cc1)
    nvDwn.DownloadSingleSurface(convSurface, img)
    vipc_server.send(vst, img.flatten().data, cnt, 0, 0)
    cnt += 1
def decoder_ffmpeg(fn, vipc_server, vst):
  """Decode the stream in fn with PyAV/ffmpeg and publish BGR frames on VisionIPC stream vst."""
  import av # pylint: disable=import-error
  container = av.open(fn, options={"probesize": "32"})
  for cnt, frame in enumerate(container.decode(video=0)):
    bgr = frame.to_ndarray(format=av.video.format.VideoFormat('bgr24'))
    vipc_server.send(vst, bgr.flatten().data, cnt, 0, 0)
import argparse

if __name__ == "__main__":
  parser = argparse.ArgumentParser(description='Decode video streams and broadcast on VisionIPC')
  parser.add_argument("addr", help="Address of comma 3")
  parser.add_argument('--nvidia', action='store_true', help='Use nvidia instead of ffmpeg')
  parser.add_argument("--cams", default="0,1,2", help="Cameras to decode")
  args = parser.parse_args()

  # index -> (service name, VisionIPC stream) for the three cameras
  all_cams = [
    ("roadEncodeData", VisionStreamType.VISION_STREAM_RGB_ROAD),
    ("wideRoadEncodeData", VisionStreamType.VISION_STREAM_RGB_WIDE_ROAD),
    ("driverEncodeData", VisionStreamType.VISION_STREAM_RGB_DRIVER),
  ]
  cams = dict([all_cams[int(x)] for x in args.cams.split(",")])

  # one VisionIPC server fronts all decoded streams
  vipc_server = VisionIpcServer("camerad")
  for vst in cams.values():
    vipc_server.create_buffers(vst, 4, True, W, H)
  vipc_server.start_listener()

  # per camera: a fifo, a writer process pulling from the device, and a decoder process
  for k, v in cams.items():
    FIFO_NAME = "/tmp/decodepipe_" + k
    if os.path.exists(FIFO_NAME):
      os.unlink(FIFO_NAME)
    os.mkfifo(FIFO_NAME)
    # fix: pass the parsed positional instead of sys.argv[1], which is wrong
    # whenever an optional flag (e.g. --nvidia) precedes the address
    multiprocessing.Process(target=writer, args=(FIFO_NAME, args.addr, k)).start()
    if args.nvidia:
      multiprocessing.Process(target=decoder_nvidia, args=(FIFO_NAME, vipc_server, v)).start()
    else:
      multiprocessing.Process(target=decoder_ffmpeg, args=(FIFO_NAME, vipc_server, v)).start()
Loading…
Cancel
Save