diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index c87def8ed0..5dea6e575e 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -118,13 +118,34 @@ struct LoggerdState { double last_rotate_tms = 0.; // Sync logic for startup - std::atomic encoders_synced; - std::atomic encoders_ready; - std::atomic start_frame_id; - std::atomic latest_frame_id; + std::atomic encoders_ready = 0; + std::atomic latest_frame_id = 0; + bool camera_ready[WideRoadCam + 1] = {}; + bool camera_synced[WideRoadCam + 1] = {}; }; LoggerdState s; +// Wait for all encoders to reach the same frame id +bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) { + if (s->camera_synced[cam_type]) return true; + + if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { + update_max_atomic(s->latest_frame_id, frame_id); + if (std::exchange(s->camera_ready[cam_type], true) == false) { + ++s->encoders_ready; + LOGE("camera %d encoder ready", cam_type); + } + return false; + } else { + // Small margin in case one of the encoders already dropped the next frame + uint32_t start_frame_id = s->latest_frame_id + 2; + bool synced = frame_id >= start_frame_id; + s->camera_synced[cam_type] = synced; + if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id); + return synced; + } +} + void encoder_thread(const LogCameraInfo &cam_info) { set_thread_name(cam_info.filename); @@ -134,8 +155,6 @@ void encoder_thread(const LogCameraInfo &cam_info) { std::vector encoders; VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); - bool ready = false; - while (!do_exit) { if (!vipc_client.connect(false)) { util::sleep_for(5); @@ -163,27 +182,11 @@ void encoder_thread(const LogCameraInfo &cam_info) { VisionBuf* buf = vipc_client.recv(&extra); if (buf == nullptr) continue; - if (cam_info.trigger_rotate && (s.max_waiting > 1)) { - if (!ready) { - LOGE("%s encoder ready", cam_info.filename); - ++s.encoders_ready; - ready = true; - } - - if (!s.encoders_synced) { - update_max_atomic(s.latest_frame_id, extra.frame_id); - continue; - } else { - // Wait for all encoders to reach the same frame id - if (extra.frame_id < s.start_frame_id) { - LOGE("%s waiting for frame %d, cur %d", cam_info.filename, s.start_frame_id.load(), extra.frame_id); - continue; - } - } - } - if (cam_info.trigger_rotate) { s.last_camera_seen_tms = millis_since_boot(); + if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { + continue; + } } if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { @@ -354,15 +357,6 @@ int main(int argc, char** argv) { uint64_t msg_count = 0, bytes_count = 0; double start_ts = millis_since_boot(); while (!do_exit) { - // Check if all encoders are ready and start encoding at the same time - if ((s.max_waiting > 1) && !s.encoders_synced && (s.encoders_ready == s.max_waiting)) { - // Small margin in case one of the encoders already dropped the next frame - s.start_frame_id = s.latest_frame_id + 2; - s.encoders_synced = true; - LOGE("starting encoders at frame id %d", s.start_frame_id.load()); - } - - // poll for new messages on all sockets for (auto sock : poller->poll(1000)) { // drain socket