@ -4,7 +4,6 @@ import os 
			
		
	
		
		
			
				
					
					import  random import  random  
			
		
	
		
		
			
				
					
					import  shutil import  shutil  
			
		
	
		
		
			
				
					
					import  subprocess import  subprocess  
			
		
	
		
		
			
				
					
					import  threading  
			
		
	
		
		
			
				
					
					import  time import  time  
			
		
	
		
		
			
				
					
					import  unittest import  unittest  
			
		
	
		
		
			
				
					
					from  parameterized  import  parameterized from  parameterized  import  parameterized  
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -15,22 +14,24 @@ from common.params import Params 
			
		
	
		
		
			
				
					
					from  common . timeout  import  Timeout from  common . timeout  import  Timeout  
			
		
	
		
		
			
				
					
					from  selfdrive . hardware  import  EON ,  TICI from  selfdrive . hardware  import  EON ,  TICI  
			
		
	
		
		
			
				
					
					from  selfdrive . test . helpers  import  with_processes from  selfdrive . test . helpers  import  with_processes  
			
		
	
		
		
			
				
					
					from  selfdrive . loggerd . config  import  ROOT ,  CAMERA_FPS from  selfdrive . loggerd . config  import  ROOT  
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					# baseline file sizes for a 2s segment, in bytes  
			
		
	
		
		
			
				
					
					SEGMENT_LENGTH  =  2 SEGMENT_LENGTH  =  2  
			
		
	
		
		
			
				
					
					FULL_SIZE  =  1253786 FULL_SIZE  =  1253786   # file size for a 2s segment in bytes   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					if  EON : if  EON :  
			
		
	
		
		
			
				
					
					  CAMERAS  =  {    CAMERAS  =  [   
			
				
				
			
		
	
		
		
			
				
					
					    " fcamera " :  FULL_SIZE ,      ( " fcamera.hevc  " ,  20 ,  FULL_SIZE ) ,   
			
				
				
			
		
	
		
		
			
				
					
					    " dcamera " :  770920 ,      ( " dcamera.hevc  " ,  10 ,  770920 ) ,   
			
				
				
			
		
	
		
		
			
				
					
					    " qcamera " :  38533 ,      ( " qcamera.ts  " ,  20 ,  38533 ) ,   
			
				
				
			
		
	
		
		
			
				
					
					  }    ]   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					else : else :  
			
		
	
		
		
			
				
					
					  CAMERAS  =  { f " { c } camera " :  FULL_SIZE  if  c != " q "  else  38533  for  c  in  [ " f " ,  " e " ,  " d " ,  " q " ] }    CAMERAS  =  [   
			
				
				
			
		
	
		
		
			
				
					
					
    ( " fcamera.hevc " ,  20 ,  FULL_SIZE ) ,   
			
				
				
			
		
	
		
		
			
				
					
					ALL_CAMERA_COMBINATIONS  =  [ ( cameras , )  for  cameras  in  [ CAMERAS ,  { k : CAMERAS [ k ]  for  k  in  CAMERAS  if  k != ' dcamera ' } ] ]     ( " dcamera.hevc " ,  20 ,  FULL_SIZE ) ,   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					    ( " ecamera.hevc " ,  20 ,  FULL_SIZE ) ,   
			
		
	
		
		
			
				
					
					    ( " qcamera.ts " ,  20 ,  38533 ) ,   
			
		
	
		
		
			
				
					
					  ]   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					# we check frame count, so we don't have to be too strict on size # we check frame count, so we don't have to be too strict on size  
			
		
	
		
		
			
				
					
					FILE_SIZE_TOLERANCE  =  0.5 FILE_SIZE_TOLERANCE  =  0.5  
			
		
	
	
		
		
			
				
					
						
							
								 
						
						
							
								 
						
						
					 
					@ -60,11 +61,10 @@ class TestEncoder(unittest.TestCase): 
			
		
	
		
		
			
				
					
					    return  os . path . join ( ROOT ,  last_route )      return  os . path . join ( ROOT ,  last_route )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					  # TODO: this should run faster than real time    # TODO: this should run faster than real time   
			
		
	
		
		
			
				
					
					  @parameterized . expand ( ALL_CAMERA_COMBINATIONS )    @parameterized . expand ( [ ( True ,  ) ,  ( False ,  ) ] )   
			
				
				
			
		
	
		
		
			
				
					
					  @with_processes ( [ ' camerad ' ,  ' sensord ' ,  ' loggerd ' ] )    @with_processes ( [ ' camerad ' ,  ' sensord ' ,  ' loggerd ' ] ,  init_time = 3 )   
			
				
				
			
		
	
		
		
			
				
					
					  def  test_log_rotation ( self ,  cameras ) :    def  test_log_rotation ( self ,  record_front ) :   
			
				
				
			
		
	
		
		
			
				
					
					    print ( " checking targets: " ,  cameras )      Params ( ) . put ( " RecordFront " ,  str ( int ( record_front ) ) )   
			
				
				
			
		
	
		
		
			
				
					
					    Params ( ) . put ( " RecordFront " ,  " 1 "  if  ' dcamera '  in  cameras  else  " 0 " )   
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    num_segments  =  random . randint ( 80 ,  150 )      num_segments  =  random . randint ( 80 ,  150 )   
			
		
	
		
		
			
				
					
					    if  " CI "  in  os . environ :      if  " CI "  in  os . environ :   
			
		
	
	
		
		
			
				
					
						
						
						
							
								 
						
					 
					@ -72,58 +72,45 @@ class TestEncoder(unittest.TestCase): 
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    # wait for loggerd to make the dir for first segment      # wait for loggerd to make the dir for first segment   
			
		
	
		
		
			
				
					
					    route_prefix_path  =  None      route_prefix_path  =  None   
			
		
	
		
		
			
				
					
					    with  Timeout ( int ( SEGMENT_LENGTH * 2 ) ) :      with  Timeout ( int ( SEGMENT_LENGTH * 3 ) ) :   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					      while  route_prefix_path  is  None :        while  route_prefix_path  is  None :   
			
		
	
		
		
			
				
					
					        try :          try :   
			
		
	
		
		
			
				
					
					          route_prefix_path  =  self . _get_latest_segment_path ( ) . rsplit ( " -- " ,  1 ) [ 0 ]            route_prefix_path  =  self . _get_latest_segment_path ( ) . rsplit ( " -- " ,  1 ) [ 0 ]   
			
		
	
		
		
			
				
					
					        except  Exception :          except  Exception :   
			
		
	
		
		
			
				
					
					          time . sleep ( 0.1 )            time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					          continue   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    def  check_seg ( i ) :      def  check_seg ( i ) :   
			
		
	
		
		
			
				
					
					      # check each camera file size        # check each camera file size   
			
		
	
		
		
			
				
					
					      for  camera ,  size  in  cameras . items ( ) :        for  camera ,  fps ,  size  in  CAMERAS :   
			
				
				
			
		
	
		
		
			
				
					
					        ext  =  " ts "  if  camera == ' qcamera '  else  " hevc "          if  not  record_front  and  " dcamera "  in  camera :   
			
				
				
			
		
	
		
		
			
				
					
					        file_path  =  f " { route_prefix_path } -- { i } / { camera } . { ext } "            continue   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        file_path  =  f " { route_prefix_path } -- { i } / { camera } "   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        # check file size          # check file size   
			
		
	
		
		
			
				
					
					        self . assertTrue ( os . path . exists ( file_path ) ,  f " couldn ' t find  { file_path } " )          self . assertTrue ( os . path . exists ( file_path ) )   
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					        file_size  =  os . path . getsize ( file_path )          file_size  =  os . path . getsize ( file_path )   
			
		
	
		
		
			
				
					
					        self . assertTrue ( math . isclose ( file_size ,  size ,  rel_tol = FILE_SIZE_TOLERANCE ) ,          self . assertTrue ( math . isclose ( file_size ,  size ,  rel_tol = FILE_SIZE_TOLERANCE ) )   
			
				
				
			
		
	
		
		
			
				
					
					                        f " { camera }  failed size check: expected  { size } , got  { file_size } " )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        if  camera  ==  ' qcamera ' :   
			
		
	
		
		
			
				
					
					          continue   
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        # TODO: this ffprobe call is really slow          # TODO: this ffprobe call is really slow   
			
		
	
		
		
			
				
					
					        # check frame count          # check frame count   
			
		
	
		
		
			
				
					
					        cmd  =  f " ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames  \          cmd  =  f " ffprobe -v error -count_frames -select_streams v:0 -show_entries stream=nb_read_frames  \   
			
		
	
		
		
			
				
					
					               - of  default = nokey = 1 : noprint_wrappers = 1  { file_path } "                 - of  default = nokey = 1 : noprint_wrappers = 1  { file_path } "   
			
		
	
		
		
			
				
					
					        expected_frames  =  SEGMENT_LENGTH  *  CAMERA_FPS  / /  2  if  ( EON  and  camera == ' dcamera ' )  else  SEGMENT_LENGTH  *  CAMERA_FPS          expected_frames  =  fps  *  SEGMENT_LENGTH   
			
				
				
			
		
	
		
		
			
				
					
					        frame_tolerance  =  1  if  ( EON  and  camera  ==  ' dcamera ' )  else  0          frame_tolerance  =  1  if  ( EON  and  camera  ==  ' dcamera.hevc ' )  else  0   
			
				
				
			
		
	
		
		
			
				
					
					        frame_count  =  int ( subprocess . check_output ( cmd ,  shell = True ,  encoding = ' utf8 ' ) . strip ( ) )          probe  =  subprocess . check_output ( cmd ,  shell = True ,  encoding = ' utf8 ' )   
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					        frame_count  =  int ( probe . split ( ' \n ' ) [ 0 ] . strip ( ) )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					        self . assertTrue ( abs ( expected_frames  -  frame_count )  < =  frame_tolerance ,          self . assertTrue ( abs ( expected_frames  -  frame_count )  < =  frame_tolerance ,   
			
		
	
		
		
			
				
					
					                        f " { camera }  failed frame count check: expected  { expected_frames } , got  { frame_count } " )                          f " { camera }  failed frame count check: expected  { expected_frames } , got  { frame_count } " )   
			
		
	
		
		
			
				
					
					      shutil . rmtree ( f " { route_prefix_path } -- { i } " )        shutil . rmtree ( f " { route_prefix_path } -- { i } " )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    def  join ( ts ,  timeout ) :   
			
		
	
		
		
			
				
					
					      for  t  in  ts :   
			
		
	
		
		
			
				
					
					        t . join ( timeout )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    threads  =  [ ]   
			
		
	
		
		
			
				
					
					    for  i  in  trange ( num_segments ) :      for  i  in  trange ( num_segments ) :   
			
		
	
		
		
			
				
					
					      # poll for next segment        # poll for next segment   
			
		
	
		
		
			
				
					
					      with  Timeout ( int ( SEGMENT_LENGTH * 2 ) ,  error_msg = f " timed out waiting for segment  { i } " ) :        with  Timeout ( int ( SEGMENT_LENGTH * 2 ) ,  error_msg = f " timed out waiting for segment  { i } " ) :   
			
		
	
		
		
			
				
					
					        while  int ( self . _get_latest_segment_path ( ) . rsplit ( " -- " ,  1 ) [ 1 ] )  < =  i :          while  int ( self . _get_latest_segment_path ( ) . rsplit ( " -- " ,  1 ) [ 1 ] )  < =  i :   
			
		
	
		
		
			
				
					
					          time . sleep ( 0.1 )            time . sleep ( 0.1 )   
			
		
	
		
		
			
				
					
					      t  =  threading . Thread ( target = check_seg ,  args = ( i ,  ) )        check_seg ( i )   
			
				
				
			
		
	
		
		
			
				
					
					      t . start ( )   
			
		
	
		
		
			
				
					
					      threads . append ( t )   
			
		
	
		
		
			
				
					
					      join ( threads ,  0.1 )   
			
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					    with  Timeout ( 20 ) :   
			
		
	
		
		
			
				
					
					      join ( threads ,  None )   
			
		
	
		
		
	
		
		
			
				
					
					
 
			
		
	
		
		
			
				
					
					if  __name__  ==  " __main__ " : if  __name__  ==  " __main__ " :  
			
		
	
		
		
			
				
					
					  unittest . main ( )    unittest . main ( )