openpilot is an open source driver assistance system. openpilot performs the functions of Automated Lane Centering and Adaptive Cruise Control for over 200 supported car makes and models.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

250 lines
9.4 KiB

#!/usr/bin/env python3
import atexit
import logging
import os
import platform
import shutil
import time
from argparse import ArgumentParser, ArgumentTypeError
from collections.abc import Sequence
from pathlib import Path
from random import randint
from subprocess import Popen, PIPE
from typing import Literal
from cereal.messaging import SubMaster
from openpilot.common.basedir import BASEDIR
from openpilot.common.prefix import OpenpilotPrefix
from openpilot.tools.lib.route import Route
# Default value of the --output argument.
DEFAULT_OUTPUT = 'output.mp4'
# Clip window (seconds) and route used when --demo is given.
DEMO_START = 90
DEMO_END = 105
DEMO_ROUTE = 'a2a0ccea32023010/2023-07-27--13-01-19'
# Passed to ffmpeg as -framerate for the x11grab capture.
FRAMERATE = 20
# Appended to RESOLUTION to form the Xvfb -screen geometry (WxHxDEPTH).
PIXEL_DEPTH = '24'
# Used for both the Xvfb screen size and ffmpeg's -video_size.
RESOLUTION = '2160x1080'
# Replay starts this many seconds before the requested start, and the script
# sleeps this long after the first UI frame before recording.
SECONDS_TO_WARM = 2
# Extra seconds added to the clip duration when waiting for ffmpeg to exit.
PROC_WAIT_SECONDS = 30
# Absolute paths to the replay and UI executables under BASEDIR.
REPLAY = str(Path(BASEDIR, 'tools/replay/replay').resolve())
UI = str(Path(BASEDIR, 'selfdrive/ui/ui').resolve())
logger = logging.getLogger('clip.py')
def check_for_failure(proc: Popen):
    """Raise if a child process has already exited with a non-zero status.

    Does nothing while the process is still running or when it exited with
    status 0. On failure, the exit code and any captured stdout/stderr are
    logged before raising.

    Args:
        proc: the child process to inspect.

    Raises:
        ChildProcessError: if the process has exited with a non-zero code.
    """
    exit_code = proc.poll()
    if exit_code is None or exit_code == 0:
        return

    # Use a short command name in the message: the command string itself, or
    # the first element when the command was given as a sequence.
    if isinstance(proc.args, str):
        cmd = proc.args
    elif isinstance(proc.args, Sequence):
        cmd = str(proc.args[0])
    else:
        cmd = str(proc.args)

    msg = f'{cmd} failed, exit code {exit_code}'
    logger.error(msg)
    stdout, stderr = proc.communicate()
    for output in (stdout, stderr):
        if not output:
            continue
        # Child output is not guaranteed to be valid UTF-8; a strict decode
        # could raise UnicodeDecodeError and hide the real failure below.
        text = output if isinstance(output, str) else output.decode(errors='replace')
        logger.error(text)
    raise ChildProcessError(msg)
def escape_ffmpeg_text(value: str):
    """Escape text for embedding in the ffmpeg drawtext filter string.

    Each backslash becomes eight backslashes, and each of ``, : = [ ]`` is
    prefixed with a single backslash. All other characters pass through
    unchanged.
    """
    # One translation pass; none of the replacements feed into one another.
    escapes = str.maketrans({
        '\\': '\\\\\\\\\\\\\\\\',
        ',': '\\,',
        ':': '\\:',
        '=': '\\=',
        '[': '\\[',
        ']': '\\]',
    })
    return value.translate(escapes)
def get_meta_text(route: Route):
    """Build the comma-separated metadata line shown as an overlay on the clip.

    Reads version, route name, platform, git remote/branch/commit and the
    dirty flag from ``route.get_metadata()``.
    """
    meta = route.get_metadata()
    # The origin is the fourth '/'-separated piece of the git remote
    # (presumably the account name of an https remote URL — confirm).
    remote_pieces = meta['git_remote'].split('/')
    origin = 'unknown' if len(remote_pieces) <= 3 else remote_pieces[3]
    fields = (
        f"openpilot v{meta['version']}",
        f"route: {meta['fullname']}",
        f"car: {meta['platform']}",
        f"origin: {origin}",
        f"branch: {meta['git_branch']}",
        f"commit: {meta['git_commit'][:7]}",
        f"modified: {str(meta['git_dirty']).lower()}",
    )
    return ', '.join(fields)
