diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 0721e836be..40065b5859 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -2,30 +2,42 @@ ExitHandler do_exit; -LoggerdState s; - // Handle initial encoder syncing by waiting for all encoders to reach the same frame id -bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) { - if (state->camera_synced[cam_type]) return true; +bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) { + if (s->camera_synced[cam_type]) return true; - if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) { + if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { // add a small margin to the start frame id in case one of the encoders already dropped the next frame - update_max_atomic(state->start_frame_id, frame_id + 2); - if (std::exchange(state->camera_ready[cam_type], true) == false) { - ++state->encoders_ready; + update_max_atomic(s->start_frame_id, frame_id + 2); + if (std::exchange(s->camera_ready[cam_type], true) == false) { + ++s->encoders_ready; LOGE("camera %d encoder ready", cam_type); } return false; } else { - if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id); - bool synced = frame_id >= state->start_frame_id; - state->camera_synced[cam_type] = synced; - if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id); + if (s->max_waiting == 1) update_max_atomic(s->start_frame_id, frame_id); + bool synced = frame_id >= s->start_frame_id; + s->camera_synced[cam_type] = synced; + if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)s->start_frame_id, frame_id); return synced; } } -void encoder_thread(const LogCameraInfo &cam_info) { +bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id) { + const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; + if (cur_seg >= 0 && frame_id >= ((cur_seg + 1) * frames_per_seg) + s->start_frame_id) { + // trigger rotate and wait until the main logger has rotated to the new segment + ++s->ready_to_rotate; + std::unique_lock lk(s->rotate_lock); + s->rotate_cv.wait(lk, [&] { + return s->rotate_segment > cur_seg || do_exit; + }); + return !do_exit; + } + return false; +} + +void encoder_thread(LoggerdState *s, const LogCameraInfo &cam_info) { set_thread_name(cam_info.filename); int cur_seg = -1; @@ -62,37 +74,29 @@ void encoder_thread(const LogCameraInfo &cam_info) { if (buf == nullptr) continue; if (cam_info.trigger_rotate) { - s.last_camera_seen_tms = millis_since_boot(); - if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { + s->last_camera_seen_tms = millis_since_boot(); + if (!sync_encoders(s, cam_info.type, extra.frame_id)) { continue; } // check if we're ready to rotate - const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; - if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) { - // trigger rotate and wait until the main logger has rotated to the new segment - ++s.ready_to_rotate; - std::unique_lock lk(s.rotate_lock); - s.rotate_cv.wait(lk, [&] { - return s.rotate_segment > cur_seg || do_exit; - }); - if (do_exit) break; - } + trigger_rotate_if_needed(s, cur_seg, extra.frame_id); + if (do_exit) break; } // rotate the encoder if the logger is on a newer segment - if (s.rotate_segment > cur_seg) { - cur_seg = s.rotate_segment; + if (s->rotate_segment > cur_seg) { + cur_seg = s->rotate_segment; - LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path); + LOGW("camera %d rotate encoder to %s", cam_info.type, s->segment_path); for (auto &e : encoders) { e->encoder_close(); - e->encoder_open(s.segment_path); + e->encoder_open(s->segment_path); } if (lh) { lh_close(lh); } - lh = logger_get_handle(&s.logger); + lh = logger_get_handle(&s->logger); } // encode a frame @@ -157,31 +161,31 @@ void clear_locks() { ftw(LOG_ROOT.c_str(), clear_locks_fn, 16); } -void logger_rotate() { +void logger_rotate(LoggerdState *s) { { - std::unique_lock lk(s.rotate_lock); + std::unique_lock lk(s->rotate_lock); int segment = -1; - int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); + int err = logger_next(&s->logger, LOG_ROOT.c_str(), s->segment_path, sizeof(s->segment_path), &segment); assert(err == 0); - s.rotate_segment = segment; - s.ready_to_rotate = 0; - s.last_rotate_tms = millis_since_boot(); + s->rotate_segment = segment; + s->ready_to_rotate = 0; + s->last_rotate_tms = millis_since_boot(); } - s.rotate_cv.notify_all(); - LOGW((s.logger.part == 0) ? "logging to %s" : "rotated to %s", s.segment_path); + s->rotate_cv.notify_all(); + LOGW((s->logger.part == 0) ? "logging to %s" : "rotated to %s", s->segment_path); } -void rotate_if_needed() { - if (s.ready_to_rotate == s.max_waiting) { - logger_rotate(); +void rotate_if_needed(LoggerdState *s) { + if (s->ready_to_rotate == s->max_waiting) { + logger_rotate(s); } double tms = millis_since_boot(); - if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && - (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE && + if ((tms - s->last_rotate_tms) > SEGMENT_LENGTH * 1000 && + (tms - s->last_camera_seen_tms) > NO_CAMERA_PATIENCE && !LOGGERD_TEST) { LOGW("no camera packet seen. auto rotating"); - logger_rotate(); + logger_rotate(s); } } @@ -194,6 +198,7 @@ void loggerd_thread() { } QlogState; std::unordered_map qlog_states; + LoggerdState s; s.ctx = Context::create(); Poller * poller = Poller::create(); @@ -209,7 +214,7 @@ void loggerd_thread() { // init logger logger_init(&s.logger, "rlog", true); - logger_rotate(); + logger_rotate(&s); Params().put("CurrentRoute", s.logger.route_name); // init encoders @@ -217,7 +222,7 @@ void loggerd_thread() { std::vector encoder_threads; for (const auto &cam : cameras_logged) { if (cam.enable) { - encoder_threads.push_back(std::thread(encoder_thread, cam)); + encoder_threads.push_back(std::thread(encoder_thread, &s, cam)); if (cam.trigger_rotate) s.max_waiting++; } } @@ -236,7 +241,7 @@ void loggerd_thread() { bytes_count += msg->getSize(); delete msg; - rotate_if_needed(); + rotate_if_needed(&s); if ((++msg_count % 1000) == 0) { double seconds = (millis_since_boot() - start_ts) / 1000.0; diff --git a/selfdrive/loggerd/loggerd.h b/selfdrive/loggerd/loggerd.h index ca5a638f85..b3e45adfae 100644 --- a/selfdrive/loggerd/loggerd.h +++ b/selfdrive/loggerd/loggerd.h @@ -115,5 +115,7 @@ struct LoggerdState { bool camera_synced[WideRoadCam + 1] = {}; }; -bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id); +bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id); +bool trigger_rotate_if_needed(LoggerdState *s, int cur_seg, uint32_t frame_id); +void rotate_if_needed(LoggerdState *s); void loggerd_thread(); diff --git a/selfdrive/loggerd/tests/test_loggerd.cc b/selfdrive/loggerd/tests/test_loggerd.cc index 500aa6f338..d84185cbba 100644 --- a/selfdrive/loggerd/tests/test_loggerd.cc +++ b/selfdrive/loggerd/tests/test_loggerd.cc @@ -48,3 +48,46 @@ TEST_CASE("sync_encoders") { } } } + +const int MAX_SEGMENT_CNT = 100; + +std::pair encoder_thread(LoggerdState *s) { + int cur_seg = 0; + uint32_t frame_id = s->start_frame_id; + + while (cur_seg < MAX_SEGMENT_CNT) { + ++frame_id; + if (trigger_rotate_if_needed(s, cur_seg, frame_id)) { + cur_seg = s->rotate_segment; + } + util::sleep_for(0); + } + + return {cur_seg, frame_id}; +} + +TEST_CASE("trigger_rotate") { + const int encoders = GENERATE(1, 2, 3); + const int start_frame_id = random_int(0, 20); + + LoggerdState s{ + .max_waiting = encoders, + .start_frame_id = start_frame_id, + }; + + std::vector>> futures; + for (int i = 0; i < encoders; ++i) { + futures.emplace_back(std::async(std::launch::async, encoder_thread, &s)); + } + + while (s.rotate_segment < MAX_SEGMENT_CNT) { + rotate_if_needed(&s); + util::sleep_for(10); + } + + for (auto &f : futures) { + auto [encoder_seg, frame_id] = f.get(); + REQUIRE(encoder_seg == MAX_SEGMENT_CNT); + REQUIRE(frame_id == start_frame_id + encoder_seg * (SEGMENT_LENGTH * MAIN_FPS)); + } +}