diff --git a/selfdrive/common/util.h b/selfdrive/common/util.h index f9a244bc6a..987bf95a9d 100644 --- a/selfdrive/common/util.h +++ b/selfdrive/common/util.h @@ -141,3 +141,9 @@ public: private: float x_, k_; }; + +template +void update_max_atomic(std::atomic& max, T const& value) { + T prev = max; + while(prev < value && !max.compare_exchange_weak(prev, value)) {} +} diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index b70d1feec4..1b9e9d6db6 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -46,7 +46,8 @@ const int DCAM_BITRATE = Hardware::TICI() ? MAIN_BITRATE : 2500000; #define NO_CAMERA_PATIENCE 500 // fall back to time-based rotation if all cameras are dead -const int SEGMENT_LENGTH = getenv("LOGGERD_TEST") ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; +const bool LOGGERD_TEST = getenv("LOGGERD_TEST"); +const int SEGMENT_LENGTH = LOGGERD_TEST ? atoi(getenv("LOGGERD_SEGMENT_LENGTH")) : 60; ExitHandler do_exit; @@ -112,6 +113,12 @@ struct LoggerdState { std::atomic waiting_rotate; int max_waiting = 0; double last_rotate_tms = 0.; + + // Sync logic for startup + std::atomic encoders_synced; + std::atomic encoders_ready; + std::atomic start_frame_id; + std::atomic latest_frame_id; }; LoggerdState s; @@ -124,9 +131,11 @@ void encoder_thread(const LogCameraInfo &cam_info) { std::vector encoders; VisionIpcClient vipc_client = VisionIpcClient("camerad", cam_info.stream_type, false); + bool ready = false; + while (!do_exit) { if (!vipc_client.connect(false)) { - util::sleep_for(100); + util::sleep_for(5); continue; } @@ -151,6 +160,23 @@ void encoder_thread(const LogCameraInfo &cam_info) { if (buf == nullptr) continue; if (cam_info.trigger_rotate) { + if (!ready) { + LOGE("%s encoder ready", cam_info.filename); + ++s.encoders_ready; + ready = true; + } + + if (!s.encoders_synced) { + update_max_atomic(s.latest_frame_id, extra.frame_id); + continue; + } else { + // Wait for all encoders to reach the same frame id + if (extra.frame_id < s.start_frame_id) { + LOGE("%s waiting for frame %d, cur %d", cam_info.filename, s.start_frame_id.load(), extra.frame_id); + continue; + } + } + s.last_camera_seen_tms = millis_since_boot(); } @@ -262,7 +288,8 @@ void rotate_if_needed() { double tms = millis_since_boot(); if ((tms - s.last_rotate_tms) > SEGMENT_LENGTH * 1000 && - (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE) { + (tms - s.last_camera_seen_tms) > NO_CAMERA_PATIENCE && + !LOGGERD_TEST) { LOGW("no camera packet seen. auto rotating"); logger_rotate(); } @@ -312,6 +339,15 @@ int main(int argc, char** argv) { uint64_t msg_count = 0, bytes_count = 0; double start_ts = millis_since_boot(); while (!do_exit) { + // Check if all encoders are ready and start encoding at the same time + if (!s.encoders_synced && (s.encoders_ready == s.max_waiting)) { + // Small margin in case one of the encoders already dropped the next frame + s.start_frame_id = s.latest_frame_id + 2; + s.encoders_synced = true; + LOGE("starting encoders at frame id %d", s.start_frame_id.load()); + } + + // poll for new messages on all sockets for (auto sock : poller->poll(1000)) { // drain socket diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py index 5cd7da5a02..313b7ff8ec 100755 --- a/selfdrive/loggerd/tests/test_encoder.py +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -15,7 +15,6 @@ from common.params import Params from common.timeout import Timeout from selfdrive.hardware import EON, TICI from selfdrive.loggerd.config import ROOT -from selfdrive.test.helpers import with_processes from selfdrive.manager.process_config import managed_processes from tools.lib.logreader import LogReader @@ -66,9 +65,14 @@ class TestEncoder(unittest.TestCase): # TODO: this should run faster than real time @parameterized.expand([(True, ), (False, )]) - @with_processes(['camerad', 'sensord', 'loggerd'], init_time=3, ignore_stopped=['loggerd']) def test_log_rotation(self, record_front): - Params().put("RecordFront", str(int(record_front))) + Params().put_bool("RecordFront", record_front) + + managed_processes['sensord'].start() + managed_processes['loggerd'].start() + + time.sleep(1.0) + managed_processes['camerad'].start() num_segments = int(os.getenv("SEGMENTS", random.randint(10, 15))) @@ -84,6 +88,7 @@ class TestEncoder(unittest.TestCase): def check_seg(i): # check each camera file size counts = [] + first_frames = [] for camera, fps, size, encode_idx_name in CAMERAS: if not record_front and "dcamera" in camera: continue @@ -119,6 +124,7 @@ class TestEncoder(unittest.TestCase): segment_idxs = [getattr(m, encode_idx_name).segmentId for m in LogReader(rlog_path) if m.which() == encode_idx_name] encode_idxs = [getattr(m, encode_idx_name).encodeId for m in LogReader(rlog_path) if m.which() == encode_idx_name] + frame_idxs = [getattr(m, encode_idx_name).frameId for m in LogReader(rlog_path) if m.which() == encode_idx_name] # Check frame count self.assertEqual(frame_count, len(segment_idxs)) @@ -130,8 +136,11 @@ class TestEncoder(unittest.TestCase): if not eon_dcam: self.assertEqual(expected_frames * i, encode_idxs[0]) + first_frames.append(frame_idxs[0]) self.assertEqual(len(set(encode_idxs)), len(encode_idxs)) + self.assertEqual(1, len(set(first_frames))) + if TICI: expected_frames = fps * SEGMENT_LENGTH self.assertEqual(min(counts), expected_frames) @@ -139,11 +148,13 @@ class TestEncoder(unittest.TestCase): for i in trange(num_segments + 1): # poll for next segment - with Timeout(int(SEGMENT_LENGTH*2), error_msg=f"timed out waiting for segment {i}"): + with Timeout(int(SEGMENT_LENGTH*10), error_msg=f"timed out waiting for segment {i}"): while Path(f"{route_prefix_path}--{i}") not in Path(ROOT).iterdir(): time.sleep(0.1) managed_processes['loggerd'].stop() + managed_processes['camerad'].stop() + managed_processes['sensord'].stop() for i in trange(num_segments): check_seg(i)