def parse_args(parser: ArgumentParser):
    """Parse the command line and validate the route and clip window.

    The route may be given as ``dongle/route`` together with --start/--end, or
    as ``dongle/route/start/end`` with the timing embedded. With --demo the
    demo route is used, and the demo timing is applied unless both --start and
    --end were supplied.

    Args:
        parser: a fully configured argument parser.

    Returns:
        The parsed namespace, with ``route`` replaced by a ``Route`` object and
        ``start``/``end`` set to integer seconds.

    Any validation failure is reported through ``parser.error``, which exits.
    """
    args = parser.parse_args()
    if args.demo:
        args.route = DEMO_ROUTE
        if args.start is None or args.end is None:
            args.start = DEMO_START
            args.end = DEMO_END
    elif args.route.count('/') == 1:
        if args.start is None or args.end is None:
            parser.error('must provide both start and end if timing is not in the route ID')
    elif args.route.count('/') == 3:
        if args.start is not None or args.end is not None:
            parser.error('don\'t provide timing when including it in the route ID')
        parts = args.route.split('/')
        args.route = '/'.join(parts[:2])
        try:
            args.start = int(parts[2])
            args.end = int(parts[3])
        except ValueError:
            # Report bad timing as a usage error instead of a raw traceback.
            parser.error(f'timing in the route ID must be whole seconds, got {parts[2]}/{parts[3]}')

    if args.end <= args.start:
        parser.error(f'end ({args.end}) must be greater than start ({args.start})')
    # start == SECONDS_TO_WARM is accepted, so the message says "at least".
    if args.start < SECONDS_TO_WARM:
        parser.error(f'start must be at least {SECONDS_TO_WARM}s to allow the UI time to warm up')

    try:
        args.route = Route(args.route, data_dir=args.data_dir)
    except Exception as e:
        parser.error(f'failed to get route: {e}')

    # FIXME: length isn't exactly max segment seconds, simplify to replay exiting at end of data
    length = round(args.route.max_seg_number * 60)
    if args.start >= length:
        parser.error(f'start ({args.start}s) cannot be after end of route ({length}s)')
    if args.end > length:
        parser.error(f'end ({args.end}s) cannot be after end of route ({length}s)')
    return args
def start_proc(args: list[str], env: dict[str, str]):
    """Launch a child process with the given environment, capturing stdout and stderr via pipes."""
    captured = {'stdout': PIPE, 'stderr': PIPE}
    return Popen(args, env=env, **captured)
def validate_env(parser: ArgumentParser):
    """Exit with status 1 unless the host OS and required executables are available.

    Checks, in order: the operating system is Linux, the Xvfb and ffmpeg
    commands are on PATH, and the replay and UI binaries can be found.
    """
    if platform.system() != 'Linux':
        parser.exit(1, f'clip.py: error: {platform.system()} is not a supported operating system\n')

    # Commands expected to be installed on the system.
    for tool in ('Xvfb', 'ffmpeg'):
        if shutil.which(tool) is None:
            parser.exit(1, f'clip.py: error: missing {tool} command, is it installed?\n')

    # Binaries located under the openpilot checkout.
    for binary in (REPLAY, UI):
        if shutil.which(binary) is None:
            parser.exit(1, f'clip.py: error: missing {binary} command, did you build openpilot yet?\n')
def validate_output_file(output_file: str):
    """argparse type check: accept only output paths ending in '.mp4'."""
    if output_file.endswith('.mp4'):
        return output_file
    raise ArgumentTypeError('output must be an mp4')
