fix sync issues

pull/34840/head
deanlee 2 months ago
parent 33f1b8ca82
commit 492a959f06
  1. 5
      system/loggerd/loggerd.cc
  2. 49
      system/loggerd/loggerd.h
  3. 1
      system/loggerd/tests/test_logger.cc

@ -99,12 +99,11 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) { int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word))); capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>(); auto event = cmsg.getRoot<cereal::Event>();
auto edata = (event.*(encoder_info.get_encode_data_func))(); auto encoder_idx = (event.*(encoder_info.get_encode_data_func))().getIdx();
auto idx = edata.getIdx();
int bytes_count = 0; int bytes_count = 0;
// Synchronize segment and process if aligned // Synchronize segment and process if aligned
if (re.syncSegment(s, name, idx.getSegmentNum(), s->logger.segment())) { if (re.syncSegment(s, name, encoder_idx.getSegmentNum(), s->logger.segment())) {
// Process any queued messages before the current one // Process any queued messages before the current one
if (!re.q.empty()) { if (!re.q.empty()) {
for (auto qmsg : re.q) { for (auto qmsg : re.q) {

@ -161,7 +161,7 @@ struct LoggerdState {
class RemoteEncoder { class RemoteEncoder {
public: public:
std::unique_ptr<VideoWriter> writer; std::unique_ptr<VideoWriter> writer;
int encoder_segment_offset; int encoder_segment_offset = 0;
int current_segment = -1; int current_segment = -1;
std::vector<Message *> q; std::vector<Message *> q;
int dropped_frames = 0; int dropped_frames = 0;
@ -169,44 +169,53 @@ public:
bool marked_ready_to_rotate = false; bool marked_ready_to_rotate = false;
bool seen_first_packet = false; bool seen_first_packet = false;
bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment_num, int log_segment_num) { bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment, int logger_segment) {
if (!seen_first_packet) { if (!seen_first_packet) {
seen_first_packet = true; seen_first_packet = true;
encoder_segment_offset = log_segment_num; encoder_segment_offset = encoder_segment - logger_segment;
LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset); LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset);
} }
int offset_segment_num = encoder_segment_num - encoder_segment_offset;
printf("offset %d encoder_segment_offset: %d, log_segment_num:%d\n", offset_segment_num, encoder_segment_offset, log_segment_num);
if (offset_segment_num == log_segment_num) {
// loggerd is now on the segment that matches this packet
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets // Calculate adjusted segment based on offset
if (current_segment != log_segment_num) { int adjusted_segment = encoder_segment - encoder_segment_offset;
// Case 1: Segments are synchronized
if (adjusted_segment == logger_segment) {
if (current_segment != logger_segment) {
// New segment detected; reset writer if recording
if (recording) { if (recording) {
writer.reset(); writer.reset();
recording = false; recording = false;
} }
current_segment = log_segment_num; current_segment = logger_segment;
marked_ready_to_rotate = false;
} }
marked_ready_to_rotate = false;
return true; return true;
} }
if (offset_segment_num > log_segment_num) { // Case 2: Encoder is ahead (newer segment)
// encoderd packet has a newer segment, this means encoderd has rolled over if (adjusted_segment > logger_segment) {
int segment_gap = adjusted_segment - logger_segment;
if (segment_gap > 1) {
LOGE("%s: encoder jumped ahead by %d segments (adj=%d, log=%d), adjusting offset",
name.c_str(), segment_gap, adjusted_segment, logger_segment);
encoder_segment_offset += segment_gap - 1;
}
if (!marked_ready_to_rotate) { if (!marked_ready_to_rotate) {
marked_ready_to_rotate = true; marked_ready_to_rotate = true;
++s->ready_to_rotate; ++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s", LOGD("rotate %d -> %d ready %d/%d for %s",
log_segment_num, offset_segment_num, logger_segment, adjusted_segment,
s->ready_to_rotate.load(), s->max_waiting, name.c_str()); s->ready_to_rotate.load(), s->max_waiting, name.c_str());
} }
} else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoder_segment_offset:%d", }
name.c_str(), encoder_segment_num, log_segment_num, encoder_segment_offset); // Case 3: Encoder is behind (older segment)
// free the message, it's useless. this should never happen else {
// actually, this can happen if you restart encoderd LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d encoder_segment_offset:%d",
encoder_segment_offset = -log_segment_num; name.c_str(), encoder_segment, logger_segment, encoder_segment_offset);
encoder_segment_offset = encoder_segment - logger_segment;
} }
return false; return false;
} }

@ -150,6 +150,7 @@ TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") {
REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1 REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1
REQUIRE(encoder.marked_ready_to_rotate == true); REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1); REQUIRE(state.ready_to_rotate == 1);
REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false);
// Logger advances to 1, encoder at 2 // Logger advances to 1, encoder at 2
state.ready_to_rotate = 0; state.ready_to_rotate = 0;

Loading…
Cancel
Save