openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

59 lines
1.7 KiB

import os
import sys
from typing import Tuple, no_type_check
class FdRedirect:
def __init__(self, file_prefix: str, fd: int):
fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
if os.path.exists(fname):
os.unlink(fname)
self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
self.dest_fname = fname
self.source_fd = fd
os.set_inheritable(self.dest_fd, True)
def __del__(self):
os.close(self.dest_fd)
def purge(self) -> None:
os.unlink(self.dest_fname)
def read(self) -> bytes:
with open(self.dest_fname, "rb") as f:
return f.read() or b""
def link(self) -> None:
os.dup2(self.dest_fd, self.source_fd)
class ProcessOutputCapture:
def __init__(self, proc_name: str, prefix: str):
prefix = f"{proc_name}_{prefix}"
self.stdout_redirect = FdRedirect(prefix, 1)
self.stderr_redirect = FdRedirect(prefix, 2)
def __del__(self):
self.stdout_redirect.purge()
self.stderr_redirect.purge()
@no_type_check # ipython classes have incompatible signatures
def link_with_current_proc(self) -> None:
try:
# prevent ipykernel from redirecting stdout/stderr of python subprocesses
from ipykernel.iostream import OutStream
if isinstance(sys.stdout, OutStream):
sys.stdout = sys.__stdout__
if isinstance(sys.stderr, OutStream):
sys.stderr = sys.__stderr__
except ImportError:
pass
# link stdout/stderr to the fifo
self.stdout_redirect.link()
self.stderr_redirect.link()
def read_outerr(self) -> Tuple[str, str]:
out_str = self.stdout_redirect.read().decode()
err_str = self.stderr_redirect.read().decode()
return out_str, err_str