@ -1,4 +1,5 @@
#!/usr/bin/env python3
import numpy as np
import os
import random
import string
@ -14,12 +15,14 @@ from cereal.services import service_list
from common . basedir import BASEDIR
from common . params import Params
from common . timeout import Timeout
from selfdrive . hardware import PC , TICI
from selfdrive . hardware import TICI
from selfdrive . loggerd . config import ROOT
from selfdrive . manager . process_config import managed_processes
from selfdrive . test . helpers import with_processes
from selfdrive . version import get_version
from tools . lib . logreader import LogReader
from cereal . visionipc . visionipc_pyx import VisionIpcServer , VisionStreamType # pylint: disable=no-name-in-module, import-error
from common . transformations . camera import eon_f_frame_size , tici_f_frame_size , \
eon_d_frame_size , tici_d_frame_size , tici_e_frame_size
SentinelType = log . Sentinel . SentinelType
@ -28,12 +31,6 @@ CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in service_list
class TestLoggerd ( unittest . TestCase ) :
# TODO: all tests should work on PC
@classmethod
def setUpClass ( cls ) :
if PC :
raise unittest . SkipTest
def _get_latest_log_dir ( self ) :
log_dirs = sorted ( Path ( ROOT ) . iterdir ( ) , key = lambda f : f . stat ( ) . st_mtime )
return log_dirs [ - 1 ]
@ -107,25 +104,41 @@ class TestLoggerd(unittest.TestCase):
for _ , k , v in fake_params :
self . assertEqual ( getattr ( initData , k ) , v )
# TODO: this shouldn't need camerad
@with_processes ( [ ' camerad ' ] )
def test_rotation ( self ) :
os . environ [ " LOGGERD_TEST " ] = " 1 "
Params ( ) . put ( " RecordFront " , " 1 " )
expected_files = { " rlog.bz2 " , " qlog.bz2 " , " qcamera.ts " , " fcamera.hevc " , " dcamera.hevc " }
streams = [ ( VisionStreamType . VISION_STREAM_ROAD , tici_f_frame_size if TICI else eon_f_frame_size , " roadCameraState " ) ,
( VisionStreamType . VISION_STREAM_DRIVER , tici_d_frame_size if TICI else eon_d_frame_size , " driverCameraState " ) ]
if TICI :
expected_files . add ( " ecamera.hevc " )
streams . append ( ( VisionStreamType . VISION_STREAM_WIDE_ROAD , tici_e_frame_size , " wideRoadCameraState " ) )
# give camerad time to start
time . sleep ( 3 )
pm = messaging . PubMaster ( [ " roadCameraState " , " driverCameraState " , " wideRoadCameraState " ] )
vipc_server = VisionIpcServer ( " camerad " )
for stream_type , frame_size , _ in streams :
vipc_server . create_buffers ( stream_type , 40 , False , * ( frame_size ) )
vipc_server . start_listener ( )
for _ in range ( 5 ) :
num_segs = random . randint ( 2 , 5 )
length = random . randint ( 1 , 3 )
os . environ [ " LOGGERD_SEGMENT_LENGTH " ] = str ( length )
managed_processes [ " loggerd " ] . start ( )
time . sleep ( num_segs * length + 1 )
fps = 20.0
for n in range ( 1 , int ( num_segs * length * fps ) + 1 ) :
for stream_type , frame_size , state in streams :
dat = np . empty ( int ( frame_size [ 0 ] * frame_size [ 1 ] * 3 / 2 ) , dtype = np . uint8 )
vipc_server . send ( stream_type , dat [ : ] . flatten ( ) . tobytes ( ) , n , n / fps , n / fps )
camera_state = messaging . new_message ( state )
frame = getattr ( camera_state , state )
frame . frameId = n
pm . send ( state , camera_state )
time . sleep ( 1.0 / fps )
managed_processes [ " loggerd " ] . stop ( )
route_path = str ( self . _get_latest_log_dir ( ) ) . rsplit ( " -- " , 1 ) [ 0 ]