openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.

133 lines
3.9 KiB

#!/usr/bin/env python3
1 week ago
from argparse import ArgumentParser
from cereal.messaging import SubMaster
from openpilot.common.prefix import OpenpilotPrefix
from subprocess import DEVNULL
from random import randint
import atexit
1 week ago
import os
import signal
import subprocess
import time
1 week ago
# Capture settings shared by the virtual X screen and the ffmpeg screen grab.
RESOLUTION = "2160x1080"  # used for the Xvfb -screen argument and ffmpeg's -video_size
PIXEL_DEPTH = "24"  # appended to RESOLUTION in the Xvfb -screen argument
FRAMERATE = 20  # passed to ffmpeg's -framerate
1 week ago
DEFAULT_OUTPUT = "output.mp4"  # default value of the --output option
# Values substituted for route/start/end when --demo is given (start/end are in seconds).
DEMO_START = 20
DEMO_END = 30
DEMO_ROUTE = "a2a0ccea32023010/2023-07-27--13-01-19"
1 week ago
def wait_for_video():
    """Block until the ``uiDebug`` service reports a non-zero draw time.

    Subscribes to ``uiDebug`` and keeps updating the subscriber until
    ``drawTimeMillis`` is no longer zero.
    """
    ui_debug_sub = SubMaster(['uiDebug'])
    while True:
        ui_debug_sub.update()
        if ui_debug_sub['uiDebug'].drawTimeMillis != 0.:
            return
1 week ago
def main(route: str, output_filepath: str, start_seconds: int, end_seconds: int):
    """Replay a route inside a virtual X display and record the UI with ffmpeg.

    Starts the UI under ``xvfb-run``, starts ``replay`` for the route, waits
    until the UI has drawn, then records the display for
    ``end_seconds - start_seconds`` seconds.

    Args:
        route: route identifier handed to ``./tools/replay/replay``.
        output_filepath: file ffmpeg writes the recording to (overwritten, ``-y``).
        start_seconds: offset passed to replay's ``-s`` option.
        end_seconds: end offset; only used to compute the recording duration.

    Raises:
        subprocess.TimeoutExpired: if ffmpeg or the UI has not exited within
            5 seconds of being asked to stop.
    """
    # TODO: evaluate creating fn that inspects /tmp/.X11-unix and creates unused display to avoid possibility of collision
    display_num = str(randint(99, 999))
    duration = end_seconds - start_seconds

    env = os.environ.copy()
    xauth = f'/tmp/clip-xauth--{display_num}'
    env['XAUTHORITY'] = xauth
    env["QT_QPA_PLATFORM"] = "xcb"

    ui_proc = subprocess.Popen(
        ['xvfb-run', '-f', xauth, '-n', display_num, '-s', f'-screen 0 {RESOLUTION}x{PIXEL_DEPTH}', './selfdrive/ui/ui'],
        env=env,
    )
    atexit.register(lambda: ui_proc.terminate())

    replay_cmd = ["./tools/replay/replay", "-c", "1", "-s", str(start_seconds), "--no-loop"]
    # Only forward a prefix that is actually set; str(None) would otherwise
    # be passed to replay as the literal prefix "None".
    prefix = env.get('OPENPILOT_PREFIX')
    if prefix is not None:
        replay_cmd += ["--prefix", prefix]
    replay_cmd.append(route)
    replay_proc = subprocess.Popen(replay_cmd, env=env, stdout=DEVNULL, stderr=DEVNULL)
    atexit.register(lambda: replay_proc.terminate())

    print('waiting for replay to begin (may take a while)...')
    wait_for_video()

    ffmpeg_cmd = [
        "ffmpeg",
        "-y",
        # Input (x11grab) options: ffmpeg applies options to the next file
        # named on the command line, so these must all precede "-i".
        "-video_size", RESOLUTION,
        "-framerate", str(FRAMERATE),
        "-f", "x11grab",
        "-draw_mouse", "0",
        "-i", f":{display_num}",
        # Output options.
        "-c:v", "libx264",
        "-preset", "ultrafast",
        "-pix_fmt", "yuv420p",
        output_filepath,
    ]
    ffmpeg_proc = subprocess.Popen(ffmpeg_cmd, env=env, stdout=DEVNULL, stderr=DEVNULL)
    atexit.register(lambda: ffmpeg_proc.terminate())

    print('recording in progress...')
    time.sleep(duration)

    # SIGINT (rather than terminate) asks ffmpeg to stop on its own before we wait for it.
    ffmpeg_proc.send_signal(signal.SIGINT)
    ffmpeg_proc.wait(timeout=5)
    ui_proc.terminate()
    ui_proc.wait(timeout=5)
    print(f"recording complete: {output_filepath}")
