replay/camera: publish each camera in a separate thread (#22591)

* publish frames in a separate thread for each camera

* cleanup

* cleanup

* prefetch next frame

* cleanup
pull/22618/head
Dean Lee 4 years ago committed by GitHub
parent 7f5ffe54c0
commit 4678903a61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 71
      selfdrive/ui/replay/camera.cc
  2. 24
      selfdrive/ui/replay/camera.h
  3. 6
      selfdrive/ui/replay/replay.cc

@ -5,68 +5,83 @@
const int YUV_BUF_COUNT = 50;

// CameraServer owns the VisionIPC server plus one publisher thread per camera.
// camera_size[i] = {width, height} for camera i. The header declares a nullptr
// default for this parameter, so guard the dereference: with no sizes given,
// all cameras keep width/height == 0 and startVipcServer() creates no streams.
CameraServer::CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS]) {
  if (camera_size) {
    for (auto &cam : cameras_) {
      std::tie(cam.width, cam.height) = camera_size[cam.type];
    }
  }
  startVipcServer();
}
// Shut down: wake each running camera thread with an empty item (the null
// FrameReader acts as the stop sentinel in cameraThread), join it, then tear
// down the VisionIPC server.
CameraServer::~CameraServer() {
  for (auto &cam : cameras_) {
    if (cam.thread.joinable()) {
      cam.queue.push({});  // sentinel: cameraThread exits on a null FrameReader
      cam.thread.join();
    }
  }
  vipc_server_.reset(nullptr);
}
void CameraServer::startVipcServer() { void CameraServer::startVipcServer() {
std::cout << (vipc_server_ ? "restart" : "start") << " vipc server" << std::endl;
vipc_server_.reset(new VisionIpcServer("camerad")); vipc_server_.reset(new VisionIpcServer("camerad"));
for (auto &cam : cameras_) { for (auto &cam : cameras_) {
if (cam.width > 0 && cam.height > 0) { if (cam.width > 0 && cam.height > 0) {
std::cout << "camera[" << cam.type << "] frame size " << cam.width << "x" << cam.height << std::endl;
vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height); vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height);
vipc_server_->create_buffers(cam.yuv_type, YUV_BUF_COUNT, false, cam.width, cam.height); vipc_server_->create_buffers(cam.yuv_type, YUV_BUF_COUNT, false, cam.width, cam.height);
if (!cam.thread.joinable()) {
cam.thread = std::thread(&CameraServer::cameraThread, this, std::ref(cam));
}
} }
} }
vipc_server_->start_listener(); vipc_server_->start_listener();
} }
void CameraServer::thread() { void CameraServer::cameraThread(Camera &cam) {
auto read_frame = [&](FrameReader *fr, int frame_id) {
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
bool ret = fr->get(frame_id, (uint8_t *)rgb_buf->addr, (uint8_t *)yuv_buf->addr);
return ret ? std::pair{rgb_buf, yuv_buf} : std::pair{nullptr, nullptr};
};
while (true) { while (true) {
const auto [type, fr, eidx] = queue_.pop(); const auto [fr, eidx] = cam.queue.pop();
if (!fr) break; if (!fr) break;
auto &cam = cameras_[type]; const int id = eidx.getSegmentId();
// restart vipc server if new camera incoming. bool prefetched = (id == cam.cached_id && eidx.getSegmentNum() == cam.cached_seg && cam.cached_buf.first && cam.cached_buf.second);
if (cam.width != fr->width || cam.height != fr->height) { auto [rgb, yuv] = prefetched ? cam.cached_buf : read_frame(fr, id);
cam.width = fr->width;
cam.height = fr->height;
std::cout << "camera[" << type << "] frame size " << cam.width << "x" << cam.height << std::endl;
startVipcServer();
}
// send frame if (rgb && yuv) {
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
if (fr->get(eidx.getSegmentId(), (uint8_t *)rgb_buf->addr, (uint8_t *)yuv_buf->addr)) {
VisionIpcBufExtra extra = { VisionIpcBufExtra extra = {
.frame_id = eidx.getFrameId(), .frame_id = eidx.getFrameId(),
.timestamp_sof = eidx.getTimestampSof(), .timestamp_sof = eidx.getTimestampSof(),
.timestamp_eof = eidx.getTimestampEof(), .timestamp_eof = eidx.getTimestampEof(),
}; };
vipc_server_->send(rgb_buf, &extra, false); vipc_server_->send(rgb, &extra, false);
vipc_server_->send(yuv_buf, &extra, false); vipc_server_->send(yuv, &extra, false);
} else { } else {
std::cout << "camera[" << type << "] failed to get frame:" << eidx.getSegmentId() << std::endl; std::cout << "camera[" << cam.type << "] failed to get frame:" << eidx.getSegmentId() << std::endl;
} }
cam.cached_id = id + 1;
cam.cached_seg = eidx.getSegmentNum();
cam.cached_buf = read_frame(fr, cam.cached_id);
--publishing_; --publishing_;
} }
} }
// Queue one frame for publication on `type`'s camera thread.
// If the incoming frame size differs from the current buffers, first drain all
// in-flight frames (waitFinish) and restart the vipc server with new buffers —
// the restart must not happen while camera threads are using the old buffers.
void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::EncodeIndex::Reader &eidx) {
  auto &cam = cameras_[type];
  if (cam.width != fr->width || cam.height != fr->height) {
    cam.width = fr->width;
    cam.height = fr->height;
    waitFinish();
    startVipcServer();
  }

  ++publishing_;
  cam.queue.push({fr, eidx});
}

