import os import shutil import uuid from typing import List, Optional from common.params import Params class OpenpilotPrefix(object): def __init__(self, prefix: Optional[str] = None, clean_dirs_on_exit: bool = True): self.prefix = prefix if prefix else str(uuid.uuid4()) self.msgq_path = os.path.join('/dev/shm', self.prefix) self.clean_dirs_on_exit = clean_dirs_on_exit def __enter__(self): os.environ['OPENPILOT_PREFIX'] = self.prefix try: os.mkdir(self.msgq_path) except FileExistsError: pass def __exit__(self, exc_type, exc_obj, exc_tb): if self.clean_dirs_on_exit: self.clean_dirs() del os.environ['OPENPILOT_PREFIX'] return False def clean_dirs(self): symlink_path = Params().get_param_path() if os.path.exists(symlink_path): shutil.rmtree(os.path.realpath(symlink_path), ignore_errors=True) os.remove(symlink_path) shutil.rmtree(self.msgq_path, ignore_errors=True) class DummySocket: def __init__(self): self.data: List[bytes] = [] def receive(self, non_blocking: bool = False) -> Optional[bytes]: if non_blocking: return None return self.data.pop() def send(self, data: bytes): self.data.append(data)