diff --git a/cereal b/cereal
index 6c5d7784db..d6c3cf6b33 160000
--- a/cereal
+++ b/cereal
@@ -1 +1 @@
-Subproject commit 6c5d7784db838427b24e434345712ba7d0d2714d
+Subproject commit d6c3cf6b33e699f82e5d78ae22c74cad978830b6
diff --git a/selfdrive/loggerd/encoder.h b/selfdrive/loggerd/encoder.h
index 572a635a7d..3d9f957d9f 100644
--- a/selfdrive/loggerd/encoder.h
+++ b/selfdrive/loggerd/encoder.h
@@ -1,6 +1,7 @@
 #pragma once
 
 #include 
+#include "selfdrive/loggerd/loggerd.h"
 
 class VideoEncoder {
 public:
diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc
index 57389508b4..051e1ab7b9 100644
--- a/selfdrive/loggerd/loggerd.cc
+++ b/selfdrive/loggerd/loggerd.cc
@@ -62,12 +62,12 @@ void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) {
       LOGD("encoder init %dx%d", buf_info.width, buf_info.height);
 
       // main encoder
-      encoders.push_back(new Encoder(cam_info.filename, buf_info.width, buf_info.height,
+      encoders.push_back(new Encoder(cam_info.filename, cam_info.type, buf_info.width, buf_info.height,
                                      cam_info.fps, cam_info.bitrate, cam_info.is_h265, cam_info.downscale, cam_info.record));
       // qcamera encoder
       if (cam_info.has_qcamera) {
-        encoders.push_back(new Encoder(qcam_info.filename, qcam_info.frame_width, qcam_info.frame_height,
+        encoders.push_back(new Encoder(qcam_info.filename, cam_info.type, qcam_info.frame_width, qcam_info.frame_height,
                                        qcam_info.fps, qcam_info.bitrate, qcam_info.is_h265, qcam_info.downscale));
       }
     }
diff --git a/selfdrive/loggerd/omx_encoder.cc b/selfdrive/loggerd/omx_encoder.cc
index f37c900150..42637357ba 100644
--- a/selfdrive/loggerd/omx_encoder.cc
+++ b/selfdrive/loggerd/omx_encoder.cc
@@ -1,6 +1,7 @@
 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
 
 #include "selfdrive/loggerd/omx_encoder.h"
+#include "cereal/messaging/messaging.h"
 
 #include 
 #include 
@@ -153,8 +154,9 @@ static const char* omx_color_fomat_name(uint32_t format) {
 
 // ***** encoder functions *****
 
-OmxEncoder::OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) {
+OmxEncoder::OmxEncoder(const char* filename, CameraType type, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write) {
   this->filename = filename;
+  this->type = type;
   this->write = write;
   this->width = width;
   this->height = height;
@@ -362,6 +364,32 @@ void OmxEncoder::callback_handler(OmxEncoder *e) {
   }
 }
 
+void OmxEncoder::write_and_broadcast_handler(OmxEncoder *e){
+  bool exit = false;
+  const char *service_name = e->type == DriverCam ? "driverEncodeData" : (e->type == WideRoadCam ? "wideRoadEncodeData" : "roadEncodeData");
+  PubMaster pm({service_name});
+
+  uint32_t idx = 0;
+  while (!exit) {
+    OmxBuffer *out_buf = e->to_write.pop();
+
+    MessageBuilder msg;
+    auto edata = (e->type == DriverCam) ? msg.initEvent(true).initDriverEncodeData() :
+                 ((e->type == WideRoadCam) ? msg.initEvent(true).initWideRoadEncodeData() : msg.initEvent(true).initRoadEncodeData());
+    edata.setData(kj::heapArray(out_buf->data, out_buf->header.nFilledLen));
+    edata.setTimestampEof(out_buf->header.nTimeStamp);
+    edata.setIdx(idx++);
+    pm.send(service_name, msg);
+
+    OmxEncoder::handle_out_buf(e, out_buf);
+    if (out_buf->header.nFlags & OMX_BUFFERFLAG_EOS) {
+      exit = true;
+    }
+
+    free(out_buf);
+  }
+}
+
 void OmxEncoder::write_handler(OmxEncoder *e){
   bool exit = false;
@@ -561,7 +589,7 @@ void OmxEncoder::encoder_open(const char* path) {
 
   // start writer threads
   callback_handler_thread = std::thread(OmxEncoder::callback_handler, this);
-  write_handler_thread = std::thread(OmxEncoder::write_handler, this);
+  write_handler_thread = std::thread(this->remuxing ? OmxEncoder::write_handler : OmxEncoder::write_and_broadcast_handler, this);
 
   this->is_open = true;
   this->counter = 0;
diff --git a/selfdrive/loggerd/omx_encoder.h b/selfdrive/loggerd/omx_encoder.h
index e3087e31bf..65f2c0b9be 100644
--- a/selfdrive/loggerd/omx_encoder.h
+++ b/selfdrive/loggerd/omx_encoder.h
@@ -22,7 +22,7 @@ struct OmxBuffer {
 // OmxEncoder, lossey codec using hardware hevc
 class OmxEncoder : public VideoEncoder {
 public:
-  OmxEncoder(const char* filename, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write = true);
+  OmxEncoder(const char* filename, CameraType type, int width, int height, int fps, int bitrate, bool h265, bool downscale, bool write = true);
   ~OmxEncoder();
   int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,
                    int in_width, int in_height, uint64_t ts);
@@ -41,6 +41,7 @@ private:
   void wait_for_state(OMX_STATETYPE state);
   static void callback_handler(OmxEncoder *e);
   static void write_handler(OmxEncoder *e);
+  static void write_and_broadcast_handler(OmxEncoder *e);
   static void handle_out_buf(OmxEncoder *e, OmxBuffer *out_buf);
 
   int width, height, fps;
@@ -55,6 +56,7 @@ private:
 
   const char* filename;
   FILE *of = nullptr;
+  CameraType type;
 
   size_t codec_config_len;
   uint8_t *codec_config = NULL;
diff --git a/selfdrive/loggerd/raw_logger.cc b/selfdrive/loggerd/raw_logger.cc
index 8d27e4a7a1..6da0d59ad3 100644
--- a/selfdrive/loggerd/raw_logger.cc
+++ b/selfdrive/loggerd/raw_logger.cc
@@ -22,7 +22,7 @@ extern "C" {
 #include "selfdrive/common/swaglog.h"
 #include "selfdrive/common/util.h"
 
-RawLogger::RawLogger(const char* filename, int width, int height, int fps,
+RawLogger::RawLogger(const char* filename, CameraType type, int width, int height, int fps,
                      int bitrate, bool h265, bool downscale, bool write)
   : filename(filename), fps(fps) {
diff --git a/selfdrive/loggerd/raw_logger.h b/selfdrive/loggerd/raw_logger.h
index 3c7fed38cc..2fa23f83cd 100644
--- a/selfdrive/loggerd/raw_logger.h
+++ b/selfdrive/loggerd/raw_logger.h
@@ -15,7 +15,7 @@ extern "C" {
 
 class RawLogger : public VideoEncoder {
 public:
-  RawLogger(const char* filename, int width, int height, int fps,
+  RawLogger(const char* filename, CameraType type, int width, int height, int fps,
             int bitrate, bool h265, bool downscale, bool write = true);
   ~RawLogger();
   int encode_frame(const uint8_t *y_ptr, const uint8_t *u_ptr, const uint8_t *v_ptr,
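The new OmxEncoder::write_and_broadcast_handler above publishes every encoded packet on a cereal service named after the camera (driverEncodeData, wideRoadEncodeData or roadEncodeData), carrying the raw bitstream in data, the OMX timestamp in timestampEof and a running idx. A minimal subscriber sketch, not part of this patch: it reuses the same cereal.messaging calls as the writer() helper in the tool below, and assumes the device's ZMQ bridge is reachable at the address passed on the command line.

# sketch: print the H.265 packets loggerd now publishes on roadEncodeData (assumes ZMQ bridge reachable)
import os
import sys
import cereal.messaging as messaging

if __name__ == "__main__":
  os.environ["ZMQ"] = "1"                  # subscribe over ZMQ/TCP, as writer() below does
  messaging.context = messaging.Context()
  sock = messaging.sub_sock("roadEncodeData", None, addr=sys.argv[1], conflate=False)
  while True:
    for evt in messaging.drain_sock(sock, wait_for_one=True):
      pkt = getattr(evt, evt.which())      # the roadEncodeData struct
      print("idx %d: %d bytes, timestampEof %d" % (pkt.idx, len(pkt.data), pkt.timestampEof))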
diff --git a/tools/camerastream/compressed_vipc.py b/tools/camerastream/compressed_vipc.py
new file mode 100755
index 0000000000..68522d1eb1
--- /dev/null
+++ b/tools/camerastream/compressed_vipc.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+import os
+import sys
+import numpy as np
+import multiprocessing
+
+from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
+
+W, H = 1928, 1208
+
+def writer(fn, addr, sock_name):
+  import cereal.messaging as messaging
+  HEADER = b"\x00\x00\x00\x01\x40\x01\x0c\x01\xff\xff\x01\x60\x00\x00\x03\x00\xb0\x00\x00\x03\x00\x00\x03\x00\x96\xac\x09\x00\x00\x00\x01\x42\x01\x01\x01\x60\x00\x00\x03\x00\xb0\x00\x00\x03\x00\x00\x03\x00\x96\xa0\x03\xd0\x80\x13\x07\x1b\x2e\x5a\xee\x4c\x92\xea\x00\xbb\x42\x84\xa0\x00\x00\x00\x01\x44\x01\xc0\xe2\x4f\x09\xc1\x80\xc6\x08\x40\x00"
+  fifo_file = open(fn, "wb")
+  fifo_file.write(HEADER)
+  fifo_file.flush()
+
+  os.environ["ZMQ"] = "1"
+  messaging.context = messaging.Context()
+
+  sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False)
+  last_idx = -1
+  seen_iframe = False
+  while 1:
+    msgs = messaging.drain_sock(sock, wait_for_one=True)
+    for evt in msgs:
+      evta = getattr(evt, evt.which())
+      lat = ((evt.logMonoTime/1e9) - (evta.timestampEof/1e6))*1000
+      print("%2d %4d %.3f %.3f latency %.2f ms" % (len(msgs), evta.idx, evt.logMonoTime/1e9, evta.timestampEof/1e6, lat), len(evta.data), sock_name)
+      if evta.idx != 0 and evta.idx != (last_idx+1):
+        print("DROP!")
+      last_idx = evta.idx
+      if len(evta.data) > 4 and evta.data[4] == 0x26:
+        seen_iframe = True
+      if not seen_iframe:
+        print("waiting for iframe")
+        continue
+      fifo_file.write(evta.data)
+      fifo_file.flush()
+
+def decoder_nvidia(fn, vipc_server, vst):
+  sys.path.append("/raid.dell2/PyNvCodec")
+  import PyNvCodec as nvc # pylint: disable=import-error
+  decoder = nvc.PyNvDecoder(fn, 0, {"probesize": "32"})
+  conv = nvc.PySurfaceConverter(W, H, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, 0)
+  cc1 = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
+  nvDwn = nvc.PySurfaceDownloader(W, H, nvc.PixelFormat.BGR, 0)
+
+  img = np.ndarray((H,W,3), dtype=np.uint8)
+  cnt = 0
+  while 1:
+    rawSurface = decoder.DecodeSingleSurface()
+    if rawSurface.Empty():
+      continue
+    convSurface = conv.Execute(rawSurface, cc1)
+    nvDwn.DownloadSingleSurface(convSurface, img)
+    vipc_server.send(vst, img.flatten().data, cnt, 0, 0)
+    cnt += 1
+
+def decoder_ffmpeg(fn, vipc_server, vst):
+  import av # pylint: disable=import-error
+  container = av.open(fn, options={"probesize": "32"})
+  cnt = 0
+  for frame in container.decode(video=0):
+    img = frame.to_ndarray(format=av.video.format.VideoFormat('bgr24'))
+    vipc_server.send(vst, img.flatten().data, cnt, 0, 0)
+    cnt += 1
+
+import argparse
+if __name__ == "__main__":
+  parser = argparse.ArgumentParser(description='Decode video streams and broadcast on VisionIPC')
+  parser.add_argument("addr", help="Address of comma 3")
+  parser.add_argument('--nvidia', action='store_true', help='Use nvidia instead of ffmpeg')
+  parser.add_argument("--cams", default="0,1,2", help="Cameras to decode")
+  args = parser.parse_args()
+
+  all_cams = [
+    ("roadEncodeData", VisionStreamType.VISION_STREAM_RGB_ROAD),
+    ("wideRoadEncodeData", VisionStreamType.VISION_STREAM_RGB_WIDE_ROAD),
+    ("driverEncodeData", VisionStreamType.VISION_STREAM_RGB_DRIVER),
+  ]
+  cams = dict([all_cams[int(x)] for x in args.cams.split(",")])
+
+  vipc_server = VisionIpcServer("camerad")
+  for vst in cams.values():
+    vipc_server.create_buffers(vst, 4, True, W, H)
+  vipc_server.start_listener()
+
+  for k,v in cams.items():
+    FIFO_NAME = "/tmp/decodepipe_"+k
+    if os.path.exists(FIFO_NAME):
+      os.unlink(FIFO_NAME)
+    os.mkfifo(FIFO_NAME)
+    multiprocessing.Process(target=writer, args=(FIFO_NAME, sys.argv[1], k)).start()
+    if args.nvidia:
+      multiprocessing.Process(target=decoder_nvidia, args=(FIFO_NAME, vipc_server, v)).start()
+    else:
+      multiprocessing.Process(target=decoder_ffmpeg, args=(FIFO_NAME, vipc_server, v)).start()
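compressed_vipc.py subscribes to those services on a comma three, feeds each H.265 stream through ffmpeg (or NVDEC with --nvidia), and republishes the decoded frames locally as a VisionIPC server named "camerad", e.g. ./compressed_vipc.py <device-ip> --cams 0 for the road camera only. A sketch of a local consumer, not part of this patch: it assumes the VisionIpcClient bindings in visionipc_pyx are built and that the received buffer exposes its pixels as a flat uint8 .data array.

# sketch: read back the RGB frames that compressed_vipc.py republishes over VisionIPC
# assumptions: VisionIpcClient(name, stream, conflate), connect() and recv() as in cereal's
# Python bindings; the buffer may be stride-padded, so only the first H*W*3 bytes are used
import time
import numpy as np
from cereal.visionipc.visionipc_pyx import VisionIpcClient, VisionStreamType # pylint: disable=no-name-in-module, import-error

W, H = 1928, 1208  # same size the tool registers for its buffers

if __name__ == "__main__":
  client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_RGB_ROAD, True)
  while not client.connect(False):
    time.sleep(0.1)
  while True:
    buf = client.recv()
    if buf is None:
      continue
    frame = np.frombuffer(buf.data, dtype=np.uint8)[:H*W*3].reshape((H, W, 3))  # BGR, as sent by the decoders
    print("frame", frame.shape)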