test: RecordFront and RecordAudio (#35650)

record front and record audio tests
pull/35652/head
Jimmy 1 week ago committed by GitHub
parent 8e3b5f6210
commit 1b92dbb46f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 103
      system/loggerd/tests/test_loggerd.py

@ -97,6 +97,50 @@ class TestLoggerd:
return sent_msgs return sent_msgs
def _publish_camera_and_audio_messages(self, num_segs=1, segment_length=5):
d = DEVICE_CAMERAS[("tici", "ar0231")]
streams = [
(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048 * 2346, 2048, 2048 * 1216), "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048 * 2346, 2048, 2048 * 1216), "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048 * 2346, 2048, 2048 * 1216), "wideRoadCameraState"),
]
pm = messaging.PubMaster([s for _, _, s in streams] + ["rawAudioData"])
vipc_server = VisionIpcServer("camerad")
for stream_type, frame_spec, _ in streams:
vipc_server.create_buffers_with_sizes(stream_type, 40, *(frame_spec))
vipc_server.start_listener()
os.environ["LOGGERD_TEST"] = "1"
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(segment_length)
managed_processes["loggerd"].start()
managed_processes["encoderd"].start()
assert pm.wait_for_readers_to_update("roadCameraState", timeout=5)
fps = 20
for n in range(1, int(num_segs * segment_length * fps) + 1):
# send video
for stream_type, frame_spec, state in streams:
dat = np.empty(frame_spec[2], dtype=np.uint8)
vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n / fps, n / fps)
camera_state = messaging.new_message(state)
frame = getattr(camera_state, state)
frame.frameId = n
pm.send(state, camera_state)
# send audio
msg = messaging.new_message('rawAudioData')
msg.rawAudioData.data = bytes(800 * 2) # 800 samples of int16
msg.rawAudioData.sampleRate = 16000
pm.send('rawAudioData', msg)
for _, _, state in streams:
assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
managed_processes["loggerd"].stop()
managed_processes["encoderd"].stop()
def test_init_data_values(self): def test_init_data_values(self):
os.environ["CLEAN"] = random.choice(["0", "1"]) os.environ["CLEAN"] = random.choice(["0", "1"])
@ -138,51 +182,21 @@ class TestLoggerd:
@pytest.mark.skip("FIXME: encoderd sometimes crashes in CI when running with pytest-xdist") @pytest.mark.skip("FIXME: encoderd sometimes crashes in CI when running with pytest-xdist")
def test_rotation(self): def test_rotation(self):
os.environ["LOGGERD_TEST"] = "1"
Params().put("RecordFront", "1") Params().put("RecordFront", "1")
d = DEVICE_CAMERAS[("tici", "ar0231")]
expected_files = {"rlog.zst", "qlog.zst", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"} expected_files = {"rlog.zst", "qlog.zst", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"),
(VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"),
(VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]
pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"])
vipc_server = VisionIpcServer("camerad")
for stream_type, frame_spec, _ in streams:
vipc_server.create_buffers_with_sizes(stream_type, 40, *(frame_spec))
vipc_server.start_listener()
num_segs = random.randint(2, 5) num_segs = random.randint(2, 5)
length = random.randint(1, 3) length = random.randint(1, 3)
os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length)
managed_processes["loggerd"].start()
managed_processes["encoderd"].start()
assert pm.wait_for_readers_to_update("roadCameraState", timeout=5)
fps = 20.0
for n in range(1, int(num_segs*length*fps)+1):
for stream_type, frame_spec, state in streams:
dat = np.empty(frame_spec[2], dtype=np.uint8)
vipc_server.send(stream_type, dat[:].flatten().tobytes(), n, n/fps, n/fps)
camera_state = messaging.new_message(state)
frame = getattr(camera_state, state)
frame.frameId = n
pm.send(state, camera_state)
for _, _, state in streams:
assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
managed_processes["loggerd"].stop() self._publish_camera_and_audio_messages(num_segs=num_segs, segment_length=length)
managed_processes["encoderd"].stop()
route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0] route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
for n in range(num_segs): for n in range(num_segs):
p = Path(f"{route_path}--{n}") p = Path(f"{route_path}--{n}")
logged = {f.name for f in p.iterdir() if f.is_file()} logged = {f.name for f in p.iterdir() if f.is_file()}
diff = logged ^ expected_files diff = logged ^ expected_files
assert len(diff) == 0, f"didn't get all expected files. run={_} seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}" assert len(diff) == 0, f"didn't get all expected files. seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}"
def test_bootlog(self): def test_bootlog(self):
# generate bootlog with fake launch log # generate bootlog with fake launch log
@ -281,3 +295,28 @@ class TestLoggerd:
segment_dir = self._get_latest_log_dir() segment_dir = self._get_latest_log_dir()
assert getxattr(segment_dir, PRESERVE_ATTR_NAME) is None assert getxattr(segment_dir, PRESERVE_ATTR_NAME) is None
@pytest.mark.parametrize("record_front", [True, False])
def test_record_front(self, record_front):
params = Params()
params.put_bool("RecordFront", record_front)
self._publish_camera_and_audio_messages()
dcamera_hevc_exists = os.path.exists(os.path.join(self._get_latest_log_dir(), 'dcamera.hevc'))
assert dcamera_hevc_exists == record_front
@pytest.mark.parametrize("record_audio", [True, False])
def test_record_audio(self, record_audio):
params = Params()
params.put_bool("RecordAudio", record_audio)
self._publish_camera_and_audio_messages()
qcamera_ts_path = os.path.join(self._get_latest_log_dir(), 'qcamera.ts')
ffprobe_cmd = f"ffprobe -i {qcamera_ts_path} -show_streams -select_streams a -loglevel error"
has_audio_stream = subprocess.run(ffprobe_cmd, shell=True, capture_output=True).stdout.strip() != b''
assert has_audio_stream == record_audio
raw_audio_in_rlog = any(m.which() == 'rawAudioData' for m in LogReader(os.path.join(self._get_latest_log_dir(), 'rlog.zst')))
assert raw_audio_in_rlog == record_audio

Loading…
Cancel
Save