loggerd: trigger rotate on frame id instead of frame count (#22848)

Co-authored-by: Comma Device <device@comma.ai>
old-commit-hash: ea761cbbd5
commatwo_master
Adeeb Shihadeh 3 years ago committed by GitHub
parent 17b51d3fd8
commit 35bfdce477
  1. 52
      selfdrive/loggerd/loggerd.cc
  2. 31
      selfdrive/loggerd/tests/test_encoder.py
  3. 2
      selfdrive/loggerd/tests/test_loggerd.py

@ -113,35 +113,35 @@ struct LoggerdState {
std::condition_variable rotate_cv; std::condition_variable rotate_cv;
std::atomic<int> rotate_segment; std::atomic<int> rotate_segment;
std::atomic<double> last_camera_seen_tms; std::atomic<double> last_camera_seen_tms;
std::atomic<int> waiting_rotate; std::atomic<int> ready_to_rotate; // count of encoders ready to rotate
int max_waiting = 0; int max_waiting = 0;
double last_rotate_tms = 0.; double last_rotate_tms = 0.; // last rotate time in ms
// Sync logic for startup // Sync logic for startup
std::atomic<int> encoders_ready = 0; std::atomic<int> encoders_ready = 0;
std::atomic<uint32_t> latest_frame_id = 0; std::atomic<uint32_t> start_frame_id = 0;
bool camera_ready[WideRoadCam + 1] = {}; bool camera_ready[WideRoadCam + 1] = {};
bool camera_synced[WideRoadCam + 1] = {}; bool camera_synced[WideRoadCam + 1] = {};
}; };
LoggerdState s; LoggerdState s;
// Wait for all encoders to reach the same frame id // Handle initial encoder syncing by waiting for all encoders to reach the same frame id
bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) { bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id) {
if (state->camera_synced[cam_type]) return true; if (state->camera_synced[cam_type]) return true;
if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) { if (state->max_waiting > 1 && state->encoders_ready != state->max_waiting) {
update_max_atomic(state->latest_frame_id, frame_id); // add a small margin to the start frame id in case one of the encoders already dropped the next frame
update_max_atomic(state->start_frame_id, frame_id + 2);
if (std::exchange(state->camera_ready[cam_type], true) == false) { if (std::exchange(state->camera_ready[cam_type], true) == false) {
++state->encoders_ready; ++state->encoders_ready;
LOGE("camera %d encoder ready", cam_type); LOGE("camera %d encoder ready", cam_type);
} }
return false; return false;
} else { } else {
// Small margin in case one of the encoders already dropped the next frame if (state->max_waiting == 1) update_max_atomic(state->start_frame_id, frame_id);
uint32_t start_frame_id = state->latest_frame_id + 2; bool synced = frame_id >= state->start_frame_id;
bool synced = frame_id >= start_frame_id;
state->camera_synced[cam_type] = synced; state->camera_synced[cam_type] = synced;
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id); if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, (int)state->start_frame_id, frame_id);
return synced; return synced;
} }
} }
@ -149,7 +149,7 @@ bool sync_encoders(LoggerdState *state, CameraType cam_type, uint32_t frame_id)
void encoder_thread(const LogCameraInfo &cam_info) { void encoder_thread(const LogCameraInfo &cam_info) {
set_thread_name(cam_info.filename); set_thread_name(cam_info.filename);
int cnt = 0, cur_seg = -1; int cur_seg = -1;
int encode_idx = 0; int encode_idx = 0;
LoggerHandle *lh = NULL; LoggerHandle *lh = NULL;
std::vector<Encoder *> encoders; std::vector<Encoder *> encoders;
@ -187,20 +187,23 @@ void encoder_thread(const LogCameraInfo &cam_info) {
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { if (!sync_encoders(&s, cam_info.type, extra.frame_id)) {
continue; continue;
} }
}
if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { // check if we're ready to rotate
// trigger rotate and wait logger rotated to new segment const int frames_per_seg = SEGMENT_LENGTH * MAIN_FPS;
++s.waiting_rotate; if (cur_seg >= 0 && extra.frame_id >= ((cur_seg+1) * frames_per_seg) + s.start_frame_id) {
std::unique_lock lk(s.rotate_lock); // trigger rotate and wait until the main logger has rotated to the new segment
s.rotate_cv.wait(lk, [&] { return s.rotate_segment > cur_seg || do_exit; }); ++s.ready_to_rotate;
std::unique_lock lk(s.rotate_lock);
s.rotate_cv.wait(lk, [&] {
return s.rotate_segment > cur_seg || do_exit;
});
if (do_exit) break;
}
} }
if (do_exit) break;
// rotate the encoder if the logger is on a newer segment // rotate the encoder if the logger is on a newer segment
if (s.rotate_segment > cur_seg) { if (s.rotate_segment > cur_seg) {
cur_seg = s.rotate_segment; cur_seg = s.rotate_segment;
cnt = 0;
LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path); LOGW("camera %d rotate encoder to %s", cam_info.type, s.segment_path);
for (auto &e : encoders) { for (auto &e : encoders) {
@ -247,7 +250,6 @@ void encoder_thread(const LogCameraInfo &cam_info) {
} }
} }
cnt++;
encode_idx++; encode_idx++;
} }
@ -283,7 +285,7 @@ void logger_rotate() {
int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment); int err = logger_next(&s.logger, LOG_ROOT.c_str(), s.segment_path, sizeof(s.segment_path), &segment);
assert(err == 0); assert(err == 0);
s.rotate_segment = segment; s.rotate_segment = segment;
s.waiting_rotate = 0; s.ready_to_rotate = 0;
s.last_rotate_tms = millis_since_boot(); s.last_rotate_tms = millis_since_boot();
} }
s.rotate_cv.notify_all(); s.rotate_cv.notify_all();
@ -291,7 +293,7 @@ void logger_rotate() {
} }
void rotate_if_needed() { void rotate_if_needed() {
if (s.waiting_rotate == s.max_waiting) { if (s.ready_to_rotate == s.max_waiting) {
logger_rotate(); logger_rotate();
} }
@ -347,10 +349,10 @@ int main(int argc, char** argv) {
// init encoders // init encoders
s.last_camera_seen_tms = millis_since_boot(); s.last_camera_seen_tms = millis_since_boot();
std::vector<std::thread> encoder_threads; std::vector<std::thread> encoder_threads;
for (const auto &ci : cameras_logged) { for (const auto &cam : cameras_logged) {
if (ci.enable) { if (cam.enable) {
encoder_threads.push_back(std::thread(encoder_thread, ci)); encoder_threads.push_back(std::thread(encoder_thread, cam));
if (ci.trigger_rotate) s.max_waiting++; if (cam.trigger_rotate) s.max_waiting++;
} }
} }

@ -97,10 +97,8 @@ class TestEncoder(unittest.TestCase):
file_path = f"{route_prefix_path}--{i}/{camera}" file_path = f"{route_prefix_path}--{i}/{camera}"
# check file size # check file exists
self.assertTrue(os.path.exists(file_path)) self.assertTrue(os.path.exists(file_path), f"segment #{i}: '{file_path}' missing")
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE))
# TODO: this ffprobe call is really slow # TODO: this ffprobe call is really slow
# check frame count # check frame count
@ -116,7 +114,11 @@ class TestEncoder(unittest.TestCase):
counts.append(frame_count) counts.append(frame_count)
self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") f"segment #{i}: {camera} failed frame count check: expected {expected_frames}, got {frame_count}")
# sanity check file size
file_size = os.path.getsize(file_path)
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE))
# Check encodeIdx # Check encodeIdx
if encode_idx_name is not None: if encode_idx_name is not None:
@ -151,15 +153,16 @@ class TestEncoder(unittest.TestCase):
self.assertEqual(min(counts), expected_frames) self.assertEqual(min(counts), expected_frames)
shutil.rmtree(f"{route_prefix_path}--{i}") shutil.rmtree(f"{route_prefix_path}--{i}")
for i in trange(num_segments + 1): try:
# poll for next segment for i in trange(num_segments + 1):
with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): # poll for next segment
while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir(): with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"):
time.sleep(0.1) while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir():
time.sleep(0.1)
managed_processes['loggerd'].stop() finally:
managed_processes['camerad'].stop() managed_processes['loggerd'].stop()
managed_processes['sensord'].stop() managed_processes['camerad'].stop()
managed_processes['sensord'].stop()
for i in trange(num_segments): for i in trange(num_segments):
check_seg(i) check_seg(i)

@ -133,7 +133,7 @@ class TestLoggerd(unittest.TestCase):
p = Path(f"{route_path}--{n}") p = Path(f"{route_path}--{n}")
logged = set([f.name for f in p.iterdir() if f.is_file()]) logged = set([f.name for f in p.iterdir() if f.is_file()])
diff = logged ^ expected_files diff = logged ^ expected_files
self.assertEqual(len(diff), 0, f"{_=} {route_path=} {n=}, {logged=} {expected_files=}") self.assertEqual(len(diff), 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}")
def test_bootlog(self): def test_bootlog(self):
# generate bootlog with fake launch log # generate bootlog with fake launch log

Loading…
Cancel
Save