#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  numpy  as  np 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  re 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  random 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  string 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  subprocess 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  collections  import  defaultdict 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  pathlib  import  Path 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  flaky  import  flaky 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  cereal . messaging  as  messaging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal  import  log 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  cereal . services  import  SERVICE_LIST 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . basedir  import  BASEDIR 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . params  import  Params 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . common . timeout  import  Timeout 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . system . hardware . hw  import  Paths 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . system . loggerd . xattr_cache  import  getxattr 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . loggerd . deleter  import  PRESERVE_ATTR_NAME ,  PRESERVE_ATTR_VALUE 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . selfdrive . manager . process_config  import  managed_processes 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  openpilot . system . version  import  get_version 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . lib . helpers  import  RE 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . tools . lib . logreader  import  LogReader 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  cereal . visionipc  import  VisionIpcServer ,  VisionStreamType 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								from  openpilot . common . transformations . camera  import  DEVICE_CAMERAS 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								# Short alias for the sentinel enum used by the route/segment boundary checks.
								SentinelType = log.Sentinel.SentinelType

								# Event union fields that are present in SERVICE_LIST, have should_log set,
								# and do not contain "encode" (case-insensitive) in their name.
								CEREAL_SERVICES = [f for f in log.Event.schema.union_fields if f in SERVICE_LIST
								                   and SERVICE_LIST[f].should_log and "encode" not in f.lower()]
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								class  TestLoggerd : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def _get_latest_log_dir(self):
								    """Return the entry under the log root with the newest modification time."""
								    def _mtime(entry):
								      return entry.stat().st_mtime

								    entries = list(Path(Paths.log_root()).iterdir())
								    # ascending by mtime, so the most recently modified entry ends up last
								    entries.sort(key=_mtime)
								    return entries[-1]
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _get_log_dir ( self ,  x ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  l  in  x . splitlines ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  p  in  l . split ( '   ' ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        path  =  Path ( p . strip ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  path . is_dir ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          return  path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _get_log_fn ( self ,  x ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  l  in  x . splitlines ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      for  p  in  l . split ( '   ' ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        path  =  Path ( p . strip ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        if  path . is_file ( ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								          return  path 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  None 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def _gen_bootlog(self):
								    """Run the `bootlog` binary in system/loggerd and return the file it reports.

								    The binary is run inside `Timeout(5)` and its stdout is searched for
								    the first token that names an existing file.

								    Returns:
								      `Path` of the log file found in the binary's output.

								    Raises:
								      AssertionError: if the output mentions no existing file.
								      subprocess.CalledProcessError: if the binary exits with a non-zero status.
								    """
								    with Timeout(5):
								      out = subprocess.check_output("./bootlog", cwd=os.path.join(BASEDIR, "system/loggerd"), encoding='utf-8')

								    log_fn = self._get_log_fn(out)

								    # check existence; include the raw output so a failure can be diagnosed
								    assert log_fn is not None, f"bootlog did not report an existing log file, output: {out!r}"

								    return log_fn
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  _check_init_data ( self ,  msgs ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    msg  =  msgs [ 0 ] 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    assert  msg . which ( )  ==  ' initData ' 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def _check_sentinel(self, msgs, route):
								    """Assert the sentinel types at the start and end of `msgs`.

								    The message at index 1 must carry the start sentinel and the last
								    message the end sentinel: the route variants when `route` is truthy,
								    the segment variants otherwise.
								    """
								    if route:
								      expected_first = SentinelType.startOfRoute
								    else:
								      expected_first = SentinelType.startOfSegment
								    assert msgs[1].sentinel.type == expected_first

								    if route:
								      expected_last = SentinelType.endOfRoute
								    else:
								      expected_last = SentinelType.endOfSegment
								    assert msgs[-1].sentinel.type == expected_last
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def _publish_random_messages(self, services: list[str]) -> dict[str, list]:
								    """Publish a random number of messages on every service while loggerd runs.

								    Starts loggerd, asserts `wait_for_readers_to_update` for each service,
								    sends between 200 and 1000 rounds of one new message per service,
								    asserts the readers updated again and stops loggerd.

								    Args:
								      services: names of the services to publish on.

								    Returns:
								      Mapping of service name to the messages sent on it, in send order.
								    """
								    pm = messaging.PubMaster(services)

								    managed_processes["loggerd"].start()
								    try:
								      for s in services:
								        assert pm.wait_for_readers_to_update(s, timeout=5)

								      sent_msgs = defaultdict(list)
								      for _ in range(random.randint(2, 10) * 100):
								        for s in services:
								          try:
								            m = messaging.new_message(s)
								          except Exception:
								            # presumably list-typed services need an explicit size -- TODO confirm
								            m = messaging.new_message(s, random.randint(2, 10))
								          pm.send(s, m)
								          sent_msgs[s].append(m)

								      for s in services:
								        assert pm.wait_for_readers_to_update(s, timeout=5)
								    finally:
								      # always stop loggerd; a failed assertion must not leave it running
								      managed_processes["loggerd"].stop()

								    return sent_msgs
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def test_init_data_values(self):
								    """Check that the initData of a freshly generated bootlog matches the environment and params."""
								    os.environ["CLEAN"] = random.choice(["0", "1"])

								    dongle = ''.join(random.choice(string.printable) for _ in range(random.randint(1, 100)))
								    fake_params = [
								      # param, initData field, value
								      ("DongleId", "dongleId", dongle),
								      ("GitCommit", "gitCommit", "commit"),
								      ("GitCommitDate", "gitCommitDate", "date"),
								      ("GitBranch", "gitBranch", "branch"),
								      ("GitRemote", "gitRemote", "remote"),
								    ]
								    params = Params()
								    for k, _, v in fake_params:
								      params.put(k, v)
								    params.put("AccessToken", "abc")

								    lr = list(LogReader(str(self._gen_bootlog())))
								    initData = lr[0].initData

								    # NOTE(review): bool() of any non-empty string is True, so this asserts that
								    # `dirty` is False for both "0" and "1" -- confirm against how loggerd reads CLEAN.
								    assert initData.dirty != bool(os.environ["CLEAN"])
								    assert initData.version == get_version()

								    # kernel info is only compared when /proc/cmdline exists
								    if os.path.isfile("/proc/cmdline"):
								      with open("/proc/cmdline") as f:
								        assert list(initData.kernelArgs) == f.read().strip().split(" ")

								      with open("/proc/version") as f:
								        assert initData.kernelVersion == f.read()

								    # check params
								    logged_params = {entry.key: entry.value for entry in initData.params.entries}
								    expected_params = {k for k, _, __ in fake_params} | {'AccessToken', 'BootCount'}
								    assert set(logged_params.keys()) == expected_params, set(logged_params.keys()) ^ expected_params
								    # the AccessToken key must be present, but with an empty value
								    assert logged_params['AccessToken'] == b'', f"DONT_LOG param value was logged: {repr(logged_params['AccessToken'])}"
								    for param_key, initData_key, v in fake_params:
								      assert getattr(initData, initData_key) == v
								      assert logged_params[param_key].decode() == v
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  @flaky(max_runs=3)
								  def test_rotation(self):
								    """Feed synthetic camera frames to encoderd/loggerd and check each segment's files.

								    A random number of segments (2-5) of random length (1-3, passed via
								    LOGGERD_SEGMENT_LENGTH) is produced at 20 frames per unit of length.
								    Afterwards every segment directory of the latest route must contain
								    exactly the expected log and video files.
								    """
								    os.environ["LOGGERD_TEST"] = "1"
								    Params().put("RecordFront", "1")

								    d = DEVICE_CAMERAS[("tici", "ar0231")]
								    expected_files = {"rlog", "qlog", "qcamera.ts", "fcamera.hevc", "dcamera.hevc", "ecamera.hevc"}
								    # (stream type, buffer spec, camera state service); the third spec entry is
								    # also used below as the byte size of each frame that gets sent
								    streams = [(VisionStreamType.VISION_STREAM_ROAD, (d.fcam.width, d.fcam.height, 2048*2346, 2048, 2048*1216), "roadCameraState"),
								               (VisionStreamType.VISION_STREAM_DRIVER, (d.dcam.width, d.dcam.height, 2048*2346, 2048, 2048*1216), "driverCameraState"),
								               (VisionStreamType.VISION_STREAM_WIDE_ROAD, (d.ecam.width, d.ecam.height, 2048*2346, 2048, 2048*1216), "wideRoadCameraState")]

								    pm = messaging.PubMaster(["roadCameraState", "driverCameraState", "wideRoadCameraState"])
								    vipc_server = VisionIpcServer("camerad")
								    for stream_type, frame_spec, _ in streams:
								      vipc_server.create_buffers_with_sizes(stream_type, 40, False, *(frame_spec))
								    vipc_server.start_listener()

								    num_segs = random.randint(2, 5)
								    length = random.randint(1, 3)
								    os.environ["LOGGERD_SEGMENT_LENGTH"] = str(length)
								    managed_processes["loggerd"].start()
								    managed_processes["encoderd"].start()
								    try:
								      assert pm.wait_for_readers_to_update("roadCameraState", timeout=5)

								      fps = 20.0
								      for n in range(1, int(num_segs*length*fps)+1):
								        for stream_type, frame_spec, state in streams:
								          # frame contents are irrelevant, only the size matters
								          dat = np.empty(frame_spec[2], dtype=np.uint8)
								          vipc_server.send(stream_type, dat.tobytes(), n, n/fps, n/fps)

								          camera_state = messaging.new_message(state)
								          frame = getattr(camera_state, state)
								          frame.frameId = n
								          pm.send(state, camera_state)

								        for _, _, state in streams:
								          assert pm.wait_for_readers_to_update(state, timeout=5, dt=0.001)
								    finally:
								      # stop both processes even when an assertion fails, so a flaky re-run
								      # does not start against processes left over from the previous attempt
								      managed_processes["loggerd"].stop()
								      managed_processes["encoderd"].stop()

								    # segment directories are named "<route>--<segment number>"
								    route_path = str(self._get_latest_log_dir()).rsplit("--", 1)[0]
								    for n in range(num_segs):
								      p = Path(f"{route_path}--{n}")
								      logged = {f.name for f in p.iterdir() if f.is_file()}
								      diff = logged ^ expected_files
								      assert len(diff) == 0, f"didn't get all expected files. seg={n} {route_path=}, {diff=}\n{logged=} {expected_files=}"
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def test_bootlog(self):
								    """Generate two bootlogs and validate their contents and naming.

								    A random launch log is written to /tmp/launch_log first; the boot
								    message read back from the generated bootlog must carry the same text.
								    """
								    # fake launch log: 100 random printable characters
								    launch_log = ''.join(random.choice(string.printable) for _ in range(100))
								    Path("/tmp/launch_log").write_text(launch_log)

								    first_bootlog = self._gen_bootlog()
								    msgs = list(LogReader(str(first_bootlog)))

								    # the log must hold exactly two messages: boot + initData
								    assert len(msgs) == 2
								    self._check_init_data(msgs)

								    # exactly one of them is the boot message
								    boot_events = [m for m in msgs if m.which() == 'boot']
								    assert len(boot_events) == 1

								    # sanity check values
								    boot = boot_events[0].boot
								    now_ns = time.time_ns()
								    assert abs(boot.wallTimeNanos - now_ns) < 5e9  # within 5s
								    assert boot.launchLog == launch_log

								    # each pstore file that exists on this machine must match the bootlog entry of the same key
								    pstore_dir = Path("/sys/fs/pstore/")
								    for fn in ("console-ramoops", "pmsg-ramoops-0"):
								      pstore_file = pstore_dir / fn
								      if not pstore_file.is_file():
								        continue
								      expected_val = pstore_file.read_bytes()
								      matching = [e.value for e in boot.pstore.entries if e.key == fn]
								      assert expected_val == matching[0]

								    # a second bootlog must get a different uid, with the count going from 0 to 1
								    bl1 = re.match(RE.LOG_ID_V2, first_bootlog.name)
								    second_bootlog = self._gen_bootlog()
								    bl2 = re.match(RE.LOG_ID_V2, second_bootlog.name)
								    assert bl1.group('uid') != bl2.group('uid')
								    assert int(bl1.group('count')) == 0
								    assert int(bl2.group('count')) == 1
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def test_qlog(self):
								    """Publish random messages and check which of them end up in the qlog.

								    Services whose decimation is None must not appear in the qlog at all;
								    every other service must appear with the count implied by its decimation.
								    """
								    # split the services by whether they define a decimation
								    qlog_services, no_qlog_services = [], []
								    for s in CEREAL_SERVICES:
								      bucket = no_qlog_services if SERVICE_LIST[s].decimation is None else qlog_services
								      bucket.append(s)

								    # sample at least 2 and at most 10 (or the group size, if smaller) from each group
								    n_qlog = random.randint(2, min(10, len(qlog_services)))
								    services = random.sample(qlog_services, n_qlog)
								    n_no_qlog = random.randint(2, min(10, len(no_qlog_services)))
								    services += random.sample(no_qlog_services, n_no_qlog)
								    sent_msgs = self._publish_random_messages(services)

								    lr = list(LogReader(os.path.join(self._get_latest_log_dir(), "qlog")))

								    # check initData and sentinel
								    self._check_init_data(lr)
								    self._check_sentinel(lr, True)

								    # tally how many messages of each type were read back from the qlog
								    recv_counts = defaultdict(int)
								    for m in lr:
								      recv_counts[m.which()] += 1

								    for s, msgs in sent_msgs.items():
								      recv_cnt = recv_counts[s]
								      if s not in no_qlog_services:
								        # check logged message count matches decimation
								        expected_cnt = (len(msgs) - 1) // SERVICE_LIST[s].decimation + 1
								        assert recv_cnt == expected_cnt, f"expected {expected_cnt} msgs for {s}, got {recv_cnt}"
								      else:
								        # check services with no specific decimation aren't in qlog
								        assert recv_cnt == 0, f"got {recv_cnt} {s} msgs in qlog"
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def test_rlog(self):
								    """Publish random messages and check the rlog holds them byte-for-byte, in order."""
								    n_services = random.randint(5, 10)
								    services = random.sample(CEREAL_SERVICES, n_services)
								    sent_msgs = self._publish_random_messages(services)

								    rlog_path = os.path.join(self._get_latest_log_dir(), "rlog")
								    lr = list(LogReader(rlog_path))

								    # check initData and sentinel
								    self._check_init_data(lr)
								    self._check_sentinel(lr, True)

								    # the first two entries and the last one are skipped (initData and both sentinels);
								    # each remaining entry must equal the oldest not-yet-matched sent message of its type
								    for logged in lr[2:-1]:
								      sent = sent_msgs[logged.which()].pop(0)
								      sent.clear_write_flag()
								      assert sent.to_bytes() == logged.as_builder().to_bytes()
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def test_preserving_flagged_segments(self):
								    """Publish to a service set that includes userFlag and expect the preserve xattr on the segment dir."""
								    sampled = random.sample(CEREAL_SERVICES, random.randint(5, 10))
								    services = set(sampled)
								    services.add("userFlag")  # always include the flag service
								    self._publish_random_messages(services)

								    segment_dir = self._get_latest_log_dir()
								    attr = getxattr(segment_dir, PRESERVE_ATTR_NAME)
								    assert attr == PRESERVE_ATTR_VALUE
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def test_not_preserving_unflagged_segments(self):
								    """Publish to a service set that excludes userFlag and expect no preserve xattr on the segment dir."""
								    sampled = random.sample(CEREAL_SERVICES, random.randint(5, 10))
								    services = set(sampled)
								    services.discard("userFlag")  # never include the flag service
								    self._publish_random_messages(services)

								    segment_dir = self._get_latest_log_dir()
								    attr = getxattr(segment_dir, PRESERVE_ATTR_NAME)
								    assert attr is None