From 35bfdce477cf2c4e953e13a07d8cf6eb67de5774 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Sat, 13 Nov 2021 15:38:10 -0800 Subject: [PATCH] loggerd: trigger rotate on frame id instead of frame count (#22848) Co-authored-by: Comma Device old-commit-hash: ea761cbbd5103a589915ec3766b4d6056293cba6 --- selfdrive/loggerd/loggerd.cc | 52 +++++++++++++------------ selfdrive/loggerd/tests/test_encoder.py | 31 ++++++++------- selfdrive/loggerd/tests/test_loggerd.py | 2 +- 3 files changed, 45 insertions(+), 40 deletions(-) diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 71df85396c..f6d89b8091 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -113,35 +113,35 @@ struct LoggerdState { std::condition_variable rotate_cv; std::atomic rotate_segment; std::atomic last_camera_seen_tms; - std::atomic waiting_rotate; + std::atomic ready_to_rotate; // count of encoders ready to rotate int max_waiting = 0; - double last_rotate_tms = 0.; + double last_rotate_tms = 0.; // last rotate time in ms // Sync logic for startup std::atomic encoders_ready = 0; - std::atomic latest_frame_id = 0; + std::atomic start_frame_id = 0; bool camera_ready[WideRoadCam + 1] = {}; bool camera_synced[WideRoadCam + 1] = {}; }; LoggerdState s; -// Wait for all encoders to reach the same frame id +// Handle initial encoder syncing by waiting for all encoders to reach the same frame id bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) { if (state->camera_synced[cam_type]) return true; if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) { - update_max_atomic(state->latest_frame_id, frame_id); + // add a small margin to the start frame id in case one of the encoders already dropped the next frame + update_max_atomic(state->start_frame_id, frame_id + 2); if (std::exchange(state->camera_ready[cam_type], true) == false) { ++state->encoders_ready; LOGE("camera %d encoder ready", cam_type); } return false; } else { - // Small margin in case one of the encoders already dropped the next frame - uint32_t start_frame_id = state->latest_frame_id + 2; - bool synced = frame_id >= start_frame_id; + if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id); + bool synced = frame_id >= state->start_frame_id; state->camera_synced[cam_type] = synced; - if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id); + if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id); return synced; } } @@ -149,7 +149,7 @@ bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) void encoder_thread(const LogCameraInfo &cam_info) { set_thread_name(cam_info.filename); - int cnt = 0, cur_seg = -1; + int cur_seg = -1; int encode_idx = 0; LoggerHandle *lh = NULL; std::vector encoders; @@ -187,20 +187,23 @@ void encoder_thread(const LogCameraInfo &cam_info) { if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { continue; } - } - if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { - // trigger rotate and wait logger rotated to new segment - ++s.waiting_rotate; - std::unique_lock lk(s.rotate_lock); - s.rotate_cv.wait(lk, [&] { return s.rotate_segment > cur_seg || do_exit; }); + // check if we're ready to rotate + const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS; + if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) { + // trigger rotate and wait until the main logger has rotated to the new segment + ++s.ready_to_rotate; + std::unique_lock lk(s.rotate_lock); + s.rotate_cv.wait(lk, [&] { + return s.rotate_segment > cur_seg || do_exit; + }); + if (do_exit) break; + } } - if (do_exit) break; // rotate the encoder if the logger is on a newer segment if (s.rotate_segment > cur_seg) { cur_seg = s.rotate_segment; - cnt = 0; LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path); for (auto &e : encoders) { @@ -247,7 +250,6 @@ void encoder_thread(const LogCameraInfo &cam_info) { } } - cnt++; encode_idx++; } @@ -283,7 +285,7 @@ void logger_rotate() { int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); assert(err == 0); s.rotate_segment = segment; - s.waiting_rotate = 0; + s.ready_to_rotate = 0; s.last_rotate_tms = millis_since_boot(); } s.rotate_cv.notify_all(); @@ -291,7 +293,7 @@ void logger_rotate() { } void rotate_if_needed() { - if (s.waiting_rotate == s.max_waiting) { + if (s.ready_to_rotate == s.max_waiting) { logger_rotate(); } @@ -347,10 +349,10 @@ int main(int argc, char** argv) { // init encoders s.last_camera_seen_tms = millis_since_boot(); std::vector encoder_threads; - for (const auto &ci : cameras_logged) { - if (ci.enable) { - encoder_threads.push_back(std::thread(encoder_thread, ci)); - if (ci.trigger_rotate) s.max_waiting++; + for (const auto &cam : cameras_logged) { + if (cam.enable) { + encoder_threads.push_back(std::thread(encoder_thread, cam)); + if (cam.trigger_rotate) s.max_waiting++; } } diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py index f8223e0588..992e8538b6 100755 --- a/selfdrive/loggerd/tests/test_encoder.py +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -97,10 +97,8 @@ class TestEncoder(unittest.TestCase): file_path = f"{route_prefix_path}--{i}/{camera}" - # check file size - self.assertTrue(os.path.exists(file_path)) - file_size = os.path.getsize(file_path) - self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE)) + # check file exists + self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing") # TODO: this ffprobe call is really slow # check frame count @@ -116,7 +114,11 @@ class TestEncoder(unittest.TestCase): counts.append(frame_count) self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, - f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") + f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}") + + # sanity check file size + file_size = os.path.getsize(file_path) + self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE)) # Check encodeIdx if encode_idx_name is not None: @@ -151,15 +153,16 @@ class TestEncoder(unittest.TestCase): self.assertEqual(min(counts), expected_frames) shutil.rmtree(f"{route_prefix_path}--{i}") - for i in trange(num_segments + 1): - # poll for next segment - with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): - while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir(): - time.sleep(0.1) - - managed_processes['loggerd'].stop() - managed_processes['camerad'].stop() - managed_processes['sensord'].stop() + try: + for i in trange(num_segments + 1): + # poll for next segment + with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): + while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir(): + time.sleep(0.1) + finally: + managed_processes['loggerd'].stop() + managed_processes['camerad'].stop() + managed_processes['sensord'].stop() for i in trange(num_segments): check_seg(i) diff --git a/selfdrive/loggerd/tests/test_loggerd.py b/selfdrive/loggerd/tests/test_loggerd.py index ecd15d5a06..76756c8724 100755 --- a/selfdrive/loggerd/tests/test_loggerd.py +++ b/selfdrive/loggerd/tests/test_loggerd.py @@ -133,7 +133,7 @@ class TestLoggerd(unittest.TestCase): p = Path(f"{route_path}--{n}") logged = set([f.name for f in p.iterdir() if f.is_file()]) diff = logged ^ expected_files - self.assertEqual(len(diff), 0, f"{_=} {route_path=} {n=}, {logged=} {expected_files=}") + self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}") def test_bootlog(self): # generate bootlog with fake launch log