def validate_route(route: str):
    """argparse type check: a route must contain exactly one or exactly three '/' separators."""
    slashes = route.count('/')
    if slashes == 1 or slashes == 3:
        return route
    raise ArgumentTypeError(f'route must include or exclude timing, example: {DEMO_ROUTE}')
def validate_title(title: str):
    """argparse type check: accept titles of at most 80 characters."""
    if len(title) <= 80:
        return title
    raise ArgumentTypeError('title must be no longer than 80 chars')
def wait_for_frames(procs: list[Popen]):
    """Block until uiDebug reports a non-zero drawTimeMillis.

    After every update, each process in ``procs`` is checked with
    ``check_for_failure``, so a crashed child aborts the wait.
    """
    sm = SubMaster(['uiDebug'])
    while True:
        sm.update()
        frame_drawn = sm['uiDebug'].drawTimeMillis != 0.
        for child in procs:
            check_for_failure(child)
        if frame_drawn:
            break
def clip(data_dir: str | None, quality: Literal['low', 'high'], prefix: str, route: Route, out: str, start: int, end: int, target_mb: int, title: str | None):
    """Replay part of a route through the UI on a virtual display and record it with ffmpeg.

    Args:
        data_dir: optional directory passed to replay as --data_dir.
        quality: 'low' adds --qcam to the replay command.
        prefix: openpilot prefix used for the replay and the surrounding context.
        route: the route to clip.
        out: path of the mp4 file ffmpeg writes.
        start: clip start, in seconds.
        end: clip end, in seconds.
        target_mb: target file size in MB, used to derive ffmpeg's -maxrate/-bufsize.
        title: optional text drawn as a second overlay.

    Raises:
        ChildProcessError: if any child process exits with a non-zero status.
    """
    logger.info(f'clipping route {route.name.canonical_name}, start={start} end={end} quality={quality} target_filesize={target_mb}MB')

    duration = end - start
    # Replay begins SECONDS_TO_WARM earlier than the requested start (never below 0).
    begin_at = max(start - SECONDS_TO_WARM, 0)
    target_bits = target_mb * 8 * 1024 * 1024
    bit_rate_kbps = int(round(target_bits / duration / 1000))

    # TODO: evaluate creating fn that inspects /tmp/.X11-unix and creates unused display to avoid possibility of collision
    display = f':{randint(99, 999)}'

    meta_text = get_meta_text(route)
    overlays = [
        f"drawtext=text='{escape_ffmpeg_text(meta_text)}':fontfile=Inter.tff:fontcolor=white:fontsize=18:box=1:boxcolor=black@0.33:boxborderw=7:x=(w-text_w)/2:y=5.5:enable='between(t,1,5)'"
    ]
    if title:
        overlays.append(f"drawtext=text='{escape_ffmpeg_text(title)}':fontfile=Inter.tff:fontcolor=white:fontsize=32:box=1:boxcolor=black@0.33:boxborderw=10:x=(w-text_w)/2:y=53")

    # ffmpeg: grab the virtual display, encode with libx264, write an mp4 of `duration` seconds.
    capture_opts = ['-video_size', RESOLUTION, '-framerate', str(FRAMERATE), '-f', 'x11grab', '-draw_mouse', '0', '-i', display]
    encode_opts = [
        '-c:v', 'libx264', '-maxrate', f'{bit_rate_kbps}k', '-bufsize', f'{bit_rate_kbps*2}k', '-crf', '23',
        '-filter:v', ','.join(overlays), '-preset', 'ultrafast', '-pix_fmt', 'yuv420p',
    ]
    output_opts = ['-movflags', '+faststart', '-f', 'mp4', '-t', str(duration), out]
    ffmpeg_cmd = ['ffmpeg', '-y', *capture_opts, *encode_opts, *output_opts]

    replay_cmd = [REPLAY, '-c', '1', '-s', str(begin_at), '--prefix', prefix]
    if data_dir:
        replay_cmd += ['--data_dir', data_dir]
    if quality == 'low':
        replay_cmd += ['--qcam']
    replay_cmd += [route.name.canonical_name]

    ui_cmd = [UI, '-platform', 'xcb']
    xvfb_cmd = ['Xvfb', display, '-terminate', '-screen', '0', f'{RESOLUTION}x{PIXEL_DEPTH}']

    with OpenpilotPrefix(prefix, shared_download_cache=True):
        env = os.environ.copy()
        env['DISPLAY'] = display

        # Start the display first, then the UI, then replay; each child is
        # registered for termination at exit as soon as it has been started.
        started = []
        for cmd in (xvfb_cmd, ui_cmd, replay_cmd):
            child = start_proc(cmd, env)
            atexit.register(child.terminate)
            started.append(child)
        procs = started[::-1]  # replay, ui, xvfb

        logger.info('waiting for replay to begin (loading segments, may take a while)...')
        wait_for_frames(procs)

        logger.debug(f'letting UI warm up ({SECONDS_TO_WARM}s)...')
        time.sleep(SECONDS_TO_WARM)
        for child in procs:
            check_for_failure(child)

        ffmpeg_proc = start_proc(ffmpeg_cmd, env)
        procs.append(ffmpeg_proc)
        atexit.register(ffmpeg_proc.terminate)

        logger.info(f'recording in progress ({duration}s)...')
        # Allow ffmpeg some slack beyond the clip length before timing out.
        ffmpeg_proc.wait(duration + PROC_WAIT_SECONDS)
        for child in procs:
            check_for_failure(child)
        logger.info(f'recording complete: {Path(out).resolve()}')
