diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index 953ae1df32..3727fc4693 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -9,18 +9,9 @@ #include "common/params.h" #include "system/loggerd/encoder/encoder.h" #include "system/loggerd/loggerd.h" -#include "system/loggerd/video_writer.h" ExitHandler do_exit; -struct LoggerdState { - LoggerState logger; - std::atomic last_camera_seen_tms{0.0}; - std::atomic ready_to_rotate{0}; // count of encoders ready to rotate - int max_waiting = 0; - double last_rotate_tms = 0.; // last rotate time in ms -}; - void logger_rotate(LoggerdState *s) { bool ret =s->logger.next(); assert(ret); @@ -53,17 +44,6 @@ void rotate_if_needed(LoggerdState *s) { } } -struct RemoteEncoder { - std::unique_ptr writer; - int encoderd_segment_offset; - int current_segment = -1; - std::vector q; - int dropped_frames = 0; - bool recording = false; - bool marked_ready_to_rotate = false; - bool seen_first_packet = false; -}; - size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) { auto edata = (event.*(encoder_info.get_encode_data_func))(); auto idx = edata.getIdx(); @@ -117,72 +97,35 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc } int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { - int bytes_count = 0; - - // extract the message capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); auto event = cmsg.getRoot(); - auto edata = (event.*(encoder_info.get_encode_data_func))(); - auto idx = edata.getIdx(); - - // encoderd can have started long before loggerd - if (!re.seen_first_packet) { - re.seen_first_packet = true; - re.encoderd_segment_offset = idx.getSegmentNum(); - LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset); - } - int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; + auto encoder_idx = (event.*(encoder_info.get_encode_data_func))().getIdx(); - if (offset_segment_num == s->logger.segment()) { - // loggerd is now on the segment that matches this packet - - // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets - if (re.current_segment != s->logger.segment()) { - if (re.recording) { - re.writer.reset(); - re.recording = false; - } - re.current_segment = s->logger.segment(); - re.marked_ready_to_rotate = false; - // we are in this segment now, process any queued messages before this one - if (!re.q.empty()) { - for (auto qmsg : re.q) { - capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)}); - bytes_count += write_encode_data(s, reader.getRoot(), re, encoder_info); - delete qmsg; - } - re.q.clear(); + int bytes_count = 0; + // Synchronize segment and process if aligned + if (re.syncSegment(s, name, encoder_idx.getSegmentNum(), s->logger.segment())) { + // Process any queued messages before the current one + if (!re.q.empty()) { + for (auto qmsg : re.q) { + capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)}); + bytes_count += write_encode_data(s, reader.getRoot(), re, encoder_info); + delete qmsg; } + re.q.clear(); } + + // Process the current message bytes_count += write_encode_data(s, event, re, encoder_info); delete msg; - } else if (offset_segment_num > s->logger.segment()) { - // encoderd packet has a newer segment, this means encoderd has rolled over - if (!re.marked_ready_to_rotate) { - re.marked_ready_to_rotate = true; - ++s->ready_to_rotate; - LOGD("rotate %d -> %d ready %d/%d for %s", - s->logger.segment(), offset_segment_num, - s->ready_to_rotate.load(), s->max_waiting, name.c_str()); - } - - // TODO: define this behavior, but for now don't leak - if (re.q.size() > MAIN_FPS*10) { + } else { + if (re.q.size() > MAIN_FPS * 10) { LOGE_100("%s: dropping frame, queue is too large", name.c_str()); delete msg; } else { // queue up all the new segment messages, they go in after the rotate re.q.push_back(msg); } - } else { - LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", - name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); - // free the message, it's useless. this should never happen - // actually, this can happen if you restart encoderd - re.encoderd_segment_offset = -s->logger.segment(); - delete msg; } - return bytes_count; } diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 27d2d37fc4..b9673b1ff4 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -11,6 +11,7 @@ #include "common/util.h" #include "system/loggerd/logger.h" +#include "system/loggerd/video_writer.h" constexpr int MAIN_FPS = 20; const int MAIN_BITRATE = 1e7; @@ -147,3 +148,75 @@ const LogCameraInfo stream_driver_camera_info{ const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; + +struct LoggerdState { + LoggerState logger; + std::atomic last_camera_seen_tms{0.0}; + std::atomic ready_to_rotate{0}; // count of encoders ready to rotate + int max_waiting = 0; + double last_rotate_tms = 0.; // last rotate time in ms +}; + + +class RemoteEncoder { +public: + std::unique_ptr writer; + int encoder_segment_offset = 0; + int current_segment = -1; + std::vector q; + int dropped_frames = 0; + bool recording = false; + bool marked_ready_to_rotate = false; + bool seen_first_packet = false; + + bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment, int logger_segment) { + if (!seen_first_packet) { + seen_first_packet = true; + encoder_segment_offset = encoder_segment - logger_segment; + LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset); + } + + // Calculate adjusted segment based on offset + int adjusted_segment = encoder_segment - encoder_segment_offset; + + // Case 1: Segments are synchronized + if (adjusted_segment == logger_segment) { + if (current_segment != logger_segment) { + // New segment detected; reset writer if recording + if (recording) { + writer.reset(); + recording = false; + } + current_segment = logger_segment; + } + marked_ready_to_rotate = false; + return true; + } + + // Case 2: Encoder is ahead (newer segment) + if (adjusted_segment > logger_segment) { + int segment_gap = adjusted_segment - logger_segment; + if (segment_gap > 1) { + LOGE("%s: encoder jumped ahead by %d segments (adj=%d, log=%d), adjusting offset", + name.c_str(), segment_gap, adjusted_segment, logger_segment); + encoder_segment_offset += segment_gap - 1; + } + + if (!marked_ready_to_rotate) { + marked_ready_to_rotate = true; + ++s->ready_to_rotate; + LOGD("rotate %d -> %d ready %d/%d for %s", + logger_segment, adjusted_segment, + s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + } + + } + // Case 3: Encoder is behind (older segment) + else { + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d encoder_segment_offset:%d", + name.c_str(), encoder_segment, logger_segment, encoder_segment_offset); + encoder_segment_offset = encoder_segment - logger_segment; + } + return false; + } +}; diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc index 40a45a68d5..70e69d5f54 100644 --- a/system/loggerd/tests/test_logger.cc +++ b/system/loggerd/tests/test_logger.cc @@ -1,5 +1,6 @@ #include "catch2/catch.hpp" #include "system/loggerd/logger.h" +#include "system/loggerd/loggerd.h" typedef cereal::Sentinel::SentinelType SentinelType; @@ -73,3 +74,131 @@ TEST_CASE("logger") { verify_segment(log_root + "/" + route_name, i, segment_cnt, 1); } } + +TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") { + LoggerdState state; + RemoteEncoder encoder; + std::string name = "test_encoder"; + + SECTION("Matching segment after offset set") { + REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); // First packet sets offset + REQUIRE(encoder.encoder_segment_offset == 0); // 5 - 5 = 0 + REQUIRE(encoder.current_segment == 5); + REQUIRE(encoder.seen_first_packet == true); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 7, 7) == true); // 7 - 0 = 7 matches 7 + REQUIRE(encoder.current_segment == 7); + REQUIRE(encoder.recording == false); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + } + + SECTION("Encoder restarts and sends segment 0") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync + REQUIRE(encoder.encoder_segment_offset == 0); // 1 - 1 = 0 + REQUIRE(encoder.current_segment == 0); + + REQUIRE(encoder.syncSegment(&state, name, 0, 2) == false); // 0 - 0 = 0 < 2 (behind) + REQUIRE(encoder.encoder_segment_offset == -2); // Adjusted to 0 - 2 = -2 + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 0, 2) == true); // 0 - (-2) = 2 matches 2 + REQUIRE(encoder.current_segment == 2); + REQUIRE(encoder.marked_ready_to_rotate == false); + } + + SECTION("Encoder restarts and sends segment greater than 0") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync + REQUIRE(encoder.encoder_segment_offset == 0); // 0 - 0 = 0 + REQUIRE(encoder.current_segment == 0); + + REQUIRE(encoder.syncSegment(&state, name, 2, 3) == false); // 2 - 0 = 2 < 3 (behind) + REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 2 - 3 = -1 + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 2, 3) == true); // 2 - (-1) = 3 matches 3 + REQUIRE(encoder.current_segment == 3); + } + + SECTION("Logger restarts to lower segment") { + REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); + REQUIRE(encoder.encoder_segment_offset == 0); + REQUIRE(encoder.current_segment == 5); + + // Logger restarts to 0, encoder at 6 + REQUIRE(encoder.syncSegment(&state, name, 6, 0) == false); // 6 - 0 = 6 > 0 (ahead) + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + REQUIRE(encoder.current_segment == 5); // Unchanged until sync + + // Encoder advances to 7, logger still at 0 + REQUIRE(encoder.syncSegment(&state, name, 7, 0) == false); // 7 - 0 = 7 > 0 + REQUIRE(state.ready_to_rotate == 1); // No further increment + } + + SECTION("Encoder is ahead by more than one segment") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoder_segment_offset == 0); + REQUIRE(encoder.current_segment == 0); + + // Encoder jumps to 2, logger at 0 + REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); // 2 - 0 = 2 > 0 + REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1 + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); + + // Logger advances to 1, encoder at 2 + state.ready_to_rotate = 0; + REQUIRE(encoder.syncSegment(&state, name, 2, 1) == true); // 2 - 1 = 1 matches 1 + REQUIRE(encoder.current_segment == 1); + REQUIRE(encoder.marked_ready_to_rotate == false); + + REQUIRE(encoder.syncSegment(&state, name, 3, 1) == false); // 3 - 0 = 3 > 0 + REQUIRE(state.ready_to_rotate == 1); // No additional increment + } + + SECTION("Sync after rotation") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoder_segment_offset == 0); + + REQUIRE(encoder.syncSegment(&state, name, 1, 0) == false); // 1 - 0 = 1 > 0 + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + + // Simulate rotation: logger catches up to encoder + REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 == 1 + REQUIRE(encoder.current_segment == 1); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 1); // Not decremented here + } + + SECTION("Encoder catches up after being behind") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoder_segment_offset == 0); + + // Logger advances to 1, encoder sends 0 + REQUIRE(encoder.syncSegment(&state, name, 0, 1) == false); // 0 - 0 = 0 < 1 + REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 0 - 1 + + // Logger advances to 2, encoder sends 1 + REQUIRE(encoder.syncSegment(&state, name, 1, 2) == true); // 1 - (-1) = 2 == 2 + REQUIRE(encoder.current_segment == 2); + REQUIRE(encoder.marked_ready_to_rotate == false); + } + + SECTION("Recording reset on segment change") { + encoder.recording = true; // Simulate active recording + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync + REQUIRE(encoder.encoder_segment_offset == 0); + REQUIRE(encoder.current_segment == 0); + + REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 matches 1 + REQUIRE(encoder.current_segment == 1); + REQUIRE(encoder.recording == false); // Recording reset on segment change + } +}