import os import random import unittest from pathlib import Path import openpilot.system.loggerd.deleter as deleter import openpilot.system.loggerd.uploader as uploader from openpilot.common.params import Params from openpilot.system.hardware.hw import Paths from openpilot.system.loggerd.xattr_cache import setxattr def create_random_file(file_path: Path, size_mb: float, lock: bool = False, upload_xattr: bytes | None = None) -> None: file_path.parent.mkdir(parents=True, exist_ok=True) if lock: lock_path = str(file_path) + ".lock" os.close(os.open(lock_path, os.O_CREAT | os.O_EXCL)) chunks = 128 chunk_bytes = int(size_mb * 1024 * 1024 / chunks) data = os.urandom(chunk_bytes) with open(file_path, "wb") as f: for _ in range(chunks): f.write(data) if upload_xattr is not None: setxattr(str(file_path), uploader.UPLOAD_ATTR_NAME, upload_xattr) class MockResponse(): def __init__(self, text, status_code): self.text = text self.status_code = status_code class MockApi(): def __init__(self, dongle_id): pass def get(self, *args, **kwargs): return MockResponse('{"url": "http://localhost/does/not/exist", "headers": {}}', 200) def get_token(self): return "fake-token" class MockApiIgnore(): def __init__(self, dongle_id): pass def get(self, *args, **kwargs): return MockResponse('', 412) def get_token(self): return "fake-token" class UploaderTestCase(unittest.TestCase): f_type = "UNKNOWN" root: Path seg_num: int seg_format: str seg_format2: str seg_dir: str def set_ignore(self): uploader.Api = MockApiIgnore def setUp(self): uploader.Api = MockApi uploader.fake_upload = True uploader.force_wifi = True uploader.allow_sleep = False self.seg_num = random.randint(1, 300) self.seg_format = "2019-04-18--12-52-54--{}" self.seg_format2 = "2019-05-18--11-22-33--{}" self.seg_dir = self.seg_format.format(self.seg_num) self.params = Params() self.params.put("IsOffroad", "1") self.params.put("DongleId", "0000000000000000") def make_file_with_data(self, f_dir: str, fn: str, size_mb: float = .1, lock: bool = False, upload_xattr: bytes | None = None, preserve_xattr: bytes | None = None) -> Path: file_path = Path(Paths.log_root()) / f_dir / fn create_random_file(file_path, size_mb, lock, upload_xattr) if preserve_xattr is not None: setxattr(str(file_path.parent), deleter.PRESERVE_ATTR_NAME, preserve_xattr) return file_path