def main():
    """CLI entry point: validate the environment, parse arguments and record the clip.

    Registered exit handlers (child process termination) are always run before
    returning. If clipping fails or is interrupted, the error is logged and the
    process exits with a non-zero status so callers and scripts can detect it.

    Raises:
        SystemExit: with status 1 on error, or 130 when interrupted by the user.
    """
    p = ArgumentParser(prog='clip.py', description='clip your openpilot route.', epilog='comma.ai')
    validate_env(p)
    route_group = p.add_mutually_exclusive_group(required=True)
    route_group.add_argument('route', nargs='?', type=validate_route, help=f'The route (e.g. {DEMO_ROUTE} or {DEMO_ROUTE}/{DEMO_START}/{DEMO_END})')
    route_group.add_argument('--demo', help='use the demo route', action='store_true')
    p.add_argument('-d', '--data-dir', help='local directory where route data is stored')
    p.add_argument('-e', '--end', help='stop clipping at <end> seconds', type=int)
    p.add_argument('-f', '--file-size', help='target file size (Discord/GitHub support max 10MB, default is 9MB)', type=float, default=9.)
    p.add_argument('-o', '--output', help='output clip to (.mp4)', type=validate_output_file, default=DEFAULT_OUTPUT)
    p.add_argument('-p', '--prefix', help='openpilot prefix', default=f'clip_{randint(100, 99999)}')
    p.add_argument('-q', '--quality', help='quality of camera (low = qcam, high = hevc)', choices=['low', 'high'], default='high')
    p.add_argument('-s', '--start', help='start clipping at <start> seconds', type=int)
    p.add_argument('-t', '--title', help='overlay this title on the video (e.g. "Chill driving across the Golden Gate Bridge")', type=validate_title)
    args = parse_args(p)

    exit_code = 0
    try:
        clip(args.data_dir, args.quality, args.prefix, args.route, args.output, args.start, args.end, args.file_size, args.title)
    except KeyboardInterrupt as e:
        logger.exception('interrupted by user', exc_info=e)
        exit_code = 130  # conventional shell status for termination by SIGINT
    except Exception as e:
        logger.exception('encountered error', exc_info=e)
        exit_code = 1
    finally:
        # Terminate child processes now rather than waiting for interpreter shutdown.
        atexit._run_exitfuncs()

    # Previously a failed run still exited with status 0.
    if exit_code:
        raise SystemExit(exit_code)
if __name__ == '__main__':
    # Configure logging at INFO level for command-line use, then run the tool.
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s\t%(message)s',
        level=logging.INFO,
    )
    main()