From 0ca15a3d76c21e9d839449ae1e2cc9aef686f1cb Mon Sep 17 00:00:00 2001 From: deanlee Date: Tue, 11 Mar 2025 17:10:30 +0800 Subject: [PATCH] add unit tests to catch sync issues --- system/loggerd/loggerd.cc | 84 ++++----------------- system/loggerd/loggerd.h | 64 ++++++++++++++++ system/loggerd/tests/test_logger.cc | 109 ++++++++++++++++++++++++++++ 3 files changed, 187 insertions(+), 70 deletions(-) diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index 953ae1df32..3382e381e5 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -9,18 +9,9 @@ #include "common/params.h" #include "system/loggerd/encoder/encoder.h" #include "system/loggerd/loggerd.h" -#include "system/loggerd/video_writer.h" ExitHandler do_exit; -struct LoggerdState { - LoggerState logger; - std::atomic last_camera_seen_tms{0.0}; - std::atomic ready_to_rotate{0}; // count of encoders ready to rotate - int max_waiting = 0; - double last_rotate_tms = 0.; // last rotate time in ms -}; - void logger_rotate(LoggerdState *s) { bool ret =s->logger.next(); assert(ret); @@ -53,17 +44,6 @@ void rotate_if_needed(LoggerdState *s) { } } -struct RemoteEncoder { - std::unique_ptr writer; - int encoderd_segment_offset; - int current_segment = -1; - std::vector q; - int dropped_frames = 0; - bool recording = false; - bool marked_ready_to_rotate = false; - bool seen_first_packet = false; -}; - size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) { auto edata = (event.*(encoder_info.get_encode_data_func))(); auto idx = edata.getIdx(); @@ -117,72 +97,36 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc } int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { - int bytes_count = 0; - - // extract the message capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); auto event = cmsg.getRoot(); auto edata = (event.*(encoder_info.get_encode_data_func))(); auto idx = edata.getIdx(); - // encoderd can have started long before loggerd - if (!re.seen_first_packet) { - re.seen_first_packet = true; - re.encoderd_segment_offset = idx.getSegmentNum(); - LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset); - } - int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset; - - if (offset_segment_num == s->logger.segment()) { - // loggerd is now on the segment that matches this packet - - // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets - if (re.current_segment != s->logger.segment()) { - if (re.recording) { - re.writer.reset(); - re.recording = false; - } - re.current_segment = s->logger.segment(); - re.marked_ready_to_rotate = false; - // we are in this segment now, process any queued messages before this one - if (!re.q.empty()) { - for (auto qmsg : re.q) { - capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)}); - bytes_count += write_encode_data(s, reader.getRoot(), re, encoder_info); - delete qmsg; - } - re.q.clear(); + int bytes_count = 0; + // Synchronize segment and process if aligned + if (re.syncSegment(s, name, idx.getSegmentNum(), s->logger.segment())) { + // Process any queued messages before the current one + if (!re.q.empty()) { + for (auto qmsg : re.q) { + capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)}); + bytes_count += write_encode_data(s, reader.getRoot(), re, encoder_info); + delete qmsg; } + re.q.clear(); } + + // Process the current message bytes_count += write_encode_data(s, event, re, encoder_info); delete msg; - } else if (offset_segment_num > s->logger.segment()) { - // encoderd packet has a newer segment, this means encoderd has rolled over - if (!re.marked_ready_to_rotate) { - re.marked_ready_to_rotate = true; - ++s->ready_to_rotate; - LOGD("rotate %d -> %d ready %d/%d for %s", - s->logger.segment(), offset_segment_num, - s->ready_to_rotate.load(), s->max_waiting, name.c_str()); - } - - // TODO: define this behavior, but for now don't leak - if (re.q.size() > MAIN_FPS*10) { + } else { + if (re.q.size() > MAIN_FPS * 10) { LOGE_100("%s: dropping frame, queue is too large", name.c_str()); delete msg; } else { // queue up all the new segment messages, they go in after the rotate re.q.push_back(msg); } - } else { - LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", - name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset); - // free the message, it's useless. this should never happen - // actually, this can happen if you restart encoderd - re.encoderd_segment_offset = -s->logger.segment(); - delete msg; } - return bytes_count; } diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 27d2d37fc4..7c619701aa 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -11,6 +11,7 @@ #include "common/util.h" #include "system/loggerd/logger.h" +#include "system/loggerd/video_writer.h" constexpr int MAIN_FPS = 20; const int MAIN_BITRATE = 1e7; @@ -147,3 +148,66 @@ const LogCameraInfo stream_driver_camera_info{ const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; + +struct LoggerdState { + LoggerState logger; + std::atomic last_camera_seen_tms{0.0}; + std::atomic ready_to_rotate{0}; // count of encoders ready to rotate + int max_waiting = 0; + double last_rotate_tms = 0.; // last rotate time in ms +}; + + +class RemoteEncoder { +public: + std::unique_ptr writer; + int encoderd_segment_offset; + int current_segment = -1; + std::vector q; + int dropped_frames = 0; + bool recording = false; + bool marked_ready_to_rotate = false; + bool seen_first_packet = false; + + bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment_num, int log_segment_num) { + if (!seen_first_packet) { + seen_first_packet = true; + encoderd_segment_offset = log_segment_num; + LOGD("%s: has encoderd offset %d", name.c_str(), encoderd_segment_offset); + } + int offset_segment_num = encoder_segment_num - encoderd_segment_offset; + printf("offset %d encoderd_segment_offset: %d, log_segment_num:%d\n", offset_segment_num, encoderd_segment_offset, log_segment_num); + if (offset_segment_num == log_segment_num) { + // loggerd is now on the segment that matches this packet + + // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets + if (current_segment != log_segment_num) { + if (recording) { + writer.reset(); + recording = false; + } + current_segment = log_segment_num; + marked_ready_to_rotate = false; + } + return true; + } + + if (offset_segment_num > log_segment_num) { + // encoderd packet has a newer segment, this means encoderd has rolled over + if (!marked_ready_to_rotate) { + marked_ready_to_rotate = true; + ++s->ready_to_rotate; + LOGD("rotate %d -> %d ready %d/%d for %s", + log_segment_num, offset_segment_num, + s->ready_to_rotate.load(), s->max_waiting, name.c_str()); + } + } else { + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d", + name.c_str(), encoder_segment_num, log_segment_num, encoderd_segment_offset); + // free the message, it's useless. this should never happen + // actually, this can happen if you restart encoderd + encoderd_segment_offset = -log_segment_num; + } + return false; + } +}; diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc index 40a45a68d5..ade65ea1e6 100644 --- a/system/loggerd/tests/test_logger.cc +++ b/system/loggerd/tests/test_logger.cc @@ -1,5 +1,6 @@ #include "catch2/catch.hpp" #include "system/loggerd/logger.h" +#include "system/loggerd/loggerd.h" typedef cereal::Sentinel::SentinelType SentinelType; @@ -73,3 +74,111 @@ TEST_CASE("logger") { verify_segment(log_root + "/" + route_name, i, segment_cnt, 1); } } + +TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") { + LoggerdState state; + RemoteEncoder encoder; + std::string name = "test_encoder"; + + SECTION("Matching segment after offset set") { + REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); // 5 - 5 = 0 + REQUIRE(encoder.current_segment == 5); + REQUIRE(encoder.seen_first_packet == true); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 7, 7) == true); // 7 - 0 = 7 == 7 + REQUIRE(encoder.current_segment == 7); + REQUIRE(encoder.recording == false); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + } + + SECTION("Encoder restarts and sends segment 0") { + REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); // 1 - 1 = 0 + REQUIRE(encoder.current_segment == 1); + + REQUIRE(encoder.syncSegment(&state, name, 0, 2) == false); // 0 - 0 = 0 < 2 + REQUIRE(encoder.encoderd_segment_offset == -2); // Adjusted to 0 - 2 + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 0, 2) == true); // 0 - (-2) = 2 == 2 + REQUIRE(encoder.current_segment == 2); + REQUIRE(encoder.marked_ready_to_rotate == false); + } + + SECTION("Encoder restarts and sends segment greater than 0") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); // 0 - 0 = 0 + REQUIRE(encoder.current_segment == 0); + + REQUIRE(encoder.syncSegment(&state, name, 2, 3) == false); // 2 - 0 = 2 < 3 + REQUIRE(encoder.encoderd_segment_offset == -1); // Adjusted to 2 - 3 + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 0); + + REQUIRE(encoder.syncSegment(&state, name, 2, 3) == true); // 2 - (-1) = 3 == 3 + REQUIRE(encoder.current_segment == 3); + } + + SECTION("Logger restarts to lower segment") { + REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); + REQUIRE(encoder.current_segment == 5); + + // Logger restarts to segment 0, encoder continues at 6 + REQUIRE(encoder.syncSegment(&state, name, 6, 0) == false); // 6 - 0 = 6 > 0 + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + REQUIRE(encoder.current_segment == 5); // Unchanged until sync + + // Subsequent call with next segment + REQUIRE(encoder.syncSegment(&state, name, 7, 0) == false); // 7 - 0 = 7 > 0 + REQUIRE(state.ready_to_rotate == 1); // No further increment + } + + SECTION("Encoder is ahead by more than one segment") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); + REQUIRE(encoder.current_segment == 0); + + REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); // 2 - 0 = 2 > 0 + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + + REQUIRE(encoder.syncSegment(&state, name, 3, 0) == false); // 3 - 0 = 3 > 0 + REQUIRE(state.ready_to_rotate == 1); // No additional increment + } + + SECTION("Sync after rotation") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); + + REQUIRE(encoder.syncSegment(&state, name, 1, 0) == false); // 1 - 0 = 1 > 0 + REQUIRE(encoder.marked_ready_to_rotate == true); + REQUIRE(state.ready_to_rotate == 1); + + // Simulate rotation: logger catches up to encoder + REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 == 1 + REQUIRE(encoder.current_segment == 1); + REQUIRE(encoder.marked_ready_to_rotate == false); + REQUIRE(state.ready_to_rotate == 1); // Not decremented here + } + + SECTION("Encoder catches up after being behind") { + REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); + REQUIRE(encoder.encoderd_segment_offset == 0); + + // Logger advances to 1, encoder sends 0 + REQUIRE(encoder.syncSegment(&state, name, 0, 1) == false); // 0 - 0 = 0 < 1 + REQUIRE(encoder.encoderd_segment_offset == -1); // Adjusted to 0 - 1 + + // Logger advances to 2, encoder sends 1 + REQUIRE(encoder.syncSegment(&state, name, 1, 2) == true); // 1 - (-1) = 2 == 2 + REQUIRE(encoder.current_segment == 2); + REQUIRE(encoder.marked_ready_to_rotate == false); + } +}