From 41ee057acdf67d8a43d11cf51d79a5726e3563d7 Mon Sep 17 00:00:00 2001 From: Hoang Bui <47828508+bongbui321@users.noreply.github.com> Date: Mon, 22 Jul 2024 15:05:03 -0400 Subject: [PATCH] tools/Rerun: Add video logging features (#32810) * working * multiprocessing * fix that * print services * all services + fix * less verbose * start readme * segment range * cleanup * update readme + fix bug in 'all' * cleanup + update readme * update readme * cleanup * cleanup * rm frame_iter * cleanup * staticmethod * proc kill * split files * fix range with hevc vids * update reamde + add prompt * readme * readme * readme old-commit-hash: 5e0aff92ae24952bfb13271c7875e89487da5b31 --- tools/rerun/README.md | 57 ++++++++++ tools/rerun/camera_reader.py | 92 +++++++++++++++ tools/rerun/run.py | 214 ++++++++++++++++++++++++----------- 3 files changed, 299 insertions(+), 64 deletions(-) create mode 100644 tools/rerun/README.md create mode 100644 tools/rerun/camera_reader.py diff --git a/tools/rerun/README.md b/tools/rerun/README.md new file mode 100644 index 0000000000..0fd50e1562 --- /dev/null +++ b/tools/rerun/README.md @@ -0,0 +1,57 @@ +# Rerun +Rerun is a tool to quickly visualize time series data. It supports all openpilot logs , both the `logMessages` and video logs. + +[Instructions](https://rerun.io/docs/reference/viewer/overview) for navigation within the Rerun Viewer. + +## Usage +``` +usage: run.py [-h] [--demo] [--qcam] [--fcam] [--ecam] [--dcam] [--print_services] [--services [SERVICES ...]] [route_or_segment_name] + +A helper to run rerun on openpilot routes + +options: + -h, --help show this help message and exit + --demo Use the demo route instead of providing one (default: False) + --qcam Log decimated driving camera (default: False) + --fcam Log driving camera (default: False) + --ecam Log wide camera (default: False) + --dcam Log driver monitoring camera (default: False) + --print_services List out openpilot services (default: False) + --services [SERVICES ...] Specify openpilot services that will be logged. No service will be logged if not specified. + To log all services include 'all' as one of your services (default: []) + --route [ROUTE] The route or segment name to plot (default: None) +``` + +Examples using route name to observe accelerometer and qcamera: + +`./run.py --services accelerometer --qcam --route "a2a0ccea32023010/2023-07-27--13-01-19"` + +Examples using segment range (more on [SegmentRange](https://github.com/commaai/openpilot/tree/master/tools/lib)): + +`./run.py --qcam --route "a2a0ccea32023010/2023-07-27--13-01-19/2:4"` + +## Cautions: +- You can specify `--services all` to visualize all `logMessage`, but it will draw a lot of memory usage and take a long time to log all messages. Rerun isn't ready for logging big number of data. + +- Logging hevc videos (`--fcam`, `--ecam`, and `--dcam`) are expensive, and it's recommended to use `--qcam` for optimized performance. If possible, limiting your route to a few segments using `SegmentRange` will speed up logging and reduce memory usage + +This example draws 13GB of memory: + +`./run.py --services accelerometer --qcam --route "a2a0ccea32023010/2023-07-27--13-01-19"` + + +## Openpilot services +To list all openpilot services: + +`./run.py --print_services` + +Examples including openpilot services: + +`./run.py --services accelerometer cameraodometry --route "a2a0ccea32023010/2023-07-27--13-01-19/0/q"` + +Examples including all services: + +`./run.py --services all --route "a2a0ccea32023010/2023-07-27--13-01-19/0/q"` + +## Demo +`./run.py --services accelerometer carcontrol caroutput --qcam --demo` diff --git a/tools/rerun/camera_reader.py b/tools/rerun/camera_reader.py new file mode 100644 index 0000000000..325fad18b8 --- /dev/null +++ b/tools/rerun/camera_reader.py @@ -0,0 +1,92 @@ +import tqdm +import subprocess +import multiprocessing +from enum import StrEnum +from functools import partial +from collections import namedtuple + +from openpilot.tools.lib.framereader import ffprobe + +CameraConfig = namedtuple("CameraConfig", ["qcam", "fcam", "ecam", "dcam"]) + +class CameraType(StrEnum): + qcam = "qcamera" + fcam = "fcamera" + ecam = "ecamera" + dcam = "dcamera" + + +def probe_packet_info(camera_path): + args = ["ffprobe", "-v", "quiet", "-show_packets", "-probesize", "10M", camera_path] + dat = subprocess.check_output(args) + dat = dat.decode().split() + return dat + + +class _FrameReader: + def __init__(self, camera_path, segment, h, w, start_time): + self.camera_path = camera_path + self.segment = segment + self.h = h + self.w = w + self.start_time = start_time + + self.ts = self._get_ts() + + def _read_stream_nv12(self): + frame_sz = self.w * self.h * 3 // 2 + proc = subprocess.Popen( + ["ffmpeg", "-v", "quiet", "-i", self.camera_path, "-f", "rawvideo", "-pix_fmt", "nv12", "-"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL + ) + try: + while True: + dat = proc.stdout.read(frame_sz) + if len(dat) == 0: + break + yield dat + finally: + proc.kill() + + def _get_ts(self): + dat = probe_packet_info(self.camera_path) + try: + ret = [float(d.split('=')[1]) for d in dat if d.startswith("pts_time=")] + except ValueError: + # pts_times aren't available. Infer timestamps from duration_times + ret = [d for d in dat if d.startswith("duration_time")] + ret = [float(d.split('=')[1])*(i+1)+(self.segment*60)+self.start_time for i, d in enumerate(ret)] + return ret + + def __iter__(self): + for i, frame in enumerate(self._read_stream_nv12()): + yield self.ts[i], frame + + +class CameraReader: + def __init__(self, camera_paths, start_time, seg_idxs): + self.seg_idxs = seg_idxs + self.camera_paths = camera_paths + self.start_time = start_time + + probe = ffprobe(camera_paths[0])["streams"][0] + self.h = probe["height"] + self.w = probe["width"] + + self.__frs = {} + + def _get_fr(self, i): + if i not in self.__frs: + self.__frs[i] = _FrameReader(self.camera_paths[i], segment=i, h=self.h, w=self.w, start_time=self.start_time) + return self.__frs[i] + + def _run_on_segment(self, func, i): + return func(self._get_fr(i)) + + def run_across_segments(self, num_processes, func): + with multiprocessing.Pool(num_processes) as pool: + num_segs = len(self.seg_idxs) + for _ in tqdm.tqdm(pool.imap_unordered(partial(self._run_on_segment, func), self.seg_idxs), total=num_segs): + continue + diff --git a/tools/rerun/run.py b/tools/rerun/run.py index 9975478a73..421785a2d5 100755 --- a/tools/rerun/run.py +++ b/tools/rerun/run.py @@ -6,84 +6,170 @@ import rerun as rr import rerun.blueprint as rrb from functools import partial -from openpilot.tools.lib.logreader import LogReader from cereal.services import SERVICE_LIST +from openpilot.tools.rerun.camera_reader import probe_packet_info, CameraReader, CameraConfig, CameraType +from openpilot.tools.lib.logreader import LogReader +from openpilot.tools.lib.route import Route, SegmentRange NUM_CPUS = multiprocessing.cpu_count() DEMO_ROUTE = "a2a0ccea32023010|2023-07-27--13-01-19" +RR_TIMELINE_NAME = "Timeline" +RR_WIN = "rerun_test" + + +class Rerunner: + def __init__(self, route, segment_range, camera_config, enabled_services): + self.enabled_services = [s.lower() for s in enabled_services] + self.log_all = "all" in self.enabled_services + self.lr = LogReader(route_or_segment_name) + + # hevc files don't have start_time. We get it from qcamera.ts + start_time = 0 + dat = probe_packet_info(r.qcamera_paths()[0]) + for d in dat: + if d.startswith("pts_time="): + start_time = float(d.split('=')[1]) + break + + qcam, fcam, ecam, dcam = camera_config + self.camera_readers = {} + if qcam: + self.camera_readers[CameraType.qcam] = CameraReader(route.qcamera_paths(), start_time, segment_range.seg_idxs) + if fcam: + self.camera_readers[CameraType.fcam] = CameraReader(route.camera_paths(), start_time, segment_range.seg_idxs) + if ecam: + self.camera_readers[CameraType.ecam] = CameraReader(route.ecamera_paths(), start_time, segment_range.seg_idxs) + if dcam: + self.camera_readers[CameraType.dcam] = CameraReader(route.dcamera_paths(), start_time, segment_range.seg_idxs) + + def _start_rerun(self): + self.blueprint = self._create_blueprint() + rr.init(RR_WIN, spawn=True) + + def _create_blueprint(self): + blueprint = None + service_views = [] + + log_msg_visible = len(self.enabled_services) <= 3 and not self.log_all + for topic in sorted(SERVICE_LIST.keys()): + if not self.log_all and topic.lower() not in self.enabled_services: + continue + View = rrb.TimeSeriesView if topic != "thumbnail" else rrb.Spatial2DView + service_views.append(View(name=topic, origin=f"/{topic}/", visible=log_msg_visible)) + rr.log(topic, rr.SeriesLine(name=topic), timeless=True) + + blueprint = rrb.Blueprint( + rrb.Horizontal( + rrb.Vertical(*service_views), + rrb.Vertical(*[rrb.Spatial2DView(name=cam_type, origin=cam_type) for cam_type in self.camera_readers.keys()]), + ), + rrb.SelectionPanel(expanded=False), + rrb.TimePanel(expanded=False) + ) + return blueprint + + @staticmethod + def _log_msg(msg, parent_key=''): + stack = [(msg, parent_key)] + while stack: + current_msg, current_parent_key = stack.pop() + if isinstance(current_msg, list): + for index, item in enumerate(current_msg): + new_key = f"{current_parent_key}/{index}" + if isinstance(item, (int, float)): + rr.log(new_key, rr.Scalar(item)) + elif isinstance(item, dict): + stack.append((item, new_key)) + elif isinstance(current_msg, dict): + for key, value in current_msg.items(): + new_key = f"{current_parent_key}/{key}" + if isinstance(value, (int, float)): + rr.log(new_key, rr.Scalar(value)) + elif isinstance(value, dict): + stack.append((value, new_key)) + elif isinstance(value, list): + for index, item in enumerate(value): + if isinstance(item, (int, float)): + rr.log(f"{new_key}/{index}", rr.Scalar(item)) + else: + pass # Not a plottable value + + @staticmethod + @rr.shutdown_at_exit + def _process_log_msgs(blueprint, enabled_services, log_all, lr): + rr.init(RR_WIN) + rr.connect(default_blueprint=blueprint) + + for msg in lr: + rr.set_time_nanos(RR_TIMELINE_NAME, msg.logMonoTime) + msg_type = msg.which() + + if not log_all and msg_type.lower() not in enabled_services: + continue + + if msg_type != "thumbnail": + Rerunner._log_msg(msg.to_dict()[msg.which()], msg.which()) + else: + rr.log("/thumbnail", rr.ImageEncoded(contents=msg.to_dict()[msg.which()].get("thumbnail"))) + + return [] + + @staticmethod + @rr.shutdown_at_exit + def _process_cam_readers(blueprint, cam_type, h, w, fr): + rr.init(RR_WIN) + rr.connect(default_blueprint=blueprint) + + size_hint = (h, w) + for ts, frame in fr: + rr.set_time_nanos(RR_TIMELINE_NAME, int(ts * 1e9)) + rr.log(cam_type, rr.ImageEncoded(contents=frame,format=rr.ImageFormat.NV12(size_hint))) + + def load_data(self): + self._start_rerun() + if len(self.enabled_services) > 0: + self.lr.run_across_segments(NUM_CPUS, partial(self._process_log_msgs, self.blueprint, self.enabled_services, self.log_all)) + for cam_type, cr in self.camera_readers.items(): + cr.run_across_segments(NUM_CPUS, partial(self._process_cam_readers, self.blueprint, cam_type, cr.h, cr.w)) -def log_msg(msg, parent_key=''): - stack = [(msg, parent_key)] - while stack: - current_msg, current_parent_key = stack.pop() - if isinstance(current_msg, list): - for index, item in enumerate(current_msg): - new_key = f"{current_parent_key}/{index}" - if isinstance(item, (int, float)): - rr.log(str(new_key), rr.Scalar(item)) - elif isinstance(item, dict): - stack.append((item, new_key)) - elif isinstance(current_msg, dict): - for key, value in current_msg.items(): - new_key = f"{current_parent_key}/{key}" - if isinstance(value, (int, float)): - rr.log(str(new_key), rr.Scalar(value)) - elif isinstance(value, dict): - stack.append((value, new_key)) - elif isinstance(value, list): - for index, item in enumerate(value): - if isinstance(item, (int, float)): - rr.log(f"{new_key}/{index}", rr.Scalar(item)) - else: - pass # Not a plottable value - -def createBlueprint(): - blueprint = None - timeSeriesViews = [] - for topic in sorted(SERVICE_LIST.keys()): - timeSeriesViews.append(rrb.TimeSeriesView(name=topic, origin=f"/{topic}/", visible=False)) - rr.log(topic, rr.SeriesLine(name=topic), timeless=True) - blueprint = rrb.Blueprint(rrb.Grid(rrb.Vertical(*timeSeriesViews,rrb.SelectionPanel(expanded=False),rrb.TimePanel(expanded=False)), - rrb.Spatial2DView(name="thumbnail", origin="/thumbnail"))) - return blueprint - -def log_thumbnail(thumbnailMsg): - bytesImgData = thumbnailMsg.get('thumbnail') - rr.log("/thumbnail", rr.ImageEncoded(contents=bytesImgData)) - -@rr.shutdown_at_exit -def process(blueprint, lr): - rr.init("rerun_test") - rr.connect(default_blueprint=blueprint) - - ret = [] - for msg in lr: - ret.append(msg) - rr.set_time_nanos("TIMELINE", msg.logMonoTime) - if msg.which() != "thumbnail": - log_msg(msg.to_dict()[msg.which()], msg.which()) - else: - log_thumbnail(msg.to_dict()[msg.which()]) - return ret if __name__ == '__main__': parser = argparse.ArgumentParser(description="A helper to run rerun on openpilot routes", formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("--demo", action="store_true", help="Use the demo route instead of providing one") - parser.add_argument("route_or_segment_name", nargs='?', help="The route or segment name to plot") + parser.add_argument("--qcam", action="store_true", help="Log decimated driving camera") + parser.add_argument("--fcam", action="store_true", help="Log driving camera") + parser.add_argument("--ecam", action="store_true", help="Log wide camera") + parser.add_argument("--dcam", action="store_true", help="Log driver monitoring camera") + parser.add_argument("--print_services", action="store_true", help="List out openpilot services") + parser.add_argument("--services", default=[], nargs='*', help="Specify openpilot services that will be logged.\ + No service will be logged if not specified.\ + To log all services include 'all' as one of your services") + parser.add_argument("--route", nargs='?', help="The route or segment name to plot") + args = parser.parse_args() - if len(sys.argv) == 1: + if not args.demo and not args.route: parser.print_help() sys.exit() - args = parser.parse_args() + if args.print_services: + print("\n".join(SERVICE_LIST.keys())) + sys.exit() + + camera_config = CameraConfig(args.qcam, args.fcam, args.ecam, args.dcam) + + route_or_segment_name = DEMO_ROUTE if args.demo else args.route.strip() + sr = SegmentRange(route_or_segment_name) + r = Route(sr.route_name) + + if len(sr.seg_idxs) > 10: + print("You're requesting more than 10 segments of the route, " + \ + "please be aware that might take a lot of memory") + response = input("Do you wish to continue? (Y/n): ") + if response.strip() != "Y": + sys.exit() - blueprint = createBlueprint() - rr.init("rerun_test") - rr.spawn(connect=False) # child processes stream data to Viewer + rerunner = Rerunner(r, sr, camera_config, args.services) + rerunner.load_data() - route_or_segment_name = DEMO_ROUTE if args.demo else args.route_or_segment_name.strip() - print("Getting route log paths") - lr = LogReader(route_or_segment_name) - lr.run_across_segments(NUM_CPUS, partial(process, blueprint))