loggerd: add test case for trigger_rotate (#23038)

* test rotate

* remove global LoggerdState
pull/23053/head
Dean Lee 4 years ago committed by GitHub
parent 5ae5174509
commit 1d323e0fd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 99
      selfdrive/loggerd/loggerd.cc
  2. 4
      selfdrive/loggerd/loggerd.h
  3. 43
      selfdrive/loggerd/tests/test_loggerd.cc

@ -2,30 +2,42 @@
ExitHandler do_exit; ExitHandler do_exit;
LoggerdState s;
// Handle initial encoder syncing by waiting for all encoders to reach the same frame id // Handle initial encoder syncing by waiting for all encoders to reach the same frame id
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) { bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) {
if (state->camera_synced[cam_type]) return true; if (s->camera_synced[cam_type]) return true;
if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) { if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) {
// add a small margin to the start frame id in case one of the encoders already dropped the next frame // add a small margin to the start frame id in case one of the encoders already dropped the next frame
update_max_atomic(state->start_frame_id, frame_id + 2); update_max_atomic(s->start_frame_id, frame_id + 2);
if (std::exchange(state->camera_ready[cam_type], true) == false) { if (std::exchange(s->camera_ready[cam_type], true) == false) {
++state->encoders_ready; ++s->encoders_ready;
LOGE("camera %d encoder ready", cam_type); LOGE("camera %d encoder ready", cam_type);
} }
return false; return false;
} else { } else {
if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id); if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id);
bool synced = frame_id >= state->start_frame_id; bool synced = frame_id >= s->start_frame_id;
state->camera_synced[cam_type] = synced; s->camera_synced[cam_type] = synced;
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id); if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id);
return synced; return synced;
} }
} }
void encoder_thread(const LogCameraInfo &cam_info) { bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id) {
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
if (cur_seg >= 0 && frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) {
// trigger rotate and wait until the main logger has rotated to the new segment
++s->ready_to_rotate;
std::unique_lock lk(s->rotate_lock);
s->rotate_cv.wait(lk, [&] {
return s->rotate_segment > cur_seg || do_exit;
});
return !do_exit;
}
return false;
}
void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) {
set_thread_name(cam_info.filename); set_thread_name(cam_info.filename);
int cur_seg = -1; int cur_seg = -1;
@ -62,37 +74,29 @@ void encoder_thread(const LogCameraInfo &cam_info) {
if (buf == nullptr) continue; if (buf == nullptr) continue;
if (cam_info.trigger_rotate) { if (cam_info.trigger_rotate) {
s.last_camera_seen_tms = millis_since_boot(); s->last_camera_seen_tms = millis_since_boot();
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { if (!sync_encoders(s, cam_info.type, extra.frame_id)) {
continue; continue;
} }
// check if we're ready to rotate // check if we're ready to rotate
const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; trigger_rotate_if_needed(s, cur_seg, extra.frame_id);
if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) { if (do_exit) break;
// trigger rotate and wait until the main logger has rotated to the new segment
++s.ready_to_rotate;
std::unique_lock lk(s.rotate_lock);
s.rotate_cv.wait(lk, [&] {
return s.rotate_segment > cur_seg || do_exit;
});
if (do_exit) break;
}
} }
// rotate the encoder if the logger is on a newer segment // rotate the encoder if the logger is on a newer segment
if (s.rotate_segment > cur_seg) { if (s->rotate_segment > cur_seg) {
cur_seg = s.rotate_segment; cur_seg = s->rotate_segment;
LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path); LOGW("camera %d rotate encoder to %s", cam_info.type, s->segment_path);
for (auto &e : encoders) { for (auto &e : encoders) {
e->encoder_close(); e->encoder_close();
e->encoder_open(s.segment_path); e->encoder_open(s->segment_path);
} }
if (lh) { if (lh) {
lh_close(lh); lh_close(lh);
} }
lh = logger_get_handle(&s.logger); lh = logger_get_handle(&s->logger);
} }
// encode a frame // encode a frame
@ -157,31 +161,31 @@ void clear_locks() {
ftw(LOG_ROOT.c_str(), clear_locks_fn, 16); ftw(LOG_ROOT.c_str(), clear_locks_fn, 16);
} }
void logger_rotate() { void logger_rotate(LoggerdState *s) {
{ {
std::unique_lock lk(s.rotate_lock); std::unique_lock lk(s->rotate_lock);
int segment = -1; int segment = -1;
int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); int err = logger_next(&s->logger, LOG_ROOT.c_str(), s->segment_path, sizeof(s->segment_path), &segment);
assert(err == 0); assert(err == 0);
s.rotate_segment = segment; s->rotate_segment = segment;
s.ready_to_rotate = 0; s->ready_to_rotate = 0;
s.last_rotate_tms = millis_since_boot(); s->last_rotate_tms = millis_since_boot();
} }
s.rotate_cv.notify_all(); s->rotate_cv.notify_all();
LOGW((s.logger.part == 0) ? "logging to %s" : "rotated to %s", s.segment_path); LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path);
} }
void rotate_if_needed() { void rotate_if_needed(LoggerdState *s) {
if (s.ready_to_rotate == s.max_waiting) { if (s->ready_to_rotate == s->max_waiting) {
logger_rotate(); logger_rotate(s);
} }
double tms = millis_since_boot(); double tms = millis_since_boot();
if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && if ((tms - s->last_rotate_tms) > SEGMENT_LENGTH * 1000 &&
(tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE && (tms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE &&
!LOGGERD_TEST) { !LOGGERD_TEST) {
LOGW("no camera packet seen. auto rotating"); LOGW("no camera packet seen. auto rotating");
logger_rotate(); logger_rotate(s);
} }
} }
@ -194,6 +198,7 @@ void loggerd_thread() {
} QlogState; } QlogState;
std::unordered_map<SubSocket*, QlogState> qlog_states; std::unordered_map<SubSocket*, QlogState> qlog_states;
LoggerdState s;
s.ctx = Context::create(); s.ctx = Context::create();
Poller * poller = Poller::create(); Poller * poller = Poller::create();
@ -209,7 +214,7 @@ void loggerd_thread() {
// init logger // init logger
logger_init(&s.logger, "rlog", true); logger_init(&s.logger, "rlog", true);
logger_rotate(); logger_rotate(&s);
Params().put("CurrentRoute", s.logger.route_name); Params().put("CurrentRoute", s.logger.route_name);
// init encoders // init encoders
@ -217,7 +222,7 @@ void loggerd_thread() {
std::vector<std::thread> encoder_threads; std::vector<std::thread> encoder_threads;
for (const auto &cam : cameras_logged) { for (const auto &cam : cameras_logged) {
if (cam.enable) { if (cam.enable) {
encoder_threads.push_back(std::thread(encoder_thread, cam)); encoder_threads.push_back(std::thread(encoder_thread, &s, cam));
if (cam.trigger_rotate) s.max_waiting++; if (cam.trigger_rotate) s.max_waiting++;
} }
} }
@ -236,7 +241,7 @@ void loggerd_thread() {
bytes_count += msg->getSize(); bytes_count += msg->getSize();
delete msg; delete msg;
rotate_if_needed(); rotate_if_needed(&s);
if ((++msg_count % 1000) == 0) { if ((++msg_count % 1000) == 0) {
double seconds = (millis_since_boot() - start_ts) / 1000.0; double seconds = (millis_since_boot() - start_ts) / 1000.0;

@ -115,5 +115,7 @@ struct LoggerdState {
bool camera_synced[WideRoadCam + 1] = {}; bool camera_synced[WideRoadCam + 1] = {};
}; };
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id); bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id);
bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id);
void rotate_if_needed(LoggerdState *s);
void loggerd_thread(); void loggerd_thread();

@ -48,3 +48,46 @@ TEST_CASE("sync_encoders") {
} }
} }
} }
const int MAX_SEGMENT_CNT = 100;
std::pair<int, uint32_t> encoder_thread(LoggerdState *s) {
int cur_seg = 0;
uint32_t frame_id = s->start_frame_id;
while (cur_seg < MAX_SEGMENT_CNT) {
++frame_id;
if (trigger_rotate_if_needed(s, cur_seg, frame_id)) {
cur_seg = s->rotate_segment;
}
util::sleep_for(0);
}
return {cur_seg, frame_id};
}
TEST_CASE("trigger_rotate") {
const int encoders = GENERATE(1, 2, 3);
const int start_frame_id = random_int(0, 20);
LoggerdState s{
.max_waiting = encoders,
.start_frame_id = start_frame_id,
};
std::vector<std::future<std::pair<int, uint32_t>>> futures;
for (int i = 0; i < encoders; ++i) {
futures.emplace_back(std::async(std::launch::async, encoder_thread, &s));
}
while (s.rotate_segment < MAX_SEGMENT_CNT) {
rotate_if_needed(&s);
util::sleep_for(10);
}
for (auto &f : futures) {
auto [encoder_seg, frame_id] = f.get();
REQUIRE(encoder_seg == MAX_SEGMENT_CNT);
REQUIRE(frame_id == start_frame_id + encoder_seg * (SEGMENT_LENGTH * MAIN_FPS));
}
}

Loading…
Cancel
Save