@ -8,7 +8,7 @@
class CameraServer { class CameraServer {
public: public:
CameraServer(std::pair<int, int> cameras[MAX_CAMERAS] = nullptr); CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS] = nullptr);
~CameraServer(); ~CameraServer();
void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx); void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx);
inline void waitFinish() { inline void waitFinish() {
@ -16,24 +16,26 @@ public:
} }
protected: protected:
void startVipcServer();
void thread();
struct Camera { struct Camera {
VisionStreamType rgb_type; CameraType type;
VisionStreamType rgb_type;
VisionStreamType yuv_type; VisionStreamType yuv_type;
int width; int width;
int height; int height;
std::thread thread;
SafeQueue<std::pair<FrameReader*, const cereal::EncodeIndex::Reader>> queue;
int cached_id = -1;
int cached_seg = -1;
std::pair<VisionBuf *, VisionBuf*> cached_buf;
}; };
void startVipcServer();
void cameraThread(Camera &cam);
Camera cameras_[MAX_CAMERAS] = { Camera cameras_[MAX_CAMERAS] = {
{.rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK}, {.type = RoadCam, .rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK},
{.rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT}, {.type = DriverCam, .rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT},
{.rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE}, {.type = WideRoadCam, .rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE},
}; };
std::atomic<int> publishing_ = 0; std::atomic<int> publishing_ = 0;
std::thread camera_thread_;
std::unique_ptr<VisionIpcServer> vipc_server_; std::unique_ptr<VisionIpcServer> vipc_server_;
SafeQueue<std::tuple<CameraType, FrameReader*, const cereal::EncodeIndex::Reader>> queue_;
}; };

@ -216,13 +216,13 @@ void Replay::startStream(const Segment *cur_segment) {
} }
// start camera server // start camera server
std::pair<int, int> cameras[MAX_CAMERAS] = {}; std::pair<int, int> camera_size[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) { for (auto type : ALL_CAMERAS) {
if (auto &fr = cur_segment->frames[type]) { if (auto &fr = cur_segment->frames[type]) {
cameras[type] = {fr->width, fr->height}; camera_size[type] = {fr->width, fr->height};
} }
} }
camera_server_ = std::make_unique<CameraServer>(cameras); camera_server_ = std::make_unique<CameraServer>(camera_size);
// start stream thread // start stream thread
stream_thread_ = QThread::create(&Replay::stream, this); stream_thread_ = QThread::create(&Replay::stream, this);

Loading…
Cancel
Save