You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							117 lines
						
					
					
						
							2.9 KiB
						
					
					
				
			
		
		
	
	
							117 lines
						
					
					
						
							2.9 KiB
						
					
					
				| import os
 | |
| import errno
 | |
| import shutil
 | |
| import random
 | |
| import tempfile
 | |
| import unittest
 | |
| from pathlib import Path
 | |
| from typing import Optional
 | |
| 
 | |
| import system.loggerd.deleter as deleter
 | |
| import system.loggerd.uploader as uploader
 | |
| from system.loggerd.xattr_cache import setxattr
 | |
| 
 | |
| 
 | |
| def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: Optional[bytes] = None) -> None:
 | |
|   file_path.parent.mkdir(parents=True, exist_ok=True)
 | |
| 
 | |
|   if lock:
 | |
|     lock_path = str(file_path) + ".lock"
 | |
|     os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL))
 | |
| 
 | |
|   chunks = 128
 | |
|   chunk_bytes = int(size_mb * 1024 * 1024 / chunks)
 | |
|   data = os.urandom(chunk_bytes)
 | |
| 
 | |
|   with open(file_path, "wb") as f:
 | |
|     for _ in range(chunks):
 | |
|       f.write(data)
 | |
| 
 | |
|   if upload_xattr is not None:
 | |
|     setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)
 | |
| 
 | |
| class MockResponse():
 | |
|   def __init__(self, text, status_code):
 | |
|     self.text = text
 | |
|     self.status_code = status_code
 | |
| 
 | |
| class MockApi():
 | |
|   def __init__(self, dongle_id):
 | |
|     pass
 | |
| 
 | |
|   def get(self, *args, **kwargs):
 | |
|     return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200)
 | |
| 
 | |
|   def get_token(self):
 | |
|     return "fake-token"
 | |
| 
 | |
| class MockApiIgnore():
 | |
|   def __init__(self, dongle_id):
 | |
|     pass
 | |
| 
 | |
|   def get(self, *args, **kwargs):
 | |
|     return MockResponse('', 412)
 | |
| 
 | |
|   def get_token(self):
 | |
|     return "fake-token"
 | |
| 
 | |
| class MockParams():
 | |
|   def __init__(self):
 | |
|     self.params = {
 | |
|       "DongleId": b"0000000000000000",
 | |
|       "IsOffroad": b"1",
 | |
|     }
 | |
| 
 | |
|   def get(self, k, block=False, encoding=None):
 | |
|     val = self.params[k]
 | |
| 
 | |
|     if encoding is not None:
 | |
|       return val.decode(encoding)
 | |
|     else:
 | |
|       return val
 | |
| 
 | |
|   def get_bool(self, k):
 | |
|     val = self.params[k]
 | |
|     return (val == b'1')
 | |
| 
 | |
| class UploaderTestCase(unittest.TestCase):
 | |
|   f_type = "UNKNOWN"
 | |
| 
 | |
|   root: Path
 | |
|   seg_num: int
 | |
|   seg_format: str
 | |
|   seg_format2: str
 | |
|   seg_dir: str
 | |
| 
 | |
|   def set_ignore(self):
 | |
|     uploader.Api = MockApiIgnore
 | |
| 
 | |
|   def setUp(self):
 | |
|     self.root = Path(tempfile.mkdtemp())
 | |
|     uploader.ROOT = str(self.root)  # Monkey patch root dir
 | |
|     uploader.Api = MockApi
 | |
|     uploader.Params = MockParams
 | |
|     uploader.fake_upload = True
 | |
|     uploader.force_wifi = True
 | |
|     uploader.allow_sleep = False
 | |
|     self.seg_num = random.randint(1, 300)
 | |
|     self.seg_format = "2019-04-18--12-52-54--{}"
 | |
|     self.seg_format2 = "2019-05-18--11-22-33--{}"
 | |
|     self.seg_dir = self.seg_format.format(self.seg_num)
 | |
| 
 | |
|   def tearDown(self):
 | |
|     try:
 | |
|       shutil.rmtree(self.root)
 | |
|     except OSError as e:
 | |
|       if e.errno != errno.ENOENT:
 | |
|         raise
 | |
| 
 | |
|   def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
 | |
|                           upload_xattr: Optional[bytes] = None, preserve_xattr: Optional[bytes] = None) -> Path:
 | |
|     file_path = self.root / f_dir / fn
 | |
|     create_random_file(file_path, size_mb, lock, upload_xattr)
 | |
| 
 | |
|     if preserve_xattr is not None:
 | |
|       setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)
 | |
| 
 | |
|     return file_path
 | |
| 
 |