From a8c5acb8840e31de3e7d10c12b2650636fc7975e Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Mon, 4 Oct 2021 22:45:28 +0800 Subject: [PATCH] c++ replay: publish all frames in CameraServer (#22378) * cameraserver * support yuv * init camera server in start() * trigger ci old-commit-hash: 1eb79d7a59a36e75ef2ccb1a8b1cea084784fc43 --- selfdrive/camerad/cameras/camera_replay.cc | 6 +- selfdrive/ui/SConscript | 2 +- selfdrive/ui/replay/camera.cc | 67 +++++++++++++++++++++ selfdrive/ui/replay/camera.h | 41 +++++++++++++ selfdrive/ui/replay/framereader.cc | 70 +++++++++++----------- selfdrive/ui/replay/framereader.h | 13 ++-- selfdrive/ui/replay/replay.cc | 53 +++++++--------- selfdrive/ui/replay/replay.h | 4 +- 8 files changed, 180 insertions(+), 76 deletions(-) create mode 100644 selfdrive/ui/replay/camera.cc create mode 100644 selfdrive/ui/replay/camera.h diff --git a/selfdrive/camerad/cameras/camera_replay.cc b/selfdrive/camerad/cameras/camera_replay.cc index ff632b4b67..825b1616bf 100644 --- a/selfdrive/camerad/cameras/camera_replay.cc +++ b/selfdrive/camerad/cameras/camera_replay.cc @@ -53,11 +53,11 @@ void run_camera(CameraState *s) { // loop stream stream_frame_id = 0; } - uint8_t *dat = s->frame->get(stream_frame_id++); - if (dat) { + if (auto dat = s->frame->get(stream_frame_id++)) { + auto [rgb_buf, yuv_buf] = *dat; s->buf.camera_bufs_metadata[buf_idx] = {.frame_id = frame_id}; auto &buf = s->buf.camera_bufs[buf_idx]; - CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), dat, 0, NULL, NULL)); + CL_CHECK(clEnqueueWriteBuffer(buf.copy_q, buf.buf_cl, CL_TRUE, 0, s->frame->getRGBSize(), rgb_buf, 0, NULL, NULL)); s->buf.queue(buf_idx); ++frame_id; buf_idx = (buf_idx + 1) % FRAME_BUF_COUNT; diff --git a/selfdrive/ui/SConscript b/selfdrive/ui/SConscript index 137b6e47f7..ca8147ec82 100644 --- a/selfdrive/ui/SConscript +++ b/selfdrive/ui/SConscript @@ -108,7 +108,7 @@ if GetOption('setup'): if arch in ['x86_64', 'Darwin'] and os.path.exists(Dir("#tools/").get_abspath()): qt_env['CXXFLAGS'] += ["-Wno-deprecated-declarations"] - replay_lib_src = ["replay/replay.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] + replay_lib_src = ["replay/replay.cc", "replay/camera.cc", "replay/logreader.cc", "replay/framereader.cc", "replay/route.cc", "replay/util.cc"] replay_lib = qt_env.Library("qt_replay", replay_lib_src, LIBS=base_libs) replay_libs = [replay_lib, 'avutil', 'avcodec', 'avformat', 'swscale', 'bz2', 'curl'] + qt_libs diff --git a/selfdrive/ui/replay/camera.cc b/selfdrive/ui/replay/camera.cc new file mode 100644 index 0000000000..2c1a6e0beb --- /dev/null +++ b/selfdrive/ui/replay/camera.cc @@ -0,0 +1,67 @@ +#include "selfdrive/ui/replay/camera.h" + +#include +#include + +const int YUV_BUF_COUNT = 50; + +CameraServer::CameraServer() { + device_id_ = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); + context_ = CL_CHECK_ERR(clCreateContext(nullptr, 1, &device_id_, nullptr, nullptr, &err)); + camera_thread_ = std::thread(&CameraServer::thread, this); +} + +CameraServer::~CameraServer() { + queue_.push({}); + camera_thread_.join(); + vipc_server_.reset(nullptr); + CL_CHECK(clReleaseContext(context_)); +} + +void CameraServer::startVipcServer() { + std::cout << (vipc_server_ ? "start" : "restart") << " vipc server" << std::endl; + vipc_server_.reset(new VisionIpcServer("camerad", device_id_, context_)); + for (auto &cam : cameras_) { + if (cam.width > 0 && cam.height > 0) { + vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height); + vipc_server_->create_buffers(cam.yuv_type, YUV_BUF_COUNT, false, cam.width, cam.height); + } + } + vipc_server_->start_listener(); +} + +void CameraServer::thread() { + while (true) { + const auto [type, fr, eidx] = queue_.pop(); + if (!fr) break; + + auto &cam = cameras_[type]; + // start|restart the vipc server if frame size changed + if (cam.width != fr->width || cam.height != fr->height) { + cam.width = fr->width; + cam.height = fr->height; + std::cout << "camera[" << type << "] frame size " << cam.width << "x" << cam.height << std::endl; + startVipcServer(); + } + + // send frame + if (auto dat = fr->get(eidx.getSegmentId())) { + auto [rgb_dat, yuv_dat] = *dat; + VisionIpcBufExtra extra = { + .frame_id = eidx.getFrameId(), + .timestamp_sof = eidx.getTimestampSof(), + .timestamp_eof = eidx.getTimestampEof(), + }; + + VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type); + memcpy(rgb_buf->addr, rgb_dat, fr->getRGBSize()); + VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type); + memcpy(yuv_buf->addr, yuv_dat, fr->getYUVSize()); + + vipc_server_->send(rgb_buf, &extra, false); + vipc_server_->send(yuv_buf, &extra, false); + } else { + std::cout << "camera[" << type << "] failed to get frame:" << eidx.getSegmentId() << std::endl; + } + } +} diff --git a/selfdrive/ui/replay/camera.h b/selfdrive/ui/replay/camera.h new file mode 100644 index 0000000000..84607aa3a1 --- /dev/null +++ b/selfdrive/ui/replay/camera.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include "cereal/visionipc/visionipc_server.h" +#include "selfdrive/common/queue.h" +#include "selfdrive/ui/replay/framereader.h" +#include "selfdrive/ui/replay/logreader.h" + +class CameraServer { +public: + CameraServer(); + ~CameraServer(); + inline void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx) { + queue_.push({type, fr, eidx}); + } + inline void waitFinish() { + while (!queue_.empty()) usleep(0); + } + +protected: + void startVipcServer(); + void thread(); + + struct Camera { + VisionStreamType rgb_type; + VisionStreamType yuv_type; + int width; + int height; + }; + + Camera cameras_[MAX_CAMERAS] = { + {.rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK}, + {.rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT}, + {.rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE}, + }; + cl_device_id device_id_; + cl_context context_; + std::thread camera_thread_; + std::unique_ptr vipc_server_; + SafeQueue> queue_; +}; diff --git a/selfdrive/ui/replay/framereader.cc b/selfdrive/ui/replay/framereader.cc index ea5407039b..a94bed1d32 100644 --- a/selfdrive/ui/replay/framereader.cc +++ b/selfdrive/ui/replay/framereader.cc @@ -48,13 +48,6 @@ FrameReader::~FrameReader() { // free all. for (auto &f : frames_) { av_free_packet(&f.pkt); - if (f.data) { - delete[] f.data; - } - } - while (!buffer_pool.empty()) { - delete[] buffer_pool.front(); - buffer_pool.pop(); } if (frmRgb_) { av_frame_free(&frmRgb_); @@ -78,7 +71,7 @@ bool FrameReader::load(const std::string &url) { return false; } avformat_find_stream_info(pFormatCtx_, NULL); - av_dump_format(pFormatCtx_, 0, url.c_str(), 0); + // av_dump_format(pFormatCtx_, 0, url.c_str(), 0); auto pCodecCtxOrig = pFormatCtx_->streams[0]->codec; auto pCodec = avcodec_find_decoder(pCodecCtxOrig->codec_id); @@ -119,15 +112,18 @@ bool FrameReader::load(const std::string &url) { return valid_; } -uint8_t *FrameReader::get(int idx) { +std::optional> FrameReader::get(int idx) { if (!valid_ || idx < 0 || idx >= frames_.size()) { - return nullptr; + return std::nullopt; } std::unique_lock lk(mutex_); decode_idx_ = idx; cv_decode_.notify_one(); - cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].data || frames_[idx].failed; }); - return frames_[idx].data; + cv_frame_.wait(lk, [=] { return exit_ || frames_[idx].rgb_data || frames_[idx].failed; }); + if (!frames_[idx].rgb_data) { + return std::nullopt; + } + return std::make_pair(frames_[idx].rgb_data.get(), frames_[idx].yuv_data.get()); } void FrameReader::decodeThread() { @@ -138,16 +134,17 @@ void FrameReader::decodeThread() { for (int i = 0; i < frames_.size() && !exit_; ++i) { Frame &frame = frames_[i]; if (i >= from && i < to) { - if (frame.data || frame.failed) continue; + if (frame.rgb_data || frame.failed) continue; - uint8_t *dat = decodeFrame(&frame.pkt); + auto [rgb_data, yuv_data] = decodeFrame(&frame.pkt); std::unique_lock lk(mutex_); - frame.data = dat; - frame.failed = !dat; + frame.rgb_data.reset(rgb_data); + frame.yuv_data.reset(yuv_data); + frame.failed = !rgb_data; cv_frame_.notify_all(); - } else if (frame.data) { - buffer_pool.push(frame.data); - frame.data = nullptr; + } else { + frame.rgb_data.reset(nullptr); + frame.yuv_data.reset(nullptr); frame.failed = false; } } @@ -160,28 +157,33 @@ void FrameReader::decodeThread() { } } -uint8_t *FrameReader::decodeFrame(AVPacket *pkt) { - int gotFrame; +std::pair FrameReader::decodeFrame(AVPacket *pkt) { + uint8_t *rgb_data = nullptr, *yuv_data = nullptr; + int gotFrame = 0; AVFrame *f = av_frame_alloc(); avcodec_decode_video2(pCodecCtx_, f, &gotFrame, pkt); - - uint8_t *dat = nullptr; if (gotFrame) { - if (!buffer_pool.empty()) { - dat = buffer_pool.front(); - buffer_pool.pop(); - } else { - dat = new uint8_t[getRGBSize()]; + rgb_data = new uint8_t[getRGBSize()]; + yuv_data = new uint8_t[getYUVSize()]; + int i, j, k; + for (i = 0; i < f->height; i++) { + memcpy(yuv_data + f->width * i, f->data[0] + f->linesize[0] * i, f->width); + } + for (j = 0; j < f->height / 2; j++) { + memcpy(yuv_data + f->width * i + f->width / 2 * j, f->data[1] + f->linesize[1] * j, f->width / 2); + } + for (k = 0; k < f->height / 2; k++) { + memcpy(yuv_data + f->width * i + f->width / 2 * j + f->width / 2 * k, f->data[2] + f->linesize[2] * k, f->width / 2); } - int ret = avpicture_fill((AVPicture *)frmRgb_, dat, AV_PIX_FMT_BGR24, f->width, f->height); + int ret = avpicture_fill((AVPicture *)frmRgb_, rgb_data, AV_PIX_FMT_BGR24, f->width, f->height); assert(ret > 0); - if (sws_scale(sws_ctx_, (const uint8_t **)f->data, f->linesize, 0, - f->height, frmRgb_->data, frmRgb_->linesize) <= 0) { - delete[] dat; - dat = nullptr; + if (sws_scale(sws_ctx_, (const uint8_t **)f->data, f->linesize, 0, f->height, frmRgb_->data, frmRgb_->linesize) <= 0) { + delete[] rgb_data; + delete[] yuv_data; + rgb_data = yuv_data = nullptr; } } av_frame_free(&f); - return dat; + return {rgb_data, yuv_data}; } diff --git a/selfdrive/ui/replay/framereader.h b/selfdrive/ui/replay/framereader.h index a66e401d7c..ce72b4e5b9 100644 --- a/selfdrive/ui/replay/framereader.h +++ b/selfdrive/ui/replay/framereader.h @@ -1,11 +1,9 @@ #pragma once -#include - #include #include #include -#include +#include #include #include #include @@ -22,8 +20,9 @@ public: FrameReader(); ~FrameReader(); bool load(const std::string &url); - uint8_t *get(int idx); + std::optional> get(int idx); int getRGBSize() const { return width * height * 3; } + int getYUVSize() const { return width * height * 3 / 2; } size_t getFrameCount() const { return frames_.size(); } bool valid() const { return valid_; } @@ -31,10 +30,11 @@ public: private: void decodeThread(); - uint8_t *decodeFrame(AVPacket *pkt); + std::pair decodeFrame(AVPacket *pkt); struct Frame { AVPacket pkt = {}; - uint8_t *data = nullptr; + std::unique_ptr rgb_data = nullptr; + std::unique_ptr yuv_data = nullptr; bool failed = false; }; std::vector frames_; @@ -42,7 +42,6 @@ private: AVFormatContext *pFormatCtx_ = nullptr; AVCodecContext *pCodecCtx_ = nullptr; AVFrame *frmRgb_ = nullptr; - std::queue buffer_pool; struct SwsContext *sws_ctx_ = nullptr; std::mutex mutex_; diff --git a/selfdrive/ui/replay/replay.cc b/selfdrive/ui/replay/replay.cc index f6cb0f832b..838495df3d 100644 --- a/selfdrive/ui/replay/replay.cc +++ b/selfdrive/ui/replay/replay.cc @@ -51,6 +51,8 @@ bool Replay::load() { void Replay::start(int seconds) { seekTo(seconds); + + camera_server_ = std::make_unique(); // start stream thread thread = new QThread; QObject::connect(thread, &QThread::started, [=]() { stream(); }); @@ -184,6 +186,22 @@ void Replay::mergeSegments(int cur_seg, int end_idx) { } } +void Replay::publishFrame(const Event *e) { + auto publish = [=](CameraType cam_type, const cereal::EncodeIndex::Reader &eidx) { + auto &seg = segments_[eidx.getSegmentNum()]; + if (seg && seg->isLoaded() && seg->frames[cam_type] && eidx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { + camera_server_->pushFrame(cam_type, seg->frames[cam_type].get(), eidx); + } + }; + if (e->which == cereal::Event::ROAD_ENCODE_IDX) { + publish(RoadCam, e->event.getRoadEncodeIdx()); + } else if (e->which == cereal::Event::DRIVER_ENCODE_IDX) { + publish(DriverCam, e->event.getDriverEncodeIdx()); + } else if (e->which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { + publish(WideRoadCam, e->event.getWideRoadEncodeIdx()); + } +} + void Replay::stream() { float last_print = 0; cereal::Event::Which cur_which = cereal::Event::Which::INIT_DATA; @@ -227,38 +245,10 @@ void Replay::stream() { precise_nano_sleep(behind_ns); } - // publish frame if (evt->frame) { - // TODO: publish all frames - if (evt->which == cereal::Event::ROAD_ENCODE_IDX) { - auto idx = evt->event.getRoadEncodeIdx(); - auto &seg = segments_[idx.getSegmentNum()]; - - if (seg && seg->isLoaded() && idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { - auto &frm = seg->frames[RoadCam]; - - if (vipc_server == nullptr) { - cl_device_id device_id = cl_get_device_id(CL_DEVICE_TYPE_DEFAULT); - cl_context context = CL_CHECK_ERR(clCreateContext(NULL, 1, &device_id, NULL, NULL, &err)); - - vipc_server = new VisionIpcServer("camerad", device_id, context); - vipc_server->create_buffers(VisionStreamType::VISION_STREAM_RGB_BACK, UI_BUF_COUNT, - true, frm->width, frm->height); - vipc_server->start_listener(); - } - - uint8_t *dat = frm->get(idx.getSegmentId()); - if (dat) { - VisionIpcBufExtra extra = {}; - VisionBuf *buf = vipc_server->get_buffer(VisionStreamType::VISION_STREAM_RGB_BACK); - memcpy(buf->addr, dat, frm->getRGBSize()); - vipc_server->send(buf, &extra, false); - } - } - } - - // publish msg + publishFrame(evt); } else { + // publish msg if (sm == nullptr) { auto bytes = evt->bytes(); pm->send(sockets_[cur_which], (capnp::byte *)bytes.begin(), bytes.size()); @@ -268,5 +258,8 @@ void Replay::stream() { } } } + + // wait for frame to be sent before unlock.(frameReader may be deleted after unlock) + camera_server_->waitFinish(); } } diff --git a/selfdrive/ui/replay/replay.h b/selfdrive/ui/replay/replay.h index abfac773e3..a9151b2d43 100644 --- a/selfdrive/ui/replay/replay.h +++ b/selfdrive/ui/replay/replay.h @@ -4,6 +4,7 @@ #include #include "cereal/visionipc/visionipc_server.h" +#include "selfdrive/ui/replay/camera.h" #include "selfdrive/ui/replay/route.h" constexpr int FORWARD_SEGS = 2; @@ -33,6 +34,7 @@ protected: void setCurrentSegment(int n); void mergeSegments(int begin_idx, int end_idx); void updateEvents(const std::function& lambda); + void publishFrame(const Event *e); QThread *thread; @@ -55,7 +57,7 @@ protected: SubMaster *sm; PubMaster *pm; std::vector sockets_; - VisionIpcServer *vipc_server = nullptr; std::unique_ptr route_; bool load_dcam = false, load_ecam = false; + std::unique_ptr camera_server_; };