From 0000ad2ac0996075481841221e9bcc842f2812d7 Mon Sep 17 00:00:00 2001 From: George Hotz <72895+geohot@users.noreply.github.com> Date: Fri, 20 May 2022 00:02:50 -0700 Subject: [PATCH] loggerd: Fix loggerd encode packet drops (#24599) * hmm, try this * hmm, try this * rewrite handle_encoder_msg * fix new logic * add comments and an assert * handle startup condition better * handle restarts of encoderd Co-authored-by: Comma Device --- selfdrive/loggerd/loggerd.cc | 121 ++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 43 deletions(-) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 06fd9c74e5..1a10ab0c07 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -43,27 +43,20 @@ void rotate_if_needed(LoggerdState *s) { struct RemoteEncoder { std::unique_ptr writer; - int segment = -1; + int encoderd_segment_offset; + int current_segment = -1; std::vector q; - int logger_segment = -1; int dropped_frames = 0; bool recording = false; + bool marked_ready_to_rotate = false; + bool seen_first_packet = false; }; int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re) { const LogCameraInfo &cam_info = (name == "driverEncodeData") ? cameras_logged[1] : ((name == "wideRoadEncodeData") ? cameras_logged[2] : ((name == "qRoadEncodeData") ? qcam_info : cameras_logged[0])); - - // rotation happened, process the queue (happens before the current message) int bytes_count = 0; - if (re.logger_segment != s->rotate_segment) { - re.logger_segment = s->rotate_segment; - for (auto &qmsg: re.q) { - bytes_count += handle_encoder_msg(s, qmsg, name, re); - } - re.q.clear(); - } // extract the message capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize())); @@ -74,42 +67,66 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct auto idx = edata.getIdx(); auto flags = idx.getFlags(); - if (!re.recording) { - // only create on iframe - if (flags & V4L2_BUF_FLAG_KEYFRAME) { - if (re.dropped_frames) { - // this should only happen for the first segment, maybe - LOGD("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); - re.dropped_frames = 0; + // encoderd can have started long before loggerd + if (!re.seen_first_packet) { + re.seen_first_packet = true; + re.encoderd_segment_offset = idx.getSegmentNum(); + LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset); + } + int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; + + if (offset_segment_num == s->rotate_segment) { + // loggerd is now on the segment that matches this packet + + // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets + if (re.current_segment != s->rotate_segment) { + if (re.recording) { + re.writer.reset(); + re.recording = false; } - // if we aren't recording, don't create the writer - if (cam_info.record) { - re.writer.reset(new VideoWriter(s->segment_path, - cam_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, - cam_info.frame_width, cam_info.frame_height, cam_info.fps, idx.getType())); - // write the header - auto header = edata.getHeader(); - re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); + re.current_segment = s->rotate_segment; + re.marked_ready_to_rotate = false; + // we are in this segment now, process any queued messages before this one + if (!re.q.empty()) { + for (auto &qmsg: re.q) { + bytes_count += handle_encoder_msg(s, qmsg, name, re); + } + re.q.clear(); } - re.segment = idx.getSegmentNum(); - re.recording = true; - } else { - ++re.dropped_frames; - return bytes_count; } - } - if (re.segment != idx.getSegmentNum()) { - if (re.recording) { - // encoder is on the next segment, this segment is over so we close the videowriter - re.writer.reset(); - re.recording = false; - ++s->ready_to_rotate; - LOGD("rotate %d -> %d ready %d/%d for %s", re.segment, idx.getSegmentNum(), s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + // if we aren't recording yet, try to start, since we are in the correct segment + if (!re.recording) { + if (flags & V4L2_BUF_FLAG_KEYFRAME) { + // only create on iframe + if (re.dropped_frames) { + // this should only happen for the first segment, maybe + LOGW("%s: dropped %d non iframe packets before init", name.c_str(), re.dropped_frames); + re.dropped_frames = 0; + } + // if we aren't actually recording, don't create the writer + if (cam_info.record) { + re.writer.reset(new VideoWriter(s->segment_path, + cam_info.filename, idx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C, + cam_info.frame_width, cam_info.frame_height, cam_info.fps, idx.getType())); + // write the header + auto header = edata.getHeader(); + re.writer->write((uint8_t *)header.begin(), header.size(), idx.getTimestampEof()/1000, true, false); + } + re.recording = true; + } else { + // this is a sad case when we aren't recording, but don't have an iframe + // nothing we can do but drop the frame + delete msg; + ++re.dropped_frames; + return bytes_count; + } } - // queue up all the new segment messages, they go in after the rotate - re.q.push_back(msg); - } else { + + // we have to be recording if we are here + assert(re.recording); + + // if we are actually writing the video file, do so if (re.writer) { auto data = edata.getData(); re.writer->write((uint8_t *)data.begin(), data.size(), idx.getTimestampEof()/1000, false, flags & V4L2_BUF_FLAG_KEYFRAME); @@ -127,7 +144,25 @@ int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct logger_log(&s->logger, (uint8_t *)new_msg.begin(), new_msg.size(), true); // always in qlog? bytes_count += new_msg.size(); - // this frees the message + // free the message, we used it + delete msg; + } else if (offset_segment_num > s->rotate_segment) { + // encoderd packet has a newer segment, this means encoderd has rolled over + if (!re.marked_ready_to_rotate) { + re.marked_ready_to_rotate = true; + ++s->ready_to_rotate; + LOGD("rotate %d -> %d ready %d/%d for %s", + s->rotate_segment.load(), offset_segment_num, + s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + } + // queue up all the new segment messages, they go in after the rotate + re.q.push_back(msg); + } else { + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->rotate_segment:%d re.encoderd_segment_offset:%d", + name.c_str(), idx.getSegmentNum(), s->rotate_segment.load(), re.encoderd_segment_offset); + // free the message, it's useless. this should never happen + // actually, this can happen if you restart encoderd + re.encoderd_segment_offset = -s->rotate_segment.load(); delete msg; }