process_replay: capture process output (#29027)

* Add ProcessOutputProxy

* Move launcher to its own field

* Move ProcessOutputCapture to its own file

* Return itself from __enter__ of OpenpilotPrefix

* Integrate ProcessOutputCapture into process_replay

* Add note about captured_output_store to README

* ipykernel import is optional

* Disable type checking for link_with_current_proc

* Remove assertion

* Decode outputs to utf-8

* read(self): return empty buf if it's None

* Fix type annotations

* Replace fifo with regular file, to avoid hitting fifo size limit
old-commit-hash: 547a033a3c
beeps
Kacper Rączy 2 years ago committed by GitHub
parent cb45be4b6f
commit 67ac96c8b4
  1. 6
      selfdrive/manager/process.py
  2. 12
      selfdrive/test/process_replay/README.md
  3. 59
      selfdrive/test/process_replay/capture.py
  4. 2
      selfdrive/test/process_replay/helpers.py
  5. 46
      selfdrive/test/process_replay/process_replay.py

@ -196,6 +196,7 @@ class NativeProcess(ManagerProcess):
self.unkillable = unkillable
self.sigkill = sigkill
self.watchdog_max_dt = watchdog_max_dt
self.launcher = nativelauncher
def prepare(self) -> None:
pass
@ -210,7 +211,7 @@ class NativeProcess(ManagerProcess):
cwd = os.path.join(BASEDIR, self.cwd)
cloudlog.info(f"starting process {self.name}")
self.proc = Process(name=self.name, target=nativelauncher, args=(self.cmdline, cwd, self.name))
self.proc = Process(name=self.name, target=self.launcher, args=(self.cmdline, cwd, self.name))
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False
@ -227,6 +228,7 @@ class PythonProcess(ManagerProcess):
self.unkillable = unkillable
self.sigkill = sigkill
self.watchdog_max_dt = watchdog_max_dt
self.launcher = launcher
def prepare(self) -> None:
if self.enabled:
@ -242,7 +244,7 @@ class PythonProcess(ManagerProcess):
return
cloudlog.info(f"starting python {self.module}")
self.proc = Process(name=self.name, target=launcher, args=(self.module, self.name))
self.proc = Process(name=self.name, target=self.launcher, args=(self.module, self.name))
self.proc.start()
self.watchdog_seen = False
self.shutting_down = False

@ -114,3 +114,15 @@ frs = {
output_logs = replay_process_with_name(['modeld', 'dmonitoringmodeld'], lr, frs=frs)
```
To capture stdout/stderr of the replayed process, `captured_output_store` can be provided.
```py
output_store = dict()
# pass dictionary by reference, it will be filled with standard outputs - even if process replay fails
output_logs = replay_process_with_name(['radard', 'plannerd'], lr, captured_output_store=output_store)
# entries with captured output in format { 'out': '...', 'err': '...' } will be added to provided dictionary for each replayed process
print(output_store['radard']['out']) # radard stdout
print(output_store['radard']['err']) # radard stderr
```

@ -0,0 +1,59 @@
import os
import sys
from typing import Tuple, no_type_check
class FdRedirect:
  """Redirect a file descriptor into a regular file under /tmp so its output can be read back later.

  A regular file (not a fifo) is used to avoid hitting the fifo size limit when
  the captured process writes a lot of output.
  """

  def __init__(self, file_prefix: str, fd: int):
    fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
    if os.path.exists(fname):
      os.unlink(fname)
    self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
    self.dest_fname = fname
    self.source_fd = fd
    # child processes spawned after link() must inherit the destination fd
    os.set_inheritable(self.dest_fd, True)

  def __del__(self):
    try:
      os.close(self.dest_fd)
    except (AttributeError, OSError):
      # __init__ may have failed before dest_fd was set, and close can
      # legitimately fail during interpreter shutdown - cleanup is best-effort
      pass

  def purge(self) -> None:
    """Remove the backing file from disk."""
    os.unlink(self.dest_fname)

  def read(self) -> bytes:
    """Return everything written to the redirected fd so far."""
    with open(self.dest_fname, "rb") as f:
      return f.read() or b""

  def link(self) -> None:
    """Point the source fd at the destination file in the current process."""
    os.dup2(self.dest_fd, self.source_fd)
class ProcessOutputCapture:
  """Capture stdout/stderr of a (sub)process by redirecting fds 1 and 2 into temp files.

  Construct it in the parent; call link_with_current_proc() inside the spawned
  process (e.g. from its launcher) so the child's fds point at the capture files.
  """

  def __init__(self, proc_name: str, prefix: str):
    prefix = f"{proc_name}_{prefix}"
    self.stdout_redirect = FdRedirect(prefix, 1)
    self.stderr_redirect = FdRedirect(prefix, 2)

  def __del__(self):
    # best-effort cleanup: the redirects may be missing (partial __init__)
    # or their backing files already removed
    try:
      self.stdout_redirect.purge()
      self.stderr_redirect.purge()
    except (AttributeError, OSError):
      pass

  @no_type_check  # ipython classes have incompatible signatures
  def link_with_current_proc(self) -> None:
    """Redirect this process' stdout/stderr into the capture files."""
    try:
      # prevent ipykernel from redirecting stdout/stderr of python subprocesses
      from ipykernel.iostream import OutStream
      if isinstance(sys.stdout, OutStream):
        sys.stdout = sys.__stdout__
      if isinstance(sys.stderr, OutStream):
        sys.stderr = sys.__stderr__
    except ImportError:
      pass

    # link stdout/stderr to the capture files (regular files, not fifos,
    # to avoid hitting the fifo size limit)
    self.stdout_redirect.link()
    self.stderr_redirect.link()

  def read_outerr(self) -> Tuple[str, str]:
    """Return (stdout, stderr) captured so far, decoded as utf-8.

    Invalid byte sequences are replaced rather than raising, so one bad byte
    in the process output cannot lose the whole capture.
    """
    out_str = self.stdout_redirect.read().decode(errors="replace")
    err_str = self.stderr_redirect.read().decode(errors="replace")
    return out_str, err_str

@ -19,6 +19,8 @@ class OpenpilotPrefix(object):
except FileExistsError:
pass
return self
def __exit__(self, exc_type, exc_obj, exc_tb):
if self.clean_dirs_on_exit:
self.clean_dirs()

@ -25,6 +25,7 @@ from selfdrive.manager.process_config import managed_processes
from selfdrive.test.process_replay.helpers import OpenpilotPrefix, DummySocket
from selfdrive.test.process_replay.vision_meta import meta_from_camera_state, available_streams
from selfdrive.test.process_replay.migration import migrate_all
from selfdrive.test.process_replay.capture import ProcessOutputCapture
from tools.lib.logreader import LogReader
# Numpy gives different results based on CPU features after version 19
@ -33,6 +34,16 @@ PROC_REPLAY_DIR = os.path.dirname(os.path.abspath(__file__))
FAKEDATA = os.path.join(PROC_REPLAY_DIR, "fakedata/")
class LauncherWithCapture:
  """Process launcher wrapper that hooks up output capture in the spawned process.

  When called (as the multiprocessing target), it first links the current
  process' stdout/stderr to the capture, then hands off to the real launcher.
  """

  def __init__(self, capture: ProcessOutputCapture, launcher: Callable):
    self.launcher = launcher
    self.capture = capture

  def __call__(self, *args, **kwargs):
    # runs inside the child process: redirect output before the entry point starts
    self.capture.link_with_current_proc()
    self.launcher(*args, **kwargs)
class ReplayContext:
def __init__(self, cfg):
self.proc_name = cfg.proc_name
@ -126,13 +137,14 @@ class ProcessContainer:
def __init__(self, cfg: ProcessConfig):
self.prefix = OpenpilotPrefix(clean_dirs_on_exit=False)
self.cfg = copy.deepcopy(cfg)
self.process = managed_processes[cfg.proc_name]
self.process = copy.deepcopy(managed_processes[cfg.proc_name])
self.msg_queue: List[capnp._DynamicStructReader] = []
self.cnt = 0
self.pm: Optional[messaging.PubMaster] = None
self.sockets: Optional[List[messaging.SubSocket]] = None
self.rc: Optional[ReplayContext] = None
self.vipc_server: Optional[VisionIpcServer] = None
self.capture: Optional[ProcessOutputCapture] = None
@property
def has_empty_queue(self) -> bool:
@ -180,11 +192,18 @@ class ProcessContainer:
self.vipc_server = vipc_server
def _start_process(self):
if self.capture is not None:
self.process.launcher = LauncherWithCapture(self.capture, self.process.launcher)
self.process.prepare()
self.process.start()
def start(
self, params_config: Dict[str, Any], environ_config: Dict[str, Any],
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]], fingerprint: Optional[str]
all_msgs: Union[LogReader, List[capnp._DynamicStructReader]],
fingerprint: Optional[str], capture_output: bool
):
with self.prefix:
with self.prefix as p:
self._setup_env(params_config, environ_config)
if self.cfg.config_callback is not None:
@ -201,8 +220,10 @@ class ProcessContainer:
self._setup_vision_ipc(all_msgs)
assert self.vipc_server is not None
self.process.prepare()
self.process.start()
if capture_output:
self.capture = ProcessOutputCapture(self.cfg.proc_name, p.prefix)
self._start_process()
if self.cfg.init_callback is not None:
self.cfg.init_callback(self.rc, self.pm, all_msgs, fingerprint)
@ -598,7 +619,8 @@ def replay_process_with_name(name: Union[str, Iterable[str]], lr: Union[LogReade
def replay_process(
cfg: Union[ProcessConfig, Iterable[ProcessConfig]], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]] = None,
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None, disable_progress: bool = False
fingerprint: Optional[str] = None, return_all_logs: bool = False, custom_params: Optional[Dict[str, Any]] = None,
captured_output_store: Optional[Dict[str, Dict[str, str]]] = None, disable_progress: bool = False
) -> List[capnp._DynamicStructReader]:
if isinstance(cfg, Iterable):
cfgs = list(cfg)
@ -606,7 +628,7 @@ def replay_process(
cfgs = [cfg]
all_msgs = migrate_all(lr, old_logtime=True, camera_states=any(len(cfg.vision_pubs) != 0 for cfg in cfgs))
process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, disable_progress)
process_logs = _replay_multi_process(cfgs, all_msgs, frs, fingerprint, custom_params, captured_output_store, disable_progress)
if return_all_logs:
keys = {m.which() for m in process_logs}
@ -621,8 +643,8 @@ def replay_process(
def _replay_multi_process(
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]],
fingerprint: Optional[str], custom_params: Optional[Dict[str, Any]], disable_progress: bool
cfgs: List[ProcessConfig], lr: Union[LogReader, List[capnp._DynamicStructReader]], frs: Optional[Dict[str, Any]], fingerprint: Optional[str],
custom_params: Optional[Dict[str, Any]], captured_output_store: Optional[Dict[str, Dict[str, str]]], disable_progress: bool
) -> List[capnp._DynamicStructReader]:
if fingerprint is not None:
params_config = generate_params_config(lr=lr, fingerprint=fingerprint, custom_params=custom_params)
@ -647,7 +669,7 @@ def _replay_multi_process(
containers = []
for cfg in cfgs:
container = ProcessContainer(cfg)
container.start(params_config, env_config, all_msgs, fingerprint)
container.start(params_config, env_config, all_msgs, fingerprint, captured_output_store is not None)
containers.append(container)
all_pubs = set([pub for container in containers for pub in container.pubs])
@ -682,6 +704,10 @@ def _replay_multi_process(
finally:
for container in containers:
container.stop()
if captured_output_store is not None:
assert container.capture is not None
out, err = container.capture.read_outerr()
captured_output_store[container.cfg.proc_name] = {"out": out, "err": err}
return log_msgs

Loading…
Cancel
Save