#!/usr/bin/env python3 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  os 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  time 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  threading 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								import  unittest 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  logging 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  json 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . swaglog  import  cloudlog 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								import  selfdrive . loggerd . uploader  as  uploader 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								from  selfdrive . loggerd . tests . loggerd_tests_common  import  UploaderTestCase 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestLogHandler ( logging . Handler ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  __init__ ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    logging . Handler . __init__ ( self ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . reset ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  reset ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . upload_order  =  list ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . upload_ignored  =  list ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  emit ( self ,  record ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    try : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      j  =  json . loads ( record . getMessage ( ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      if  j [ " event " ]  ==  " upload_success " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . upload_order . append ( j [ " key " ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      if  j [ " event " ]  ==  " upload_ignored " : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								        self . upload_ignored . append ( j [ " key " ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    except  Exception : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      pass 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								log_handler  =  TestLogHandler ( ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								cloudlog . addHandler ( log_handler ) 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								class  TestUploader ( UploaderTestCase ) : 
 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  setUp ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    super ( ) . setUp ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    log_handler . reset ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  start_thread ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . end_event  =  threading . Event ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . up_thread  =  threading . Thread ( target = uploader . uploader_fn ,  args = [ self . end_event ] ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . up_thread . daemon  =  True 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . up_thread . start ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  join_thread ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . end_event . set ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . up_thread . join ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  gen_files ( self ,  lock = False ,  boot = True ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    f_paths  =  list ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    for  t  in  [ " qlog " ,  " rlog " ,  " dcamera.hevc " ,  " fcamera.hevc " ] : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      f_paths . append ( self . make_file_with_data ( self . seg_dir ,  t ,  1 ,  lock = lock ) ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  boot : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      f_paths . append ( self . make_file_with_data ( " boot " ,  f " { self . seg_dir } " ,  1 ,  lock = lock ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  f_paths 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  gen_order ( self ,  seg1 ,  seg2 ,  boot = True ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    keys  =  [ ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    if  boot : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      keys  + =  [ f " boot/ { self . seg_format . format ( i ) } .bz2 "  for  i  in  seg1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      keys  + =  [ f " boot/ { self . seg_format2 . format ( i ) } .bz2 "  for  i  in  seg2 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    keys  + =  [ f " { self . seg_format . format ( i ) } /qlog.bz2 "  for  i  in  seg1 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    keys  + =  [ f " { self . seg_format2 . format ( i ) } /qlog.bz2 "  for  i  in  seg2 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    return  keys 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_upload ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . gen_files ( lock = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . start_thread ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # allow enough time that files could upload twice if there is a bug in the logic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . join_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    exp_order  =  self . gen_order ( [ self . seg_num ] ,  [ ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( len ( log_handler . upload_ignored )  ==  0 ,  " Some files were ignored " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_order )  <  len ( exp_order ) ,  " Some files failed to upload " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_order )  >  len ( exp_order ) ,  " Some files were uploaded twice " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f_path  in  exp_order : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . assertTrue ( os . getxattr ( os . path . join ( self . root ,  f_path . replace ( ' .bz2 ' ,  ' ' ) ) ,  uploader . UPLOAD_ATTR_NAME ) ,  " All files not uploaded " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( log_handler . upload_order  ==  exp_order ,  " Files uploaded in wrong order " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_upload_ignored ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . set_ignore ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . gen_files ( lock = False ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . start_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # allow enough time that files could upload twice if there is a bug in the logic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . join_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    exp_order  =  self . gen_order ( [ self . seg_num ] ,  [ ] ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( len ( log_handler . upload_order )  ==  0 ,  " Some files were not ignored " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_ignored )  <  len ( exp_order ) ,  " Some files failed to ignore " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_ignored )  >  len ( exp_order ) ,  " Some files were ignored twice " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f_path  in  exp_order : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . assertTrue ( os . getxattr ( os . path . join ( self . root ,  f_path . replace ( ' .bz2 ' ,  ' ' ) ) ,  uploader . UPLOAD_ATTR_NAME ) ,  " All files not ignored " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertTrue ( log_handler . upload_ignored  ==  exp_order ,  " Files ignored in wrong order " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_upload_files_in_create_order ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    seg1_nums  =  [ 0 ,  1 ,  2 ,  10 ,  20 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i  in  seg1_nums : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . seg_dir  =  self . seg_format . format ( i ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . gen_files ( boot = False ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    seg2_nums  =  [ 5 ,  50 ,  51 ] 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  i  in  seg2_nums : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . seg_dir  =  self . seg_format2 . format ( i ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . gen_files ( boot = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    exp_order  =  self . gen_order ( seg1_nums ,  seg2_nums ,  boot = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . start_thread ( ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    # allow enough time that files could upload twice if there is a bug in the logic 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . join_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertTrue ( len ( log_handler . upload_ignored )  ==  0 ,  " Some files were ignored " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_order )  <  len ( exp_order ) ,  " Some files failed to upload " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertFalse ( len ( log_handler . upload_order )  >  len ( exp_order ) ,  " Some files were uploaded twice " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f_path  in  exp_order : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      self . assertTrue ( os . getxattr ( os . path . join ( self . root ,  f_path . replace ( ' .bz2 ' ,  ' ' ) ) ,  uploader . UPLOAD_ATTR_NAME ) ,  " All files not uploaded " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . assertTrue ( log_handler . upload_order  ==  exp_order ,  " Files uploaded in wrong order " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								  def  test_no_upload_with_lock_file ( self ) : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    self . start_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 0.25 ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								    f_paths  =  self . gen_files ( lock = True ,  boot = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    # allow enough time that files should have been uploaded if they would be uploaded 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 5 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . join_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f_path  in  f_paths : 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								      uploaded  =  uploader . UPLOAD_ATTR_NAME  in  os . listxattr ( f_path . replace ( ' .bz2 ' ,  ' ' ) ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . assertFalse ( uploaded ,  " File upload when locked " ) 
  
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  def  test_clear_locks_on_startup ( self ) : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    f_paths  =  self . gen_files ( lock = True ,  boot = False ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . start_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    time . sleep ( 1 ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    self . join_thread ( ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								    for  f_path  in  f_paths : 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								      self . assertFalse ( os . path . isfile ( f_path  +  " .lock " ) ,  " File lock not cleared on startup " ) 
  
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
							 
							
								 
							
							
								
							 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								if  __name__  ==  " __main__ " : 
 
						 
					
						
							
								
							 
							
								
									
										 
								
							 
							
								 
							
							
								  unittest . main ( )