diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index f595b0f547..a70285efed 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -108,9 +108,11 @@ public: SubSocket* fpkt_sock; uint32_t stream_frame_id, log_frame_id, last_rotate_frame_id; bool enabled, should_rotate, initialized; + std::atomic rotating; + std::atomic cur_seg; RotateState() : fpkt_sock(nullptr), stream_frame_id(0), log_frame_id(0), - last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false), initialized(false) {}; + last_rotate_frame_id(UINT32_MAX), enabled(false), should_rotate(false), initialized(false), rotating(false), cur_seg(-1) {}; void waitLogThread() { std::unique_lock lk(fid_lock); @@ -232,17 +234,31 @@ void encoder_thread(int cam_idx) { rotate_state.initialized = true; } + // get new logger handle for new segment if (lh) { lh_close(lh); } lh = logger_get_handle(&s.logger); + // wait for all to start rotating + rotate_state.rotating = true; + for(auto &r : s.rotate_state) { + while(r.enabled && !r.rotating && !do_exit) util::sleep_for(5); + } + pthread_mutex_lock(&s.rotate_lock); for (auto &e : encoders) { e->encoder_close(); e->encoder_open(s.segment_path, s.rotate_segment); } + rotate_state.cur_seg = s.rotate_segment; pthread_mutex_unlock(&s.rotate_lock); + + // wait for all to finish rotating + for(auto &r : s.rotate_state) { + while(r.enabled && r.cur_seg != s.rotate_segment && !do_exit) util::sleep_for(5); + } + rotate_state.rotating = false; rotate_state.finish_rotate(); } } diff --git a/selfdrive/loggerd/tests/test_encoder.py b/selfdrive/loggerd/tests/test_encoder.py index 130aa0f011..9d53c8339d 100755 --- a/selfdrive/loggerd/tests/test_encoder.py +++ b/selfdrive/loggerd/tests/test_encoder.py @@ -4,7 +4,6 @@ import os import random import shutil import subprocess -import threading import time import unittest from parameterized import parameterized @@ -15,22 +14,24 @@ from common.params import Params from common.timeout import Timeout from selfdrive.hardware import EON, TICI from selfdrive.test.helpers import with_processes -from selfdrive.loggerd.config import ROOT, CAMERA_FPS +from selfdrive.loggerd.config import ROOT -# baseline file sizes for a 2s segment, in bytes SEGMENT_LENGTH = 2 -FULL_SIZE = 1253786 +FULL_SIZE = 1253786 # file size for a 2s segment in bytes if EON: - CAMERAS = { - "fcamera": FULL_SIZE, - "dcamera": 770920, - "qcamera": 38533, - } + CAMERAS = [ + ("fcamera.hevc", 20, FULL_SIZE), + ("dcamera.hevc", 10, 770920), + ("qcamera.ts", 20, 38533), + ] else: - CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]} - -ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]] + CAMERAS = [ + ("fcamera.hevc", 20, FULL_SIZE), + ("dcamera.hevc", 20, FULL_SIZE), + ("ecamera.hevc", 20, FULL_SIZE), + ("qcamera.ts", 20, 38533), + ] # we check frame count, so we don't have to be too strict on size FILE_SIZE_TOLERANCE = 0.5 @@ -60,11 +61,10 @@ class TestEncoder(unittest.TestCase): return os.path.join(ROOT, last_route) # TODO: this should run faster than real time - @parameterized.expand(ALL_CAMERA_COMBINATIONS) - @with_processes(['camerad', 'sensord', 'loggerd']) - def test_log_rotation(self, cameras): - print("checking targets:", cameras) - Params().put("RecordFront", "1" if 'dcamera' in cameras else "0") + @parameterized.expand([(True, ), (False, )]) + @with_processes(['camerad', 'sensord', 'loggerd'], init_time=3) + def test_log_rotation(self, record_front): + Params().put("RecordFront", str(int(record_front))) num_segments = random.randint(80, 150) if "CI" in os.environ: @@ -72,58 +72,45 @@ class TestEncoder(unittest.TestCase): # wait for loggerd to make the dir for first segment route_prefix_path = None - with Timeout(int(SEGMENT_LENGTH*2)): + with Timeout(int(SEGMENT_LENGTH*3)): while route_prefix_path is None: try: route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0] except Exception: time.sleep(0.1) - continue def check_seg(i): # check each camera file size - for camera, size in cameras.items(): - ext = "ts" if camera=='qcamera' else "hevc" - file_path = f"{route_prefix_path}--{i}/{camera}.{ext}" + for camera, fps, size in CAMERAS: + if not record_front and "dcamera" in camera: + continue + + file_path = f"{route_prefix_path}--{i}/{camera}" # check file size - self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}") + self.assertTrue(os.path.exists(file_path)) file_size = os.path.getsize(file_path) - self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), - f"{camera} failed size check: expected {size}, got {file_size}") - - if camera == 'qcamera': - continue + self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE)) # TODO: this ffprobe call is really slow # check frame count cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \ -of default=nokey=1:noprint_wrappers=1 {file_path}" - expected_frames = SEGMENT_LENGTH * CAMERA_FPS // 2 if (EON and camera=='dcamera') else SEGMENT_LENGTH * CAMERA_FPS - frame_tolerance = 1 if (EON and camera == 'dcamera') else 0 - frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip()) + expected_frames = fps * SEGMENT_LENGTH + frame_tolerance = 1 if (EON and camera == 'dcamera.hevc') else 0 + probe = subprocess.check_output(cmd, shell=True, encoding='utf8') + frame_count = int(probe.split('\n')[0].strip()) self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance, f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") shutil.rmtree(f"{route_prefix_path}--{i}") - def join(ts, timeout): - for t in ts: - t.join(timeout) - - threads = [] for i in trange(num_segments): # poll for next segment with Timeout(int(SEGMENT_LENGTH*2), error_msg=f"timed out waiting for segment {i}"): while int(self._get_latest_segment_path().rsplit("--", 1)[1]) <= i: time.sleep(0.1) - t = threading.Thread(target=check_seg, args=(i, )) - t.start() - threads.append(t) - join(threads, 0.1) - - with Timeout(20): - join(threads, None) + check_seg(i) if __name__ == "__main__": unittest.main()