openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.

133 lines
3.8 KiB

#!/usr/bin/env python3
2 weeks ago
from argparse import ArgumentParser
from cereal.messaging import SubMaster
from openpilot.common.prefix import OpenpilotPrefix
from subprocess import DEVNULL
from random import randint
import atexit
2 weeks ago
import os
import signal
import subprocess
import time
2 weeks ago
RESOLUTION = "2160x1080"
PIXEL_DEPTH = "24"
FRAMERATE = 20
2 weeks ago
DEFAULT_OUTPUT = "output.mp4"
DEMO_START = 20
DEMO_END = 30
DEMO_ROUTE = "a2a0ccea32023010/2023-07-27--13-01-19"
2 weeks ago
def wait_for_video():
sm = SubMaster(['uiDebug'])
no_frames_drawn = True
while no_frames_drawn:
sm.update()
no_frames_drawn = sm['uiDebug'].drawTimeMillis == 0.
2 weeks ago
def main(route: str, output_filepath: str, start_seconds: int, end_seconds: int):
# TODO: evaluate creating fn that inspects /tmp/.X11-unix and creates unused display to avoid possibility of collision
2 weeks ago
display_num = str(randint(99, 999))
2 weeks ago
duration = end_seconds - start_seconds
2 weeks ago
env = os.environ.copy()
xauth = f'/tmp/clip-xauth--{display_num}'
env['XAUTHORITY'] = xauth
2 weeks ago
env["QT_QPA_PLATFORM"] = "xcb"
ui_proc = subprocess.Popen(['xvfb-run', '-f', xauth, '-n', display_num, '-s', f'-screen 0 {RESOLUTION}x{PIXEL_DEPTH}', './selfdrive/ui/ui'], env=env)
2 weeks ago
atexit.register(lambda: ui_proc.terminate())
replay_proc = subprocess.Popen(
2 weeks ago
["./tools/replay/replay", "-c", "1", "-s", str(start_seconds), "--no-loop", "--prefix", str(env.get('OPENPILOT_PREFIX')), route],
env=env, stdout=DEVNULL, stderr=DEVNULL)
2 weeks ago
atexit.register(lambda: replay_proc.terminate())
print('waiting for replay to begin (may take a while)...')
2 weeks ago
wait_for_video()
ffmpeg_cmd = [
"ffmpeg",
2 weeks ago
"-y",
"-video_size", RESOLUTION,
"-framerate", str(FRAMERATE),
2 weeks ago
"-f",
"x11grab",
"-i", f":{display_num}",
2 weeks ago
"-draw_mouse",
"0",
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-pix_fmt",
"yuv420p",
2 weeks ago
output_filepath,
2 weeks ago
]
ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, env=env)
2 weeks ago
atexit.register(lambda: ffmpeg_proc.terminate())
print('recording in progress...')
2 weeks ago
time.sleep(duration)
2 weeks ago
ffmpeg_proc.send_signal(signal.SIGINT)
ffmpeg_proc.wait(timeout=5)
ui_proc.terminate()
ui_proc.wait(timeout=5)
print(f"recording complete: {output_filepath}")
2 weeks ago
def parse_args(parser: ArgumentParser):
args = parser.parse_args()
if not args.demo:
assert args.route is not None, 'must provide route'
assert args.route.count('/') == 1 or args.route.count('/') == 3, 'route must include or exclude timing, example: ' + DEMO_ROUTE
if args.demo:
args.route = DEMO_ROUTE
args.start = DEMO_START
args.end = DEMO_END
elif args.route.count('/') == 3:
parts = args.route.split('/')
args.start = int(parts[2])
args.end = int(parts[3])
args.route = '/'.join(parts[:2])
assert args.end > args.start, 'end must be greater than start'
return args
2 weeks ago
if __name__ == "__main__":
2 weeks ago
p = ArgumentParser(
prog='clip.py',
description='Clip your openpilot route.',
epilog='comma.ai'
)
p.add_argument('-p', '--prefix', help='openpilot prefix', default=f'clip_{randint(100, 99999)}')
2 weeks ago
p.add_argument('-o', '--output', help='Output clip to (.mp4)', default=DEFAULT_OUTPUT)
p.add_argument('-s', '--start', help='Start clipping at <start> seconds', type=int)
p.add_argument('-e', '--end', help='Stop clipping at <end> seconds', type=int)
p.add_argument('-d', '--demo', help='Use the demo route', action='store_true')
p.add_argument('-r', '--route', help=f'The route (e.g. {DEMO_ROUTE} or {DEMO_ROUTE}/{DEMO_START}/{DEMO_END})')
args = parse_args(p)
print(f'clipping route {args.route}, start={args.start} end={args.end}')
2 weeks ago
try:
with OpenpilotPrefix(args.prefix, shared_download_cache=True) as p:
2 weeks ago
main(args.route, args.output, args.start, args.end)
2 weeks ago
except KeyboardInterrupt:
print("Interrupted by user")
except Exception as e:
print(f"Error: {e}")
finally:
atexit._run_exitfuncs()