You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							59 lines
						
					
					
						
							1.6 KiB
						
					
					
				
			
		
		
	
	
							59 lines
						
					
					
						
							1.6 KiB
						
					
					
				import os
 | 
						|
import sys
 | 
						|
 | 
						|
from typing import no_type_check
 | 
						|
 | 
						|
class FdRedirect:
 | 
						|
  def __init__(self, file_prefix: str, fd: int):
 | 
						|
    fname = os.path.join("/tmp", f"{file_prefix}.{fd}")
 | 
						|
    if os.path.exists(fname):
 | 
						|
      os.unlink(fname)
 | 
						|
    self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT)
 | 
						|
    self.dest_fname = fname
 | 
						|
    self.source_fd = fd
 | 
						|
    os.set_inheritable(self.dest_fd, True)
 | 
						|
 | 
						|
  def __del__(self):
 | 
						|
    os.close(self.dest_fd)
 | 
						|
 | 
						|
  def purge(self) -> None:
 | 
						|
    os.unlink(self.dest_fname)
 | 
						|
 | 
						|
  def read(self) -> bytes:
 | 
						|
    with open(self.dest_fname, "rb") as f:
 | 
						|
      return f.read() or b""
 | 
						|
 | 
						|
  def link(self) -> None:
 | 
						|
    os.dup2(self.dest_fd, self.source_fd)
 | 
						|
 | 
						|
 | 
						|
class ProcessOutputCapture:
 | 
						|
  def __init__(self, proc_name: str, prefix: str):
 | 
						|
    prefix = f"{proc_name}_{prefix}"
 | 
						|
    self.stdout_redirect = FdRedirect(prefix, 1)
 | 
						|
    self.stderr_redirect = FdRedirect(prefix, 2)
 | 
						|
 | 
						|
  def __del__(self):
 | 
						|
    self.stdout_redirect.purge()
 | 
						|
    self.stderr_redirect.purge()
 | 
						|
 | 
						|
  @no_type_check # ipython classes have incompatible signatures
 | 
						|
  def link_with_current_proc(self) -> None:
 | 
						|
    try:
 | 
						|
      # prevent ipykernel from redirecting stdout/stderr of python subprocesses
 | 
						|
      from ipykernel.iostream import OutStream
 | 
						|
      if isinstance(sys.stdout, OutStream):
 | 
						|
        sys.stdout = sys.__stdout__
 | 
						|
      if isinstance(sys.stderr, OutStream):
 | 
						|
        sys.stderr = sys.__stderr__
 | 
						|
    except ImportError:
 | 
						|
      pass
 | 
						|
 | 
						|
    # link stdout/stderr to the fifo
 | 
						|
    self.stdout_redirect.link()
 | 
						|
    self.stderr_redirect.link()
 | 
						|
 | 
						|
  def read_outerr(self) -> tuple[str, str]:
 | 
						|
    out_str = self.stdout_redirect.read().decode()
 | 
						|
    err_str = self.stderr_redirect.read().decode()
 | 
						|
    return out_str, err_str
 | 
						|
 |