|
|
|
@ -118,13 +118,34 @@ struct LoggerdState { |
|
|
|
|
double last_rotate_tms = 0.; |
|
|
|
|
|
|
|
|
|
// Sync logic for startup
|
|
|
|
|
std::atomic<bool> encoders_synced; |
|
|
|
|
std::atomic<int> encoders_ready; |
|
|
|
|
std::atomic<uint32_t> start_frame_id; |
|
|
|
|
std::atomic<uint32_t> latest_frame_id; |
|
|
|
|
std::atomic<int> encoders_ready = 0; |
|
|
|
|
std::atomic<uint32_t> latest_frame_id = 0; |
|
|
|
|
bool camera_ready[WideRoadCam + 1] = {}; |
|
|
|
|
bool camera_synced[WideRoadCam + 1] = {}; |
|
|
|
|
}; |
|
|
|
|
LoggerdState s; |
|
|
|
|
|
|
|
|
|
// Wait for all encoders to reach the same frame id
|
|
|
|
|
bool sync_encoders(LoggerdState *s, CameraType cam_type, uint32_t frame_id) { |
|
|
|
|
if (s->camera_synced[cam_type]) return true; |
|
|
|
|
|
|
|
|
|
if (s->max_waiting > 1 && s->encoders_ready != s->max_waiting) { |
|
|
|
|
update_max_atomic(s->latest_frame_id, frame_id); |
|
|
|
|
if (std::exchange(s->camera_ready[cam_type], true) == false) { |
|
|
|
|
++s->encoders_ready; |
|
|
|
|
LOGE("camera %d encoder ready", cam_type); |
|
|
|
|
} |
|
|
|
|
return false; |
|
|
|
|
} else { |
|
|
|
|
// Small margin in case one of the encoders already dropped the next frame
|
|
|
|
|
uint32_t start_frame_id = s->latest_frame_id + 2; |
|
|
|
|
bool synced = frame_id >= start_frame_id; |
|
|
|
|
s->camera_synced[cam_type] = synced; |
|
|
|
|
if (!synced) LOGE("camera %d waiting for frame %d, cur %d", cam_type, start_frame_id, frame_id); |
|
|
|
|
return synced; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void encoder_thread(const LogCameraInfo &cam_info) { |
|
|
|
|
set_thread_name(cam_info.filename); |
|
|
|
|
|
|
|
|
@ -134,8 +155,6 @@ void encoder_thread(const LogCameraInfo &cam_info) { |
|
|
|
|
std::vector<Encoder *> encoders; |
|
|
|
|
VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); |
|
|
|
|
|
|
|
|
|
bool ready = false; |
|
|
|
|
|
|
|
|
|
while (!do_exit) { |
|
|
|
|
if (!vipc_client.connect(false)) { |
|
|
|
|
util::sleep_for(5); |
|
|
|
@ -163,27 +182,11 @@ void encoder_thread(const LogCameraInfo &cam_info) { |
|
|
|
|
VisionBuf* buf = vipc_client.recv(&extra); |
|
|
|
|
if (buf == nullptr) continue; |
|
|
|
|
|
|
|
|
|
if (cam_info.trigger_rotate && (s.max_waiting > 1)) { |
|
|
|
|
if (!ready) { |
|
|
|
|
LOGE("%s encoder ready", cam_info.filename); |
|
|
|
|
++s.encoders_ready; |
|
|
|
|
ready = true; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (!s.encoders_synced) { |
|
|
|
|
update_max_atomic(s.latest_frame_id, extra.frame_id); |
|
|
|
|
continue; |
|
|
|
|
} else { |
|
|
|
|
// Wait for all encoders to reach the same frame id
|
|
|
|
|
if (extra.frame_id < s.start_frame_id) { |
|
|
|
|
LOGE("%s waiting for frame %d, cur %d", cam_info.filename, s.start_frame_id.load(), extra.frame_id); |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (cam_info.trigger_rotate) { |
|
|
|
|
s.last_camera_seen_tms = millis_since_boot(); |
|
|
|
|
if (!sync_encoders(&s, cam_info.type, extra.frame_id)) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (cam_info.trigger_rotate && (cnt >= SEGMENT_LENGTH * MAIN_FPS)) { |
|
|
|
@ -354,15 +357,6 @@ int main(int argc, char** argv) { |
|
|
|
|
uint64_t msg_count = 0, bytes_count = 0; |
|
|
|
|
double start_ts = millis_since_boot(); |
|
|
|
|
while (!do_exit) { |
|
|
|
|
// Check if all encoders are ready and start encoding at the same time
|
|
|
|
|
if ((s.max_waiting > 1) && !s.encoders_synced && (s.encoders_ready == s.max_waiting)) { |
|
|
|
|
// Small margin in case one of the encoders already dropped the next frame
|
|
|
|
|
s.start_frame_id = s.latest_frame_id + 2; |
|
|
|
|
s.encoders_synced = true; |
|
|
|
|
LOGE("starting encoders at frame id %d", s.start_frame_id.load()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// poll for new messages on all sockets
|
|
|
|
|
for (auto sock : poller->poll(1000)) { |
|
|
|
|
// drain socket
|
|
|
|
|