import os
import random
import unittest
from pathlib import Path
from typing import Optional

import openpilot.system.loggerd.deleter as deleter
import openpilot.system.loggerd.uploader as uploader
from openpilot.common.params import Params
from openpilot.system.hardware.hw import Paths
from openpilot.system.loggerd.xattr_cache import setxattr


def create_random_file(file_path: Path, size_mb: float, lock: bool = False,
                       upload_xattr: Optional[bytes] = None) -> None:
    """Write a file of random bytes at *file_path*, creating parent directories.

    The content is a single random chunk repeated 128 times, so the final size
    is ``size_mb`` MiB rounded down to a multiple of 128 bytes.

    Args:
        file_path: Destination of the data file.
        size_mb: Requested size in MiB.
        lock: When true, an empty ``<file_path>.lock`` file is created first.
            It is opened with ``O_CREAT | O_EXCL``, so an already existing lock
            file makes the call fail.
        upload_xattr: When not ``None``, stored on the data file under
            ``uploader.UPLOAD_ATTR_NAME``.
    """
    file_path.parent.mkdir(parents=True, exist_ok=True)

    if lock:
        lock_fd = os.open(f"{file_path}.lock", os.O_CREAT | os.O_EXCL)
        os.close(lock_fd)

    n_chunks = 128
    payload = os.urandom(int(size_mb * 1024 * 1024 / n_chunks))

    with open(file_path, "wb") as out:
        # The same random chunk is reused for every write.
        out.writelines(payload for _ in range(n_chunks))

    if upload_xattr is not None:
        setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr)


class MockResponse:
    """Minimal response object exposing only ``text`` and ``status_code``."""

    def __init__(self, text, status_code):
        self.text = text
        self.status_code = status_code


class MockApi:
    """Stand-in for ``uploader.Api`` whose ``get`` always answers with status 200."""

    def __init__(self, dongle_id):
        """Accept and discard *dongle_id*."""

    def get(self, *args, **kwargs):
        """Return a 200 response whose JSON body carries a URL and empty headers."""
        body = '{"url": "http://localhost/does/not/exist", "headers": {}}'
        return MockResponse(body, 200)

    def get_token(self):
        """Return a fixed placeholder token."""
        return "fake-token"


class MockApiIgnore:
    """Stand-in for ``uploader.Api`` whose ``get`` always answers with status 412."""

    def __init__(self, dongle_id):
        """Accept and discard *dongle_id*."""

    def get(self, *args, **kwargs):
        """Return a 412 response with an empty body."""
        return MockResponse('', 412)

    def get_token(self):
        """Return a fixed placeholder token."""
        return "fake-token"


class UploaderTestCase(unittest.TestCase):
    """Shared fixture for tests that work on files below ``Paths.log_root()``."""

    f_type = "UNKNOWN"

    # Declared for type checkers; `root` is not assigned anywhere in this class.
    root: Path
    seg_num: int
    seg_format: str
    seg_format2: str
    seg_dir: str

    def set_ignore(self):
        """Replace ``uploader.Api`` with the mock that answers 412."""
        uploader.Api = MockApiIgnore

    def setUp(self):
        """Patch the uploader module, choose a random segment and seed params."""
        # Module-level overrides on `uploader`; nothing here restores them.
        uploader.Api = MockApi
        uploader.fake_upload = True
        uploader.force_wifi = True
        uploader.allow_sleep = False

        self.seg_num = random.randint(1, 300)
        self.seg_format = "2019-04-18--12-52-54--{}"
        self.seg_format2 = "2019-05-18--11-22-33--{}"
        self.seg_dir = self.seg_format.format(self.seg_num)

        params = Params()
        self.params = params
        params.put("IsOffroad", "1")
        params.put("DongleId", "0000000000000000")

    def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False,
                            upload_xattr: Optional[bytes] = None,
                            preserve_xattr: Optional[bytes] = None) -> Path:
        """Create ``<log_root>/<f_dir>/<fn>`` filled with random data.

        Args:
            f_dir: Directory name below ``Paths.log_root()``.
            fn: File name inside that directory.
            size_mb: Requested size in MiB, forwarded to ``create_random_file``.
            lock: Forwarded to ``create_random_file``.
            upload_xattr: Forwarded to ``create_random_file``.
            preserve_xattr: When not ``None``, stored on the file's parent
                directory under ``deleter.PRESERVE_ATTR_NAME``.

        Returns:
            Path of the created file.
        """
        target = Path(Paths.log_root()) / f_dir / fn
        create_random_file(target, size_mb, lock, upload_xattr)

        if preserve_xattr is not None:
            # The attribute goes on the containing directory, not the file.
            setxattr(str(target.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr)

        return target