1 week ago
def parse_args(parser: ArgumentParser):
    """Parse the command line and resolve which route and time window to clip.

    With ``--demo`` the demo route and its start/end are used. Otherwise
    ``--route`` is required and must contain either one ``/`` (timing comes
    from ``--start``/``--end``) or three ``/``, in which case the last two
    parts are taken as start and end seconds and stripped from the route.

    Args:
        parser: parser that already has the prefix/output/start/end/demo/route
            options registered.

    Returns:
        The parsed namespace with ``route``, ``start`` and ``end`` filled in.

    Raises:
        SystemExit: via ``parser.error`` when the route is missing or
            malformed, timing is missing, or end is not greater than start.
    """
    args = parser.parse_args()

    if args.demo:
        args.route = DEMO_ROUTE
        args.start = DEMO_START
        args.end = DEMO_END
    else:
        # parser.error (not assert) so validation survives `python -O`
        # and the user gets a usage message instead of a traceback.
        if args.route is None:
            parser.error('must provide route')
        slash_count = args.route.count('/')
        if slash_count not in (1, 3):
            parser.error('route must include or exclude timing, example: ' + DEMO_ROUTE)
        if slash_count == 3:
            parts = args.route.split('/')
            try:
                args.start = int(parts[2])
                args.end = int(parts[3])
            except ValueError:
                parser.error(f'route timing must be whole seconds, got: {args.route}')
            args.route = '/'.join(parts[:2])

    # Without this check a missing --start/--end reaches the comparison
    # below as None and fails with a TypeError.
    if args.start is None or args.end is None:
        parser.error('must provide both start and end, via --start/--end or in the route')
    if args.end <= args.start:
        parser.error('end must be greater than start')
    return args
1 week ago
if __name__ == "__main__":
    # Command-line interface; options are registered in the order listed here.
    cli = ArgumentParser(prog='clip.py', description='Clip your openpilot route.', epilog='comma.ai')
    option_specs = (
        (('-p', '--prefix'), {'help': 'openpilot prefix', 'default': f'clip_{randint(100, 99999)}'}),
        (('-o', '--output'), {'help': 'Output clip to (.mp4)', 'default': DEFAULT_OUTPUT}),
        (('-s', '--start'), {'help': 'Start clipping at <start> seconds', 'type': int}),
        (('-e', '--end'), {'help': 'Stop clipping at <end> seconds', 'type': int}),
        (('-d', '--demo'), {'help': 'Use the demo route', 'action': 'store_true'}),
        (('-r', '--route'), {'help': f'The route (e.g. {DEMO_ROUTE} or {DEMO_ROUTE}/{DEMO_START}/{DEMO_END})'}),
    )
    for flags, options in option_specs:
        cli.add_argument(*flags, **options)

    args = parse_args(cli)
    print(f'clipping route {args.route}, start={args.start} end={args.end}')

    try:
        # Run the clip inside its own openpilot prefix.
        with OpenpilotPrefix(args.prefix, shared_download_cache=True):
            main(args.route, args.output, args.start, args.end)
    except KeyboardInterrupt:
        print("Interrupted by user")
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Run the registered exit handlers right away instead of waiting for interpreter exit.
        atexit._run_exitfuncs()