#!/usr/bin/env python3
import numpy as np
import os
import re
import random
import string
import subprocess
import time
from collections import defaultdict
from pathlib import Path
from flaky import flaky
import cereal . messaging as messaging
from cereal import log
from cereal . services import SERVICE_LIST
from openpilot . common . basedir import BASEDIR
from openpilot . common . params import Params
from openpilot . common . timeout import Timeout
from openpilot . system . hardware . hw import Paths
from openpilot . system . loggerd . xattr_cache import getxattr
from openpilot . system . loggerd . deleter import PRESERVE_ATTR_NAME , PRESERVE_ATTR_VALUE
from openpilot . selfdrive . manager . process_config import managed_processes
from openpilot . system . version import get_version
from openpilot . tools . lib . helpers import RE
from openpilot . tools . lib . logreader import LogReader
from cereal . visionipc import VisionIpcServer , VisionStreamType
from openpilot . common . transformations . camera import DEVICE_CAMERAS
# Shorthand for the sentinel type enum carried by sentinel log messages.
SentinelType = log.Sentinel.SentinelType

# Event union fields that are registered services, are flagged `should_log`,
# and do not have "encode" anywhere in their (case-folded) name.
CEREAL_SERVICES = [
  f for f in log.Event.schema.union_fields
  if f in SERVICE_LIST and SERVICE_LIST[f].should_log and "encode" not in f.lower()
]
class TestLoggerd:
  """Integration tests for loggerd, encoderd and the bootlog binary.

  The tests start the managed processes (or run ``./bootlog``), feed them
  messages/frames, and then inspect what was written under
  ``Paths.log_root()`` with ``LogReader``.
  """

  def _get_latest_log_dir(self):
    """Return the entry under ``Paths.log_root()`` with the newest mtime."""
    log_dirs = sorted(Path(Paths.log_root()).iterdir(), key=lambda f: f.stat().st_mtime)
    return log_dirs[-1]

  @staticmethod
  def _find_path(text, predicate):
    """Return the first space-separated token of *text* accepted by *predicate*.

    Each line of *text* is split on single spaces; every non-empty, stripped
    token is wrapped in a ``Path`` and passed to *predicate*. Returns the first
    accepted ``Path``, or ``None`` when no token matches.
    """
    for line in text.splitlines():
      for token in line.split(' '):
        token = token.strip()
        if not token:
          # Path('') is Path('.'); without this guard consecutive spaces or a
          # blank line would make the current directory look like a match.
          continue
        path = Path(token)
        if predicate(path):
          return path
    return None

  def _get_log_dir(self, x):
    """Return the first token in *x* that names an existing directory, else None."""
    return self._find_path(x, Path.is_dir)

  def _get_log_fn(self, x):
    """Return the first token in *x* that names an existing file, else None."""
    return self._find_path(x, Path.is_file)

  def _gen_bootlog(self):
    """Run ``./bootlog`` and return the path of the log file it reports.

    The binary is executed from ``BASEDIR/system/loggerd`` inside a
    ``Timeout(5)`` block; its stdout is scanned for an existing file path.
    """
    with Timeout(5):
      out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8')

    log_fn = self._get_log_fn(out)

    # check existence
    assert log_fn is not None, f"no existing file path found in bootlog output: {out!r}"

    return log_fn

  def _check_init_data(self, msgs):
    """Assert that the first message of *msgs* is an ``initData`` event."""
    first = msgs[0]
    assert first.which() == 'initData'

  def _check_sentinel(self, msgs, route):
    """Assert that *msgs* is bracketed by the expected sentinels.

    ``msgs[1]`` must be a start sentinel and ``msgs[-1]`` an end sentinel;
    route-level types are expected when *route* is truthy, segment-level
    types otherwise.
    """
    start_type = SentinelType.startOfRoute if route else SentinelType.startOfSegment
    assert msgs[1].sentinel.type == start_type

    end_type = SentinelType.endOfRoute if route else SentinelType.endOfSegment
    assert msgs[-1].sentinel.type == end_type

  def _publish_random_messages(self, services: list[str]) -> dict[str, list]:
    """Publish a random number of messages per service while loggerd runs.

    Starts loggerd, waits for readers on every service, sends between 200 and
    1000 rounds of one message per service, waits for the readers to catch up
    and stops loggerd again (also when an assertion fails on the way).

    Returns a mapping of service name to the list of messages sent on it, in
    send order.
    """
    pm = messaging.PubMaster(services)

    managed_processes["loggerd"].start()
    try:
      for s in services:
        assert pm.wait_for_readers_to_update(s, timeout=5)

      sent_msgs = defaultdict(list)
      for _ in range(random.randint(2, 10) * 100):
        for s in services:
          try:
            m = messaging.new_message(s)
          except Exception:
            # new_message(s) failed without a size argument; retry with a
            # random one (presumably needed for list-typed services -- confirm).
            m = messaging.new_message(s, random.randint(2, 10))
          pm.send(s, m)
          sent_msgs[s].append(m)

      for s in services:
        assert pm.wait_for_readers_to_update(s, timeout=5)
    finally:
      # never leave loggerd running behind a failed assertion
      managed_processes["loggerd"].stop()

    return sent_msgs

  def test_init_data_values(self):
    """Check that the bootlog's initData reflects the environment and params."""
    os.environ["CLEAN"] = random.choice(["0", "1"])

    dongle = ''.join(random.choice(string.printable) for _ in range(random.randint(1, 100)))
    fake_params = [
      # param, initData field, value
      ("DongleId", "dongleId", dongle),
      ("GitCommit", "gitCommit", "commit"),
      ("GitCommitDate", "gitCommitDate", "date"),
      ("GitBranch", "gitBranch", "branch"),
      ("GitRemote", "gitRemote", "remote"),
    ]
    params = Params()
    for k, _, v in fake_params:
      params.put(k, v)
    params.put("AccessToken", "abc")

    lr = list(LogReader(str(self._gen_bootlog())))
    initData = lr[0].initData

    # NOTE(review): bool() of a non-empty string is always True, so this
    # asserts `dirty` is false for both "0" and "1" -- confirm that presence of
    # CLEAN (rather than its value) is what the logger keys on.
    assert initData.dirty != bool(os.environ["CLEAN"])
    assert initData.version == get_version()

    if os.path.isfile("/proc/cmdline"):
      with open("/proc/cmdline") as f:
        assert list(initData.kernelArgs) == f.read().strip().split(" ")

      with open("/proc/version") as f:
        assert initData.kernelVersion == f.read()

    # check params
    logged_params = {entry.key: entry.value for entry in initData.params.entries}
    expected_params = {k for k, _, __ in fake_params} | {'AccessToken', 'BootCount'}
    assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params
    assert logged_params['AccessToken'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['AccessToken'])}"
    for param_key, initData_key, v in fake_params:
      assert getattr(initData, initData_key) == v
      assert logged_params[param_key].decode() == v

  @flaky(max_runs=3)
  def test_rotation(self):
    """Check that every rotated segment contains the full set of log files.

    Three camera streams are served over VisionIPC while loggerd and encoderd
    run with a random LOGGERD_SEGMENT_LENGTH; enough frames are sent for
    ``num_segs`` segments and each segment directory is then compared against
    ``expected_files``.
    """
    os.environ["LOGGERD_TEST"] = "1"
    Params().put("RecordFront", "1")

    d = DEVICE_CAMERAS[("tici", "ar0231")]
    expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
    # (stream type, args for create_buffers_with_sizes, camera state service)
    streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"),
               (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"),
               (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]

    pm = messaging.PubMaster([state for _, _, state in streams])
    vipc_server = VisionIpcServer("camerad")
    for stream_type, frame_spec, _ in streams:
      vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec))
    vipc_server.start_listener()

    num_segs = random.randint(2, 5)
    length = random.randint(1, 3)
    os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length)
    managed_processes["loggerd"].start()
    managed_processes["encoderd"].start()
    try:
      assert pm.wait_for_readers_to_update("roadCameraState", timeout=5)

      fps = 20.0
      for n in range(1, int(num_segs*length*fps)+1):
        for stream_type, frame_spec, state in streams:
          # frame contents are irrelevant; only the byte count (third element
          # of the spec) has to match the buffers created above
          dat = np.empty(frame_spec[2], dtype=np.uint8)
          vipc_server.send(stream_type, dat.tobytes(), n, n/fps, n/fps)

          camera_state = messaging.new_message(state)
          frame = getattr(camera_state, state)
          frame.frameId = n
          pm.send(state, camera_state)

        for _, _, state in streams:
          assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
    finally:
      # stop both processes even when a wait above timed out
      managed_processes["loggerd"].stop()
      managed_processes["encoderd"].stop()

    route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
    for n in range(num_segs):
      p = Path(f"{route_path}--{n}")
      logged = {f.name for f in p.iterdir() if f.is_file()}
      diff = logged ^ expected_files
      assert len(diff) == 0, f"didn't get all expected files. seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}"

  def test_bootlog(self):
    """Check the contents of a generated bootlog and the id of the next one."""
    # generate bootlog with fake launch log
    launch_log = ''.join(random.choice(string.printable) for _ in range(100))
    with open("/tmp/launch_log", "w") as f:
      f.write(launch_log)

    bootlog_path = self._gen_bootlog()
    lr = list(LogReader(str(bootlog_path)))

    # check length
    assert len(lr) == 2  # boot + initData

    self._check_init_data(lr)

    # check msgs
    bootlog_msgs = [m for m in lr if m.which() == 'boot']
    assert len(bootlog_msgs) == 1

    # sanity check values
    boot = bootlog_msgs.pop().boot
    assert abs(boot.wallTimeNanos - time.time_ns()) < 5*1e9  # within 5s
    assert boot.launchLog == launch_log

    for fn in ["console-ramoops", "pmsg-ramoops-0"]:
      path = Path(os.path.join("/sys/fs/pstore/", fn))
      if path.is_file():
        with open(path, "rb") as f:
          expected_val = f.read()
        # first entry with a matching key, as an explicit failure if absent
        bootlog_val = next((e.value for e in boot.pstore.entries if e.key == fn), None)
        assert bootlog_val is not None, f"{fn} missing from boot.pstore entries"
        assert expected_val == bootlog_val

    # next one should increment by one
    bl1 = re.match(RE.LOG_ID_V2, bootlog_path.name)
    bl2 = re.match(RE.LOG_ID_V2, self._gen_bootlog().name)
    assert bl1 is not None and bl2 is not None, "bootlog file name does not match RE.LOG_ID_V2"
    assert bl1.group('uid') != bl2.group('uid')
    assert int(bl1.group('count')) == 0 and int(bl2.group('count')) == 1

  def test_qlog(self):
    """Check that the qlog holds only decimated services, at the decimated rate."""
    qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is not None]
    no_qlog_services = [s for s in CEREAL_SERVICES if SERVICE_LIST[s].decimation is None]

    services = random.sample(qlog_services, random.randint(2, min(10, len(qlog_services)))) + \
               random.sample(no_qlog_services, random.randint(2, min(10, len(no_qlog_services))))
    sent_msgs = self._publish_random_messages(services)

    qlog_path = os.path.join(self._get_latest_log_dir(), "qlog")
    lr = list(LogReader(qlog_path))

    # check initData and sentinel
    self._check_init_data(lr)
    self._check_sentinel(lr, True)

    recv_msgs = defaultdict(list)
    for m in lr:
      recv_msgs[m.which()].append(m)

    for s, msgs in sent_msgs.items():
      recv_cnt = len(recv_msgs[s])

      if s in no_qlog_services:
        # check services with no specific decimation aren't in qlog
        assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog"
      else:
        # check logged message count matches decimation
        expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1
        assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}"

  def test_rlog(self):
    """Check that rlog messages match what was sent, per service and in order."""
    services = random.sample(CEREAL_SERVICES, random.randint(5, 10))
    sent_msgs = self._publish_random_messages(services)

    lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "rlog")))

    # check initData and sentinel
    self._check_init_data(lr)
    self._check_sentinel(lr, True)

    # check logged messages match the sent ones, in order per service
    # NOTE(review): nothing verifies that every sent message was consumed, so
    # messages dropped at the tail would go unnoticed -- confirm whether an
    # exhaustion check is wanted here.
    pending = {s: iter(msgs) for s, msgs in sent_msgs.items()}
    for m in lr[2:-1]:  # slice off initData and both sentinels
      which = m.which()
      sent = next(pending.get(which, iter(())), None)
      assert sent is not None, f"rlog contains a {which} message that was not sent"
      sent.clear_write_flag()
      assert sent.to_bytes() == m.as_builder().to_bytes()

  def test_preserving_flagged_segments(self):
    """A segment that received ``userFlag`` messages carries the preserve xattr."""
    services = list(set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) | {"userFlag"})
    self._publish_random_messages(services)

    segment_dir = self._get_latest_log_dir()
    assert getxattr(segment_dir, PRESERVE_ATTR_NAME) == PRESERVE_ATTR_VALUE

  def test_not_preserving_unflagged_segments(self):
    """A segment without ``userFlag`` messages has no preserve xattr."""
    services = list(set(random.sample(CEREAL_SERVICES, random.randint(5, 10))) - {"userFlag"})
    self._publish_random_messages(services)

    segment_dir = self._get_latest_log_dir()
    assert getxattr(segment_dir, PRESERVE_ATTR_NAME) is None