replay/camera: publish each camera in a separate thread (#22591)

* publish frames in a seperate thread for each camera

* cleanup

* cleanup

* prefetch next frame

* cleanup
old-commit-hash: 4678903a61
commatwo_master
Dean Lee 4 years ago committed by GitHub
parent 3ab1ab4b91
commit ed5bd9719b
  1. 71
      selfdrive/ui/replay/camera.cc
  2. 24
      selfdrive/ui/replay/camera.h
  3. 6
      selfdrive/ui/replay/replay.cc

@ -5,68 +5,83 @@
const int YUV_BUF_COUNT = 50;
CameraServer::CameraServer(std::pair<int, int> cameras[MAX_CAMERAS]) {
if (cameras) {
for (auto type : ALL_CAMERAS) {
std::tie(cameras_[type].width, cameras_[type].height) = cameras[type];
}
startVipcServer();
CameraServer::CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS]) {
for (auto &cam : cameras_) {
std::tie(cam.width, cam.height) = camera_size[cam.type];
}
camera_thread_ = std::thread(&CameraServer::thread, this);
startVipcServer();
}
CameraServer::~CameraServer() {
queue_.push({});
camera_thread_.join();
for (auto &cam : cameras_) {
if (cam.thread.joinable()) {
cam.queue.push({});
cam.thread.join();
}
}
vipc_server_.reset(nullptr);
}
void CameraServer::startVipcServer() {
std::cout << (vipc_server_ ? "restart" : "start") << " vipc server" << std::endl;
vipc_server_.reset(new VisionIpcServer("camerad"));
for (auto &cam : cameras_) {
if (cam.width > 0 && cam.height > 0) {
std::cout << "camera[" << cam.type << "] frame size " << cam.width << "x" << cam.height << std::endl;
vipc_server_->create_buffers(cam.rgb_type, UI_BUF_COUNT, true, cam.width, cam.height);
vipc_server_->create_buffers(cam.yuv_type, YUV_BUF_COUNT, false, cam.width, cam.height);
if (!cam.thread.joinable()) {
cam.thread = std::thread(&CameraServer::cameraThread, this, std::ref(cam));
}
}
}
vipc_server_->start_listener();
}
void CameraServer::thread() {
void CameraServer::cameraThread(Camera &cam) {
auto read_frame = [&](FrameReader *fr, int frame_id) {
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
bool ret = fr->get(frame_id, (uint8_t *)rgb_buf->addr, (uint8_t *)yuv_buf->addr);
return ret ? std::pair{rgb_buf, yuv_buf} : std::pair{nullptr, nullptr};
};
while (true) {
const auto [type, fr, eidx] = queue_.pop();
const auto [fr, eidx] = cam.queue.pop();
if (!fr) break;
auto &cam = cameras_[type];
// restart vipc server if new camera incoming.
if (cam.width != fr->width || cam.height != fr->height) {
cam.width = fr->width;
cam.height = fr->height;
std::cout << "camera[" << type << "] frame size " << cam.width << "x" << cam.height << std::endl;
startVipcServer();
}
const int id = eidx.getSegmentId();
bool prefetched = (id == cam.cached_id && eidx.getSegmentNum() == cam.cached_seg && cam.cached_buf.first && cam.cached_buf.second);
auto [rgb, yuv] = prefetched ? cam.cached_buf : read_frame(fr, id);
// send frame
VisionBuf *rgb_buf = vipc_server_->get_buffer(cam.rgb_type);
VisionBuf *yuv_buf = vipc_server_->get_buffer(cam.yuv_type);
if (fr->get(eidx.getSegmentId(), (uint8_t *)rgb_buf->addr, (uint8_t *)yuv_buf->addr)) {
if (rgb && yuv) {
VisionIpcBufExtra extra = {
.frame_id = eidx.getFrameId(),
.timestamp_sof = eidx.getTimestampSof(),
.timestamp_eof = eidx.getTimestampEof(),
};
vipc_server_->send(rgb_buf, &extra, false);
vipc_server_->send(yuv_buf, &extra, false);
vipc_server_->send(rgb, &extra, false);
vipc_server_->send(yuv, &extra, false);
} else {
std::cout << "camera[" << type << "] failed to get frame:" << eidx.getSegmentId() << std::endl;
std::cout << "camera[" << cam.type << "] failed to get frame:" << eidx.getSegmentId() << std::endl;
}
cam.cached_id = id + 1;
cam.cached_seg = eidx.getSegmentNum();
cam.cached_buf = read_frame(fr, cam.cached_id);
--publishing_;
}
}
void CameraServer::pushFrame(CameraType type, FrameReader *fr, const cereal::EncodeIndex::Reader &eidx) {
auto &cam = cameras_[type];
if (cam.width != fr->width || cam.height != fr->height) {
cam.width = fr->width;
cam.height = fr->height;
waitFinish();
startVipcServer();
}
++publishing_;
queue_.push({type, fr, eidx});
cam.queue.push({fr, eidx});
}

@ -8,7 +8,7 @@
class CameraServer {
public:
CameraServer(std::pair<int, int> cameras[MAX_CAMERAS] = nullptr);
CameraServer(std::pair<int, int> camera_size[MAX_CAMERAS] = nullptr);
~CameraServer();
void pushFrame(CameraType type, FrameReader* fr, const cereal::EncodeIndex::Reader& eidx);
inline void waitFinish() {
@ -16,24 +16,26 @@ public:
}
protected:
void startVipcServer();
void thread();
struct Camera {
VisionStreamType rgb_type;
CameraType type;
VisionStreamType rgb_type;
VisionStreamType yuv_type;
int width;
int height;
std::thread thread;
SafeQueue<std::pair<FrameReader*, const cereal::EncodeIndex::Reader>> queue;
int cached_id = -1;
int cached_seg = -1;
std::pair<VisionBuf *, VisionBuf*> cached_buf;
};
void startVipcServer();
void cameraThread(Camera &cam);
Camera cameras_[MAX_CAMERAS] = {
{.rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK},
{.rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT},
{.rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE},
{.type = RoadCam, .rgb_type = VISION_STREAM_RGB_BACK, .yuv_type = VISION_STREAM_YUV_BACK},
{.type = DriverCam, .rgb_type = VISION_STREAM_RGB_FRONT, .yuv_type = VISION_STREAM_YUV_FRONT},
{.type = WideRoadCam, .rgb_type = VISION_STREAM_RGB_WIDE, .yuv_type = VISION_STREAM_YUV_WIDE},
};
std::atomic<int> publishing_ = 0;
std::thread camera_thread_;
std::unique_ptr<VisionIpcServer> vipc_server_;
SafeQueue<std::tuple<CameraType, FrameReader*, const cereal::EncodeIndex::Reader>> queue_;
};

@ -216,13 +216,13 @@ void Replay::startStream(const Segment *cur_segment) {
}
// start camera server
std::pair<int, int> cameras[MAX_CAMERAS] = {};
std::pair<int, int> camera_size[MAX_CAMERAS] = {};
for (auto type : ALL_CAMERAS) {
if (auto &fr = cur_segment->frames[type]) {
cameras[type] = {fr->width, fr->height};
camera_size[type] = {fr->width, fr->height};
}
}
camera_server_ = std::make_unique<CameraServer>(cameras);
camera_server_ = std::make_unique<CameraServer>(camera_size);
// start stream thread
stream_thread_ = QThread::create(&Replay::stream, this);

Loading…
Cancel
Save