diff --git a/system/loggerd/loggerd.cc b/system/loggerd/loggerd.cc index 3382e381e5..3727fc4693 100644 --- a/system/loggerd/loggerd.cc +++ b/system/loggerd/loggerd.cc @@ -99,12 +99,11 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); auto event = cmsg.getRoot(); - auto edata = (event.*(encoder_info.get_encode_data_func))(); - auto idx = edata.getIdx(); + auto encoder_idx = (event.*(encoder_info.get_encode_data_func))().getIdx(); int bytes_count = 0; // Synchronize segment and process if aligned - if (re.syncSegment(s, name, idx.getSegmentNum(), s->logger.segment())) { + if (re.syncSegment(s, name, encoder_idx.getSegmentNum(), s->logger.segment())) { // Process any queued messages before the current one if (!re.q.empty()) { for (auto qmsg : re.q) { diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index e6dbf93b1d..b9673b1ff4 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -161,7 +161,7 @@ struct LoggerdState { class RemoteEncoder { public: std::unique_ptr writer; - int encoder_segment_offset; + int encoder_segment_offset = 0; int current_segment = -1; std::vector q; int dropped_frames = 0; @@ -169,44 +169,53 @@ public: bool marked_ready_to_rotate = false; bool seen_first_packet = false; - bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment_num, int log_segment_num) { + bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment, int logger_segment) { if (!seen_first_packet) { seen_first_packet = true; - encoder_segment_offset = log_segment_num; + encoder_segment_offset = encoder_segment - logger_segment; LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset); } - int offset_segment_num = encoder_segment_num - encoder_segment_offset; - printf("offset %d encoder_segment_offset: %d, log_segment_num:%d\n", offset_segment_num, encoder_segment_offset, log_segment_num); - if (offset_segment_num == log_segment_num) { - // loggerd is now on the segment that matches this packet - // if this is a new segment, we close any possible old segments, move to the new, and process any queued packets - if (current_segment != log_segment_num) { + // Calculate adjusted segment based on offset + int adjusted_segment = encoder_segment - encoder_segment_offset; + + // Case 1: Segments are synchronized + if (adjusted_segment == logger_segment) { + if (current_segment != logger_segment) { + // New segment detected; reset writer if recording if (recording) { writer.reset(); recording = false; } - current_segment = log_segment_num; - marked_ready_to_rotate = false; + current_segment = logger_segment; } + marked_ready_to_rotate = false; return true; } - if (offset_segment_num > log_segment_num) { - // encoderd packet has a newer segment, this means encoderd has rolled over + // Case 2: Encoder is ahead (newer segment) + if (adjusted_segment > logger_segment) { + int segment_gap = adjusted_segment - logger_segment; + if (segment_gap > 1) { + LOGE("%s: encoder jumped ahead by %d segments (adj=%d, log=%d), adjusting offset", + name.c_str(), segment_gap, adjusted_segment, logger_segment); + encoder_segment_offset += segment_gap - 1; + } + if (!marked_ready_to_rotate) { marked_ready_to_rotate = true; ++s->ready_to_rotate; LOGD("rotate %d -> %d ready %d/%d for %s", - log_segment_num, offset_segment_num, + logger_segment, adjusted_segment, s->ready_to_rotate.load(), s->max_waiting, name.c_str()); } - } else { - LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoder_segment_offset:%d", - name.c_str(), encoder_segment_num, log_segment_num, encoder_segment_offset); - // free the message, it's useless. this should never happen - // actually, this can happen if you restart encoderd - encoder_segment_offset = -log_segment_num; + + } + // Case 3: Encoder is behind (older segment) + else { + LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d encoder_segment_offset:%d", + name.c_str(), encoder_segment, logger_segment, encoder_segment_offset); + encoder_segment_offset = encoder_segment - logger_segment; } return false; } diff --git a/system/loggerd/tests/test_logger.cc b/system/loggerd/tests/test_logger.cc index c549f71e3e..70e69d5f54 100644 --- a/system/loggerd/tests/test_logger.cc +++ b/system/loggerd/tests/test_logger.cc @@ -150,6 +150,7 @@ TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") { REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1 REQUIRE(encoder.marked_ready_to_rotate == true); REQUIRE(state.ready_to_rotate == 1); + REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); // Logger advances to 1, encoder at 2 state.ready_to_rotate = 0;