You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
118 lines
3.5 KiB
118 lines
3.5 KiB
import io
|
|
import os
|
|
import tempfile
|
|
import contextlib
|
|
import subprocess
|
|
import time
|
|
import functools
|
|
from subprocess import Popen, PIPE, TimeoutExpired
|
|
import zstandard as zstd
|
|
from openpilot.common.swaglog import cloudlog
|
|
|
|
# zstd compression level passed to ZstdCompressor in get_upload_stream.
LOG_COMPRESSION_LEVEL = 10 # little benefit up to level 15. level ~17 is a small step change
|
|
|
|
|
|
class CallbackReader:
    """File-like wrapper that reports read progress through a callback.

    Every attribute other than ``read`` is forwarded to the wrapped file
    object. After each ``read`` the callback is invoked as
    ``callback(*args, total_read)``, where ``total_read`` is the cumulative
    length of everything returned by ``read`` so far.
    """

    def __init__(self, f, callback, *args):
        """Wrap *f*; *args* are passed to *callback* ahead of the byte count."""
        self.f = f
        self.callback = callback
        self.cb_args = args
        self.total_read = 0

    def __getattr__(self, attr):
        # __getattr__ only runs when normal lookup fails. If the missing name
        # is ``f`` itself (instance created without __init__, e.g. by copy or
        # pickle), delegating to ``self.f`` would recurse until RecursionError.
        if attr == "f":
            raise AttributeError(attr)
        return getattr(self.f, attr)

    def read(self, *args, **kwargs):
        """Read from the wrapped file, then report the running total."""
        chunk = self.f.read(*args, **kwargs)
        self.total_read += len(chunk)
        self.callback(*self.cb_args, self.total_read)
        return chunk
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str | None = None, newline: str | None = None,
|
|
overwrite: bool = False):
|
|
"""Write to a file atomically using a temporary file in the same directory as the destination file."""
|
|
dir_name = os.path.dirname(path)
|
|
|
|
if not overwrite and os.path.exists(path):
|
|
raise FileExistsError(f"File '{path}' already exists. To overwrite it, set 'overwrite' to True.")
|
|
|
|
with tempfile.NamedTemporaryFile(mode=mode, buffering=buffering, encoding=encoding, newline=newline, dir=dir_name, delete=False) as tmp_file:
|
|
yield tmp_file
|
|
tmp_file_name = tmp_file.name
|
|
os.replace(tmp_file_name, path)
|
|
|
|
|
|
def get_upload_stream(filepath: str, should_compress: bool) -> tuple[io.BufferedIOBase, int]:
    """Return a readable binary stream for *filepath* together with its size in bytes.

    With *should_compress* false the file itself is opened and returned along
    with its on-disk size. Otherwise the whole file is zstd-compressed into an
    in-memory buffer, which is returned rewound to the start along with the
    compressed size.
    """
    if should_compress:
        # Compress the file on the fly
        buf = io.BytesIO()
        cctx = zstd.ZstdCompressor(level=LOG_COMPRESSION_LEVEL)
        with open(filepath, "rb") as src:
            cctx.copy_stream(src, buf)
        # The write position after copying equals the number of compressed bytes.
        size = buf.tell()
        buf.seek(0)
        return buf, size

    size = os.path.getsize(filepath)
    return open(filepath, "rb"), size
|
|
|
|
|
|
# remove all keys that end in DEPRECATED
|
|
def strip_deprecated_keys(d):
    """Remove, in place, every string key ending in 'DEPRECATED' and return *d*.

    Dict values stored under the remaining string keys are processed
    recursively. Non-string keys are left alone and their values are not
    descended into; values that are not dicts (e.g. lists) are not inspected.
    """
    # Snapshot the keys so entries can be removed while looping.
    for key in tuple(d.keys()):
        if not isinstance(key, str):
            continue
        if key.endswith('DEPRECATED'):
            d.pop(key)
            continue
        value = d[key]
        if isinstance(value, dict):
            strip_deprecated_keys(value)
    return d
|
|
|
|
|
|
def run_cmd(cmd: list[str], cwd=None, env=None) -> str:
    """Run *cmd* and return its stdout decoded as UTF-8 with surrounding whitespace stripped.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero status.
    """
    output = subprocess.check_output(cmd, cwd=cwd, env=env, encoding='utf8')
    return output.strip()
|
|
|
|
|
|
def run_cmd_default(cmd: list[str], default: str = "", cwd=None, env=None) -> str:
    """Like run_cmd, but return *default* when the command exits with a non-zero status."""
    try:
        output = run_cmd(cmd, cwd=cwd, env=env)
    except subprocess.CalledProcessError:
        return default
    return output
|
|
|
|
|
|
@contextlib.contextmanager
def managed_proc(cmd: list[str], env: dict[str, str], timeout: float = 5):
    """Run *cmd* as a child process for the duration of the ``with`` block.

    Yields the ``Popen`` object, started with stdout and stderr as pipes. On
    exit, a still-running child is sent terminate(); if it has not exited
    within *timeout* seconds it is killed and then waited on, so it is always
    reaped before the context manager returns.
    """
    proc = Popen(cmd, env=env, stdout=PIPE, stderr=PIPE)
    try:
        yield proc
    finally:
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=timeout)
            except TimeoutExpired:
                proc.kill()
                # Reap the killed child; otherwise it lingers as a zombie and
                # returncode stays None.
                proc.wait()
|
|
|
|
|
|
def retry(attempts=3, delay=1.0, ignore_failure=False):
    """Decorator factory that retries the wrapped function when it raises.

    The function is called up to *attempts* times. Each failure is logged via
    cloudlog, and *delay* seconds are slept between attempts (not after the
    last one). If every attempt fails, the wrapper either logs an error and
    returns None (*ignore_failure* true) or raises ``Exception`` chained to
    the last underlying error.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    cloudlog.exception(f"{func.__name__} failed, trying again")
                    # Sleeping after the final attempt only delays reporting the failure.
                    if attempt < attempts - 1:
                        time.sleep(delay)

            if ignore_failure:
                cloudlog.error(f"{func.__name__} failed after retry")
                return None
            # Chain the last error so the root cause is kept in the traceback.
            raise Exception(f"{func.__name__} failed after retry") from last_exc
        return wrapper
    return decorator
|
|
|