from msgq.visionipc import VisionIpcClient, VisionStreamType import os import signal import subprocess from subprocess import DEVNULL import time import atexit XVFB_DISPLAY = ":99" RESOLUTION = "2160x1080" PIXEL_DEPTH = "24" FRAMERATE = 20 FFMPEG_OUTPUT = "output.mp4" # TODO: input ROUTE = "a2a0ccea32023010/2023-07-27--13-01-19/0" # TODO: input def wait_for_video(): vipc = VisionIpcClient('camerad', VisionStreamType.VISION_STREAM_ROAD, True) while True: if not vipc.is_connected(): vipc.connect(True) new_data = vipc.recv() if new_data is not None and new_data.data.any(): return time.sleep(0.01) # Prevent tight loop def ensure_xvfb(): """Start Xvfb and return the process.""" xvfb_cmd = [ "Xvfb", XVFB_DISPLAY, "-screen", "0", f"{RESOLUTION}x{PIXEL_DEPTH}" ] xvfb_proc = subprocess.Popen(xvfb_cmd, stdout=DEVNULL, stderr=DEVNULL) time.sleep(1) # Give Xvfb time to start if xvfb_proc.poll() is not None: raise RuntimeError("Failed to start Xvfb") return xvfb_proc def main(): # Set up environment env = os.environ.copy() env["DISPLAY"] = XVFB_DISPLAY # Set DISPLAY for all processes env["QT_QPA_PLATFORM"] = "xcb" # Start Xvfb xvfb_proc = ensure_xvfb() atexit.register(lambda: xvfb_proc.terminate()) # Ensure cleanup on exit # Start UI process ui_args = ["./selfdrive/ui/ui"] ui_proc = subprocess.Popen(ui_args, env=env, stdout=DEVNULL, stderr=DEVNULL) atexit.register(lambda: ui_proc.terminate()) # Start replay process replay_proc = subprocess.Popen( ["./tools/replay/replay", "--no-loop", ROUTE], env=env, stdout=DEVNULL, stderr=DEVNULL ) atexit.register(lambda: replay_proc.terminate()) # Wait for video data wait_for_video() # Start FFmpeg ffmpeg_cmd = [ "ffmpeg", "-y", # Overwrite output "-video_size", RESOLUTION, "-framerate", str(FRAMERATE), "-f", "x11grab", "-draw_mouse", "0", "-i", XVFB_DISPLAY, "-c:v", "libx264", "-preset", "ultrafast", "-pix_fmt", "yuv420p", FFMPEG_OUTPUT, ] ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, env=env, stdout=DEVNULL, stderr=DEVNULL) atexit.register(lambda: ffmpeg_proc.terminate()) # Wait for replay to finish replay_proc.wait() # Stop FFmpeg gracefully ffmpeg_proc.send_signal(signal.SIGINT) ffmpeg_proc.wait(timeout=5) # Clean up ui_proc.terminate() ui_proc.wait(timeout=5) xvfb_proc.terminate() xvfb_proc.wait(timeout=5) print(f"Recording complete: {FFMPEG_OUTPUT}") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("Interrupted by user") except Exception as e: print(f"Error: {e}") finally: # Ensure all processes are terminated atexit._run_exitfuncs()