|
|
@ -3,6 +3,7 @@ import math |
|
|
|
import os |
|
|
|
import os |
|
|
|
import random |
|
|
|
import random |
|
|
|
import shutil |
|
|
|
import shutil |
|
|
|
|
|
|
|
import subprocess |
|
|
|
import time |
|
|
|
import time |
|
|
|
import unittest |
|
|
|
import unittest |
|
|
|
from pathlib import Path |
|
|
|
from pathlib import Path |
|
|
@ -11,7 +12,7 @@ from common.params import Params |
|
|
|
from common.hardware import EON, TICI |
|
|
|
from common.hardware import EON, TICI |
|
|
|
from common.timeout import Timeout |
|
|
|
from common.timeout import Timeout |
|
|
|
from selfdrive.test.helpers import with_processes |
|
|
|
from selfdrive.test.helpers import with_processes |
|
|
|
from selfdrive.loggerd.config import ROOT |
|
|
|
from selfdrive.loggerd.config import ROOT, CAMERA_FPS |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# baseline file sizes for a 2s segment, in bytes |
|
|
|
# baseline file sizes for a 2s segment, in bytes |
|
|
@ -24,6 +25,7 @@ if EON: |
|
|
|
elif TICI: |
|
|
|
elif TICI: |
|
|
|
CAMERAS = {f"{c}camera": FULL_SIZE for c in ["f", "e", "d"]} |
|
|
|
CAMERAS = {f"{c}camera": FULL_SIZE for c in ["f", "e", "d"]} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FRAME_TOLERANCE = 2 |
|
|
|
FILE_SIZE_TOLERANCE = 0.25 |
|
|
|
FILE_SIZE_TOLERANCE = 0.25 |
|
|
|
|
|
|
|
|
|
|
|
class TestLoggerd(unittest.TestCase): |
|
|
|
class TestLoggerd(unittest.TestCase): |
|
|
@ -74,11 +76,21 @@ class TestLoggerd(unittest.TestCase): |
|
|
|
|
|
|
|
|
|
|
|
# check each camera file size |
|
|
|
# check each camera file size |
|
|
|
for camera, size in CAMERAS.items(): |
|
|
|
for camera, size in CAMERAS.items(): |
|
|
|
f = f"{route_prefix_path}--{i}/{camera}.hevc" |
|
|
|
file_path = f"{route_prefix_path}--{i}/{camera}.hevc" |
|
|
|
self.assertTrue(os.path.exists(f), f"couldn't find {f}") |
|
|
|
|
|
|
|
file_size = os.path.getsize(f) |
|
|
|
# check file size |
|
|
|
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), |
|
|
|
self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}") |
|
|
|
|
|
|
|
file_size = os.path.getsize(file_path) |
|
|
|
|
|
|
|
self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE), |
|
|
|
f"{camera} failed size check: expected {size}, got {file_size}") |
|
|
|
f"{camera} failed size check: expected {size}, got {file_size}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# check frame count |
|
|
|
|
|
|
|
cmd = "fprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \ |
|
|
|
|
|
|
|
-of default=nokey=1:noprint_wrappers=1 {file_path}" |
|
|
|
|
|
|
|
expected_frames = self.segment_length * CAMERA_FPS |
|
|
|
|
|
|
|
frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip()) |
|
|
|
|
|
|
|
self.assertTrue(abs(expected_frames - frame_count) <= FRAME_TOLERANCE, |
|
|
|
|
|
|
|
f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}") |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
if __name__ == "__main__": |
|
|
|
unittest.main() |
|
|
|
unittest.main() |
|
|
|