Dean Lee 2 days ago committed by GitHub
commit 30ad66da82
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 87
      system/loggerd/loggerd.cc
  2. 73
      system/loggerd/loggerd.h
  3. 129
      system/loggerd/tests/test_logger.cc

@ -9,18 +9,9 @@
#include "common/params.h"
#include "system/loggerd/encoder/encoder.h"
#include "system/loggerd/loggerd.h"
#include "system/loggerd/video_writer.h"
ExitHandler do_exit;
struct LoggerdState {
LoggerState logger;
std::atomic<double> last_camera_seen_tms{0.0};
std::atomic<int> ready_to_rotate{0}; // count of encoders ready to rotate
int max_waiting = 0;
double last_rotate_tms = 0.; // last rotate time in ms
};
void logger_rotate(LoggerdState *s) {
bool ret =s->logger.next();
assert(ret);
@ -53,17 +44,6 @@ void rotate_if_needed(LoggerdState *s) {
}
}
struct RemoteEncoder {
std::unique_ptr<VideoWriter> writer;
int encoderd_segment_offset;
int current_segment = -1;
std::vector<Message *> q;
int dropped_frames = 0;
bool recording = false;
bool marked_ready_to_rotate = false;
bool seen_first_packet = false;
};
size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEncoder &re, const EncoderInfo &encoder_info) {
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
@ -117,72 +97,35 @@ size_t write_encode_data(LoggerdState *s, cereal::Event::Reader event, RemoteEnc
}
int handle_encoder_msg(LoggerdState *s, Message *msg, std::string &name, struct RemoteEncoder &re, const EncoderInfo &encoder_info) {
int bytes_count = 0;
// extract the message
capnp::FlatArrayMessageReader cmsg(kj::ArrayPtr<capnp::word>((capnp::word *)msg->getData(), msg->getSize() / sizeof(capnp::word)));
auto event = cmsg.getRoot<cereal::Event>();
auto edata = (event.*(encoder_info.get_encode_data_func))();
auto idx = edata.getIdx();
// encoderd can have started long before loggerd
if (!re.seen_first_packet) {
re.seen_first_packet = true;
re.encoderd_segment_offset = idx.getSegmentNum();
LOGD("%s: has encoderd offset %d", name.c_str(), re.encoderd_segment_offset);
}
int offset_segment_num = idx.getSegmentNum() - re.encoderd_segment_offset;
auto encoder_idx = (event.*(encoder_info.get_encode_data_func))().getIdx();
if (offset_segment_num == s->logger.segment()) {
// loggerd is now on the segment that matches this packet
// if this is a new segment, we close any possible old segments, move to the new, and process any queued packets
if (re.current_segment != s->logger.segment()) {
if (re.recording) {
re.writer.reset();
re.recording = false;
}
re.current_segment = s->logger.segment();
re.marked_ready_to_rotate = false;
// we are in this segment now, process any queued messages before this one
if (!re.q.empty()) {
for (auto qmsg : re.q) {
capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += write_encode_data(s, reader.getRoot<cereal::Event>(), re, encoder_info);
delete qmsg;
}
re.q.clear();
int bytes_count = 0;
// Synchronize segment and process if aligned
if (re.syncSegment(s, name, encoder_idx.getSegmentNum(), s->logger.segment())) {
// Process any queued messages before the current one
if (!re.q.empty()) {
for (auto qmsg : re.q) {
capnp::FlatArrayMessageReader reader({(capnp::word *)qmsg->getData(), qmsg->getSize() / sizeof(capnp::word)});
bytes_count += write_encode_data(s, reader.getRoot<cereal::Event>(), re, encoder_info);
delete qmsg;
}
re.q.clear();
}
// Process the current message
bytes_count += write_encode_data(s, event, re, encoder_info);
delete msg;
} else if (offset_segment_num > s->logger.segment()) {
// encoderd packet has a newer segment, this means encoderd has rolled over
if (!re.marked_ready_to_rotate) {
re.marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
s->logger.segment(), offset_segment_num,
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}
// TODO: define this behavior, but for now don't leak
if (re.q.size() > MAIN_FPS*10) {
} else {
if (re.q.size() > MAIN_FPS * 10) {
LOGE_100("%s: dropping frame, queue is too large", name.c_str());
delete msg;
} else {
// queue up all the new segment messages, they go in after the rotate
re.q.push_back(msg);
}
} else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d re.encoderd_segment_offset:%d",
name.c_str(), idx.getSegmentNum(), s->logger.segment(), re.encoderd_segment_offset);
// free the message, it's useless. this should never happen
// actually, this can happen if you restart encoderd
re.encoderd_segment_offset = -s->logger.segment();
delete msg;
}
return bytes_count;
}

@ -11,6 +11,7 @@
#include "common/util.h"
#include "system/loggerd/logger.h"
#include "system/loggerd/video_writer.h"
constexpr int MAIN_FPS = 20;
const int MAIN_BITRATE = 1e7;
@ -147,3 +148,75 @@ const LogCameraInfo stream_driver_camera_info{
const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info};
const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info};
struct LoggerdState {
LoggerState logger;
std::atomic<double> last_camera_seen_tms{0.0};
std::atomic<int> ready_to_rotate{0}; // count of encoders ready to rotate
int max_waiting = 0;
double last_rotate_tms = 0.; // last rotate time in ms
};
class RemoteEncoder {
public:
std::unique_ptr<VideoWriter> writer;
int encoder_segment_offset = 0;
int current_segment = -1;
std::vector<Message *> q;
int dropped_frames = 0;
bool recording = false;
bool marked_ready_to_rotate = false;
bool seen_first_packet = false;
bool syncSegment(LoggerdState *s, const std::string &name, int encoder_segment, int logger_segment) {
if (!seen_first_packet) {
seen_first_packet = true;
encoder_segment_offset = encoder_segment - logger_segment;
LOGD("%s: has encoderd offset %d", name.c_str(), encoder_segment_offset);
}
// Calculate adjusted segment based on offset
int adjusted_segment = encoder_segment - encoder_segment_offset;
// Case 1: Segments are synchronized
if (adjusted_segment == logger_segment) {
if (current_segment != logger_segment) {
// New segment detected; reset writer if recording
if (recording) {
writer.reset();
recording = false;
}
current_segment = logger_segment;
}
marked_ready_to_rotate = false;
return true;
}
// Case 2: Encoder is ahead (newer segment)
if (adjusted_segment > logger_segment) {
int segment_gap = adjusted_segment - logger_segment;
if (segment_gap > 1) {
LOGE("%s: encoder jumped ahead by %d segments (adj=%d, log=%d), adjusting offset",
name.c_str(), segment_gap, adjusted_segment, logger_segment);
encoder_segment_offset += segment_gap - 1;
}
if (!marked_ready_to_rotate) {
marked_ready_to_rotate = true;
++s->ready_to_rotate;
LOGD("rotate %d -> %d ready %d/%d for %s",
logger_segment, adjusted_segment,
s->ready_to_rotate.load(), s->max_waiting, name.c_str());
}
}
// Case 3: Encoder is behind (older segment)
else {
LOGE("%s: encoderd packet has a older segment!!! idx.getSegmentNum():%d s->logger.segment():%d encoder_segment_offset:%d",
name.c_str(), encoder_segment, logger_segment, encoder_segment_offset);
encoder_segment_offset = encoder_segment - logger_segment;
}
return false;
}
};

@ -1,5 +1,6 @@
#include "catch2/catch.hpp"
#include "system/loggerd/logger.h"
#include "system/loggerd/loggerd.h"
typedef cereal::Sentinel::SentinelType SentinelType;
@ -73,3 +74,131 @@ TEST_CASE("logger") {
verify_segment(log_root + "/" + route_name, i, segment_cnt, 1);
}
}
TEST_CASE("RemoteEncoder::syncSegment robustness", "[sync]") {
LoggerdState state;
RemoteEncoder encoder;
std::string name = "test_encoder";
SECTION("Matching segment after offset set") {
REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true); // First packet sets offset
REQUIRE(encoder.encoder_segment_offset == 0); // 5 - 5 = 0
REQUIRE(encoder.current_segment == 5);
REQUIRE(encoder.seen_first_packet == true);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);
REQUIRE(encoder.syncSegment(&state, name, 7, 7) == true); // 7 - 0 = 7 matches 7
REQUIRE(encoder.current_segment == 7);
REQUIRE(encoder.recording == false);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);
}
SECTION("Encoder restarts and sends segment 0") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0); // 1 - 1 = 0
REQUIRE(encoder.current_segment == 0);
REQUIRE(encoder.syncSegment(&state, name, 0, 2) == false); // 0 - 0 = 0 < 2 (behind)
REQUIRE(encoder.encoder_segment_offset == -2); // Adjusted to 0 - 2 = -2
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);
REQUIRE(encoder.syncSegment(&state, name, 0, 2) == true); // 0 - (-2) = 2 matches 2
REQUIRE(encoder.current_segment == 2);
REQUIRE(encoder.marked_ready_to_rotate == false);
}
SECTION("Encoder restarts and sends segment greater than 0") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0); // 0 - 0 = 0
REQUIRE(encoder.current_segment == 0);
REQUIRE(encoder.syncSegment(&state, name, 2, 3) == false); // 2 - 0 = 2 < 3 (behind)
REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 2 - 3 = -1
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 0);
REQUIRE(encoder.syncSegment(&state, name, 2, 3) == true); // 2 - (-1) = 3 matches 3
REQUIRE(encoder.current_segment == 3);
}
SECTION("Logger restarts to lower segment") {
REQUIRE(encoder.syncSegment(&state, name, 5, 5) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 5);
// Logger restarts to 0, encoder at 6
REQUIRE(encoder.syncSegment(&state, name, 6, 0) == false); // 6 - 0 = 6 > 0 (ahead)
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);
REQUIRE(encoder.current_segment == 5); // Unchanged until sync
// Encoder advances to 7, logger still at 0
REQUIRE(encoder.syncSegment(&state, name, 7, 0) == false); // 7 - 0 = 7 > 0
REQUIRE(state.ready_to_rotate == 1); // No further increment
}
SECTION("Encoder is ahead by more than one segment") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 0);
// Encoder jumps to 2, logger at 0
REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false); // 2 - 0 = 2 > 0
REQUIRE(encoder.encoder_segment_offset == 1); // Adjusted: += (2 - 1) = 1
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);
REQUIRE(encoder.syncSegment(&state, name, 2, 0) == false);
// Logger advances to 1, encoder at 2
state.ready_to_rotate = 0;
REQUIRE(encoder.syncSegment(&state, name, 2, 1) == true); // 2 - 1 = 1 matches 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(encoder.syncSegment(&state, name, 3, 1) == false); // 3 - 0 = 3 > 0
REQUIRE(state.ready_to_rotate == 1); // No additional increment
}
SECTION("Sync after rotation") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.syncSegment(&state, name, 1, 0) == false); // 1 - 0 = 1 > 0
REQUIRE(encoder.marked_ready_to_rotate == true);
REQUIRE(state.ready_to_rotate == 1);
// Simulate rotation: logger catches up to encoder
REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 == 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.marked_ready_to_rotate == false);
REQUIRE(state.ready_to_rotate == 1); // Not decremented here
}
SECTION("Encoder catches up after being behind") {
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true);
REQUIRE(encoder.encoder_segment_offset == 0);
// Logger advances to 1, encoder sends 0
REQUIRE(encoder.syncSegment(&state, name, 0, 1) == false); // 0 - 0 = 0 < 1
REQUIRE(encoder.encoder_segment_offset == -1); // Adjusted to 0 - 1
// Logger advances to 2, encoder sends 1
REQUIRE(encoder.syncSegment(&state, name, 1, 2) == true); // 1 - (-1) = 2 == 2
REQUIRE(encoder.current_segment == 2);
REQUIRE(encoder.marked_ready_to_rotate == false);
}
SECTION("Recording reset on segment change") {
encoder.recording = true; // Simulate active recording
REQUIRE(encoder.syncSegment(&state, name, 0, 0) == true); // Initial sync
REQUIRE(encoder.encoder_segment_offset == 0);
REQUIRE(encoder.current_segment == 0);
REQUIRE(encoder.syncSegment(&state, name, 1, 1) == true); // 1 - 0 = 1 matches 1
REQUIRE(encoder.current_segment == 1);
REQUIRE(encoder.recording == false); // Recording reset on segment change
}
}

Loading…
Cancel
Save