You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							124 lines
						
					
					
						
							4.0 KiB
						
					
					
				
			
		
		
	
	
							124 lines
						
					
					
						
							4.0 KiB
						
					
					
				#!/usr/bin/env python3
 | 
						|
import math
 | 
						|
import os
 | 
						|
import random
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
from parameterized import parameterized
 | 
						|
from pathlib import Path
 | 
						|
from tqdm import trange
 | 
						|
 | 
						|
from common.params import Params
 | 
						|
from common.hardware import EON, TICI
 | 
						|
from common.timeout import Timeout
 | 
						|
from selfdrive.test.helpers import with_processes
 | 
						|
from selfdrive.loggerd.config import ROOT, CAMERA_FPS
 | 
						|
 | 
						|
 | 
						|
# baseline file sizes for a 2s segment, in bytes
 | 
						|
FULL_SIZE = 1253786
 | 
						|
if EON:
 | 
						|
  CAMERAS = {
 | 
						|
    "fcamera": FULL_SIZE,
 | 
						|
    "dcamera": 770920,
 | 
						|
    "qcamera": 38533,
 | 
						|
  }
 | 
						|
elif TICI:
 | 
						|
  CAMERAS = {f"{c}camera": FULL_SIZE if c!="q" else 38533 for c in ["f", "e", "d", "q"]}
 | 
						|
else:
 | 
						|
  CAMERAS = {}
 | 
						|
 | 
						|
ALL_CAMERA_COMBINATIONS = [(cameras,) for cameras in [CAMERAS, {k:CAMERAS[k] for k in CAMERAS if k!='dcamera'}]]
 | 
						|
 | 
						|
FRAME_TOLERANCE = 0
 | 
						|
FILE_SIZE_TOLERANCE = 0.5
 | 
						|
 | 
						|
class TestLoggerd(unittest.TestCase):
 | 
						|
 | 
						|
  # TODO: all of loggerd should work on PC
 | 
						|
  @classmethod
 | 
						|
  def setUpClass(cls):
 | 
						|
    if not (EON or TICI):
 | 
						|
      raise unittest.SkipTest
 | 
						|
 | 
						|
  def setUp(self):
 | 
						|
    self._clear_logs()
 | 
						|
 | 
						|
    self.segment_length = 2
 | 
						|
    os.environ["LOGGERD_TEST"] = "1"
 | 
						|
    os.environ["LOGGERD_SEGMENT_LENGTH"] = str(self.segment_length)
 | 
						|
 | 
						|
  def tearDown(self):
 | 
						|
    self._clear_logs()
 | 
						|
 | 
						|
  def _clear_logs(self):
 | 
						|
    if os.path.exists(ROOT):
 | 
						|
      shutil.rmtree(ROOT)
 | 
						|
 | 
						|
  def _get_latest_segment_path(self):
 | 
						|
    last_route = sorted(Path(ROOT).iterdir(), key=os.path.getmtime)[-1]
 | 
						|
    return os.path.join(ROOT, last_route)
 | 
						|
 | 
						|
  # TODO: this should run faster than real time
 | 
						|
  @parameterized.expand(ALL_CAMERA_COMBINATIONS)
 | 
						|
  @with_processes(['camerad', 'sensord', 'loggerd'], init_time=5)
 | 
						|
  def test_log_rotation(self, cameras):
 | 
						|
    print("checking targets:", cameras)
 | 
						|
    Params().put("RecordFront", "1" if 'dcamera' in cameras else "0")
 | 
						|
 | 
						|
    num_segments = random.randint(80, 150)
 | 
						|
    if "CI" in os.environ:
 | 
						|
      num_segments = random.randint(15, 20) # ffprobe is slow on comma two
 | 
						|
 | 
						|
    # wait for loggerd to make the dir for first segment
 | 
						|
    time.sleep(10)
 | 
						|
    route_prefix_path = None
 | 
						|
    with Timeout(30):
 | 
						|
      while route_prefix_path is None:
 | 
						|
        try:
 | 
						|
          route_prefix_path = self._get_latest_segment_path().rsplit("--", 1)[0]
 | 
						|
        except Exception:
 | 
						|
          time.sleep(2)
 | 
						|
          continue
 | 
						|
 | 
						|
    for i in trange(num_segments):
 | 
						|
      # poll for next segment
 | 
						|
      if i < num_segments - 1:
 | 
						|
        with Timeout(self.segment_length*3, error_msg=f"timed out waiting for segment {i}"):
 | 
						|
          while True:
 | 
						|
            seg_num = int(self._get_latest_segment_path().rsplit("--", 1)[1])
 | 
						|
            if seg_num > i:
 | 
						|
              break
 | 
						|
            time.sleep(0.1)
 | 
						|
      else:
 | 
						|
        time.sleep(self.segment_length)
 | 
						|
 | 
						|
      # check each camera file size
 | 
						|
      for camera, size in cameras.items():
 | 
						|
        ext = "ts" if camera=='qcamera' else "hevc"
 | 
						|
        file_path = f"{route_prefix_path}--{i}/{camera}.{ext}"
 | 
						|
 | 
						|
        # check file size
 | 
						|
        self.assertTrue(os.path.exists(file_path), f"couldn't find {file_path}")
 | 
						|
        file_size = os.path.getsize(file_path)
 | 
						|
        self.assertTrue(math.isclose(file_size, size, rel_tol=FILE_SIZE_TOLERANCE),
 | 
						|
                        f"{camera} failed size check: expected {size}, got {file_size}")
 | 
						|
 | 
						|
        if camera == 'qcamera':
 | 
						|
          continue
 | 
						|
 | 
						|
        # check frame count
 | 
						|
        cmd = f"ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames \
 | 
						|
               -of default=nokey=1:noprint_wrappers=1 {file_path}"
 | 
						|
        expected_frames = self.segment_length * CAMERA_FPS // 2 if (EON and camera=='dcamera') else self.segment_length * CAMERA_FPS
 | 
						|
        frame_tolerance = FRAME_TOLERANCE+1 if (EON and camera=='dcamera') or i==0 else FRAME_TOLERANCE
 | 
						|
        frame_count = int(subprocess.check_output(cmd, shell=True, encoding='utf8').strip())
 | 
						|
 | 
						|
        self.assertTrue(abs(expected_frames - frame_count) <= frame_tolerance,
 | 
						|
                        f"{camera} failed frame count check: expected {expected_frames}, got {frame_count}")
 | 
						|
      shutil.rmtree(f"{route_prefix_path}--{i}")
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
  unittest.main()
 | 
						|
 |