You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					60 lines
				
				1.6 KiB
			
		
		
			
		
	
	
					60 lines
				
				1.6 KiB
			| 
											7 days ago
										 | import os
 | ||
|  | import sys
 | ||
|  | 
 | ||
|  | from typing import no_type_check
 | ||
|  | 
 | ||
|  | class FdRedirect:
 | ||
|  |   def __init__(self, file_prefix: str, fd: int):
 | ||
|  |     fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
 | ||
|  |     if os.path.exists(fname):
 | ||
|  |       os.unlink(fname)
 | ||
|  |     self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
 | ||
|  |     self.dest_fname = fname
 | ||
|  |     self.source_fd = fd
 | ||
|  |     os.set_inheritable(self.dest_fd, True)
 | ||
|  | 
 | ||
|  |   def __del__(self):
 | ||
|  |     os.close(self.dest_fd)
 | ||
|  | 
 | ||
|  |   def purge(self) -> None:
 | ||
|  |     os.unlink(self.dest_fname)
 | ||
|  | 
 | ||
|  |   def read(self) -> bytes:
 | ||
|  |     with open(self.dest_fname, "rb") as f:
 | ||
|  |       return f.read() or b""
 | ||
|  | 
 | ||
|  |   def link(self) -> None:
 | ||
|  |     os.dup2(self.dest_fd, self.source_fd)
 | ||
|  | 
 | ||
|  | 
 | ||
|  | class ProcessOutputCapture:
 | ||
|  |   def __init__(self, proc_name: str, prefix: str):
 | ||
|  |     prefix = f"{proc_name}_{prefix}"
 | ||
|  |     self.stdout_redirect = FdRedirect(prefix, 1)
 | ||
|  |     self.stderr_redirect = FdRedirect(prefix, 2)
 | ||
|  | 
 | ||
|  |   def __del__(self):
 | ||
|  |     self.stdout_redirect.purge()
 | ||
|  |     self.stderr_redirect.purge()
 | ||
|  | 
 | ||
|  |   @no_type_check # ipython classes have incompatible signatures
 | ||
|  |   def link_with_current_proc(self) -> None:
 | ||
|  |     try:
 | ||
|  |       # prevent ipykernel from redirecting stdout/stderr of python subprocesses
 | ||
|  |       from ipykernel.iostream import OutStream
 | ||
|  |       if isinstance(sys.stdout, OutStream):
 | ||
|  |         sys.stdout = sys.__stdout__
 | ||
|  |       if isinstance(sys.stderr, OutStream):
 | ||
|  |         sys.stderr = sys.__stderr__
 | ||
|  |     except ImportError:
 | ||
|  |       pass
 | ||
|  | 
 | ||
|  |     # link stdout/stderr to the fifo
 | ||
|  |     self.stdout_redirect.link()
 | ||
|  |     self.stderr_redirect.link()
 | ||
|  | 
 | ||
|  |   def read_outerr(self) -> tuple[str, str]:
 | ||
|  |     out_str = self.stdout_redirect.read().decode()
 | ||
|  |     err_str = self.stderr_redirect.read().decode()
 | ||
|  |     return out_str, err_str
 |