From 67ac96c8b42c25c9d07788a9d0fa1d4378cb1a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kacper=20R=C4=85czy?= Date: Tue, 1 Aug 2023 01:30:58 +0200 Subject: [PATCH] process_replay: capture process output (#29027) * Add ProcessOutputProxy * Move launcher to its own field * Move ProcessOutputCapture to its own file * Return itself from __enter__ of OpenpilotPrefix * Integrate ProcessOutputCapture into process_replay * Add note about capture_output_store to README * ipykernel import is optional * Disable type checking for link_with_current_proc * Remove assertion * Decode outputs to utf-8 * read(self): return empty buf if its none * Fix type annotations * Replace fifo with regular file, to avoid hitting fifo size limit old-commit-hash: 547a033a3c32f05f2dd678280cee4d7378b60b83 --- selfdrive/manager/process.py | 6 +- selfdrive/test/process_replay/README.md | 12 ++++ selfdrive/test/process_replay/capture.py | 59 +++++++++++++++++++ selfdrive/test/process_replay/helpers.py | 2 + .../test/process_replay/process_replay.py | 48 +++++++++++---- 5 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 selfdrive/test/process_replay/capture.py diff --git a/selfdrive/manager/process.py b/selfdrive/manager/process.py index 7d0993188c..b5f1e7a71a 100644 --- a/selfdrive/manager/process.py +++ b/selfdrive/manager/process.py @@ -196,6 +196,7 @@ class NativeProcess(ManagerProcess): self.unkillable = unkillable self.sigkill = sigkill self.watchdog_max_dt = watchdog_max_dt + self.launcher = nativelauncher def prepare(self) -> None: pass @@ -210,7 +211,7 @@ class NativeProcess(ManagerProcess): cwd = os.path.join(BASEDIR, self.cwd) cloudlog.info(f"starting process {self.name}") - self.proc = Process(name=self.name, target=nativelauncher, args=(self.cmdline, cwd, self.name)) + self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name)) self.proc.start() self.watchdog_seen = False self.shutting_down = False @@ -227,6 +228,7 @@ class PythonProcess(ManagerProcess): self.unkillable = unkillable self.sigkill = sigkill self.watchdog_max_dt = watchdog_max_dt + self.launcher = launcher def prepare(self) -> None: if self.enabled: @@ -242,7 +244,7 @@ class PythonProcess(ManagerProcess): return cloudlog.info(f"starting python {self.module}") - self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name)) + self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name)) self.proc.start() self.watchdog_seen = False self.shutting_down = False diff --git a/selfdrive/test/process_replay/README.md b/selfdrive/test/process_replay/README.md index 571157dfc9..d1743ac539 100644 --- a/selfdrive/test/process_replay/README.md +++ b/selfdrive/test/process_replay/README.md @@ -113,4 +113,16 @@ frs = { } output_logs = replay_process_with_name(['modeld', 'dmonitoringmodeld'], lr, frs=frs) +``` + +To capture stdout/stderr of the replayed process, `captured_output_store` can be provided. + +```py +output_store = dict() +# pass dictionary by reference, it will be filled with standard outputs - even if process replay fails +output_logs = replay_process_with_name(['radard', 'plannerd'], lr, captured_output_store=output_store) + +# entries with captured output in format { 'out': '...', 'err': '...' } will be added to provided dictionary for each replayed process +print(output_store['radard']['out']) # radard stdout +print(output_store['radard']['err']) # radard stderr ``` \ No newline at end of file diff --git a/selfdrive/test/process_replay/capture.py b/selfdrive/test/process_replay/capture.py new file mode 100644 index 0000000000..4c4828200a --- /dev/null +++ b/selfdrive/test/process_replay/capture.py @@ -0,0 +1,59 @@ +import os +import sys + +from typing import Tuple, no_type_check + +class FdRedirect: + def __init__(self, file_prefix: str, fd: int): + fname = os.path.join("/tmp", f"{file_prefix}.{fd}") + if os.path.exists(fname): + os.unlink(fname) + self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT) + self.dest_fname = fname + self.source_fd = fd + os.set_inheritable(self.dest_fd, True) + + def __del__(self): + os.close(self.dest_fd) + + def purge(self) -> None: + os.unlink(self.dest_fname) + + def read(self) -> bytes: + with open(self.dest_fname, "rb") as f: + return f.read() or b"" + + def link(self) -> None: + os.dup2(self.dest_fd, self.source_fd) + + +class ProcessOutputCapture: + def __init__(self, proc_name: str, prefix: str): + prefix = f"{proc_name}_{prefix}" + self.stdout_redirect = FdRedirect(prefix, 1) + self.stderr_redirect = FdRedirect(prefix, 2) + + def __del__(self): + self.stdout_redirect.purge() + self.stderr_redirect.purge() + + @no_type_check # ipython classes have incompatible signatures + def link_with_current_proc(self) -> None: + try: + # prevent ipykernel from redirecting stdout/stderr of python subprocesses + from ipykernel.iostream import OutStream + if isinstance(sys.stdout, OutStream): + sys.stdout = sys.__stdout__ + if isinstance(sys.stderr, OutStream): + sys.stderr = sys.__stderr__ + except ImportError: + pass + + # link stdout/stderr to the fifo + self.stdout_redirect.link() + self.stderr_redirect.link() + + def read_outerr(self) -> Tuple[str, str]: + out_str = self.stdout_redirect.read().decode() + err_str = self.stderr_redirect.read().decode() + return out_str, err_str diff --git a/selfdrive/test/process_replay/helpers.py b/selfdrive/test/process_replay/helpers.py index 42c3aeb02a..5cf1acfa59 100644 --- a/selfdrive/test/process_replay/helpers.py +++ b/selfdrive/test/process_replay/helpers.py @@ -19,6 +19,8 @@ class OpenpilotPrefix(object): except FileExistsError: pass + return self + def __exit__(self, exc_type, exc_obj, exc_tb): if self.clean_dirs_on_exit: self.clean_dirs() diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 72e519d057..ddcc0b2fe8 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -25,6 +25,7 @@ from selfdrive.manager.process_config import managed_processes from selfdrive.test.process_replay.helpers import OpenpilotPrefix, DummySocket from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams from selfdrive.test.process_replay.migration import migrate_all +from selfdrive.test.process_replay.capture import ProcessOutputCapture from tools.lib.logreader import LogReader # Numpy gives different results based on CPU features after version 19 @@ -33,6 +34,16 @@ PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__)) FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/") +class LauncherWithCapture: + def __init__(self, capture: ProcessOutputCapture, launcher: Callable): + self.capture = capture + self.launcher = launcher + + def __call__(self, *args, **kwargs): + self.capture.link_with_current_proc() + self.launcher(*args, **kwargs) + + class ReplayContext: def __init__(self, cfg): self.proc_name = cfg.proc_name @@ -126,13 +137,14 @@ class ProcessContainer: def __init__(self, cfg: ProcessConfig): self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False) self.cfg = copy.deepcopy(cfg) - self.process = managed_processes[cfg.proc_name] + self.process = copy.deepcopy(managed_processes[cfg.proc_name]) self.msg_queue: List[capnp._DynamicStructReader] = [] self.cnt = 0 self.pm: Optional[messaging.PubMaster] = None self.sockets: Optional[List[messaging.SubSocket]] = None self.rc: Optional[ReplayContext] = None self.vipc_server: Optional[VisionIpcServer] = None + self.capture: Optional[ProcessOutputCapture] = None @property def has_empty_queue(self) -> bool: @@ -180,11 +192,18 @@ class ProcessContainer: self.vipc_server = vipc_server + def _start_process(self): + if self.capture is not None: + self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher) + self.process.prepare() + self.process.start() + def start( - self, params_config: Dict[str, Any], environ_config: Dict[str, Any], - all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], fingerprint: Optional[str] + self, params_config: Dict[str, Any], environ_config: Dict[str, Any], + all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], + fingerprint: Optional[str], capture_output: bool ): - with self.prefix: + with self.prefix as p: self._setup_env(params_config, environ_config) if self.cfg.config_callback is not None: @@ -201,8 +220,10 @@ class ProcessContainer: self._setup_vision_ipc(all_msgs) assert self.vipc_server is not None - self.process.prepare() - self.process.start() + if capture_output: + self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix) + + self._start_process() if self.cfg.init_callback is not None: self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint) @@ -598,7 +619,8 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReade def replay_process( cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None, - fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False + fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, + captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False ) -> List[capnp._DynamicStructReader]: if isinstance(cfg, Iterable): cfgs = list(cfg) @@ -606,7 +628,7 @@ def replay_process( cfgs = [cfg] all_msgs = migrate_all(lr, old_logtime=True, camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs)) - process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, disable_progress) + process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress) if return_all_logs: keys = {m.which() for m in process_logs} @@ -621,8 +643,8 @@ def replay_process( def _replay_multi_process( - cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], - fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool + cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str], + custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool ) -> List[capnp._DynamicStructReader]: if fingerprint is not None: params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params) @@ -647,7 +669,7 @@ def _replay_multi_process( containers = [] for cfg in cfgs: container = ProcessContainer(cfg) - container.start(params_config, env_config, all_msgs, fingerprint) + container.start(params_config, env_config, all_msgs, fingerprint, captured_output_store is not None) containers.append(container) all_pubs = set([pub for container in containers for pub in container.pubs]) @@ -682,6 +704,10 @@ def _replay_multi_process( finally: for container in containers: container.stop() + if captured_output_store is not None: + assert container.capture is not None + out, err = container.capture.read_outerr() + captured_output_store[container.cfg.proc_name] = {"out": out, "err": err} return log_msgs