From d0d63b7fb973db02a032f78595f574082111b43b Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Mon, 4 Oct 2021 21:07:40 +0200 Subject: [PATCH] replace unlogger.py with c++ replay (#22430) old-commit-hash: e233d59e03683b829884a0fea8ec119c9c5a8378 --- tools/replay/README.md | 54 ++--- tools/replay/rqplot.py | 81 ------- tools/replay/unlogger.py | 500 --------------------------------------- 3 files changed, 23 insertions(+), 612 deletions(-) delete mode 100755 tools/replay/rqplot.py delete mode 100755 tools/replay/unlogger.py diff --git a/tools/replay/README.md b/tools/replay/README.md index df840a7dc6..c27169ca1d 100644 --- a/tools/replay/README.md +++ b/tools/replay/README.md @@ -10,24 +10,26 @@ In order to replay specific route: MOCK=1 selfdrive/boardd/tests/boardd_old.py # In another terminal: -python replay/unlogger.py +selfdrive/ui/replay/replay ``` Replay driving data ------------- -`unlogger.py` replays all the messages logged while running openpilot. +`replay` replays all the messages logged while running openpilot. -Unlogger with remote data: +Replay with remote data: ```bash -# Log in via browser +# Log in via browser to have access to non public route python lib/auth.py -# Start unlogger -python replay/unlogger.py +# Start replay +selfdrive/ui/replay/replay # Example: -# python replay/unlogger.py '4cf7a6ad03080c90|2021-09-29--13-46-36' +# selfdrive/ui/replay/replay '4cf7a6ad03080c90|2021-09-29--13-46-36' +# or use --demo to replay the default demo route: +# selfdrive/ui/replay/replay --demo # In another terminal you can run a debug visualizer: python replay/ui.py # Define the environmental variable HORIZONTAL is the ui layout is too tall @@ -40,30 +42,20 @@ cd selfdrive/ui && ./ui ## usage ``` bash -$ ./unlogger.py -h -usage: unlogger.py [-h] [--no-loop] [--min | --enabled ENABLED] [--disabled DISABLED] [--tl PUBLISH_TIME_LENGTH] [--no-realtime] - [--no-interactive] [--bind-early] [--no-visionipc] [--start-time START_TIME] - [route_name] [data_dir] [address_mapping [address_mapping ...]] - +$ selfdrive/ui/replay/replay -h +Usage: selfdrive/ui/replay/replay [options] route Mock openpilot components by publishing logged messages. -positional arguments: - route_name The route whose messages will be published. (default: None) - data_dir Path to directory in which log and camera files are located. (default: None) - address_mapping Pairs = to publish on . (default: None) - -optional arguments: - -h, --help show this help message and exit - --no-loop Stop at the end of the replay. (default: False) - --min - --enabled ENABLED - --disabled DISABLED - --tl PUBLISH_TIME_LENGTH - Length of interval in event time for which messages should be published. (default: None) - --no-realtime Publish messages as quickly as possible instead of realtime. (default: True) - --no-interactive Disable interactivity. (default: True) - --bind-early Bind early to avoid dropping messages. (default: False) - --no-visionipc Do not output video over visionipc (default: False) - --start-time START_TIME - Seek to this absolute time (in seconds) upon starting playback. (default: 0.0) +Options: + -h, --help Displays this help. + -a, --allow whitelist of services to send + -b, --block blacklist of services to send + -s, --start start from + --demo use a demo route instead of providing your own + --dcam load driver camera + --ecam load wide road camera + +Arguments: + route the drive to replay. find your drives at + connect.comma.ai ``` diff --git a/tools/replay/rqplot.py b/tools/replay/rqplot.py deleted file mode 100755 index 98efe85f94..0000000000 --- a/tools/replay/rqplot.py +++ /dev/null @@ -1,81 +0,0 @@ -#!/usr/bin/env python -# type: ignore -import sys -import matplotlib.pyplot as plt -import numpy as np -import cereal.messaging as messaging -import time - -# tool to plot one or more signals live. Call ex: -#./rqplot.py log.carState.vEgo log.carState.aEgo - -# TODO: can this tool consume 10x less cpu? - -def recursive_getattr(x, name): - l = name.split('.') - if len(l) == 1: - return getattr(x, name) - else: - return recursive_getattr(getattr(x, l[0]), ".".join(l[1:]) ) - - -if __name__ == "__main__": - poller = messaging.Poller() - - services = [] - fields = [] - subs = [] - values = [] - - plt.ion() - fig, ax = plt.subplots() - #fig = plt.figure(figsize=(10, 15)) - #ax = fig.add_subplot(111) - ax.grid(True) - fig.canvas.draw() - - subs_name = sys.argv[1:] - lines = [] - x, y = [], [] - LEN = 500 - - for i, sub in enumerate(subs_name): - sub_split = sub.split(".") - services.append(sub_split[0]) - fields.append(".".join(sub_split[1:])) - subs.append(messaging.sub_sock(sub_split[0], poller)) - - x.append(np.ones(LEN)*np.nan) - y.append(np.ones(LEN)*np.nan) - lines.append(ax.plot(x[i], y[i])[0]) - - for l in lines: - l.set_marker("*") - - cur_t = 0. - ax.legend(subs_name) - ax.set_xlabel('time [s]') - - while 1: - print(1./(time.time() - cur_t)) - cur_t = time.time() - for i, s in enumerate(subs): - msg = messaging.recv_sock(s) - #msg = messaging.recv_one_or_none(s) - if msg is not None: - x[i] = np.append(x[i], getattr(msg, 'logMonoTime') / 1e9) - x[i] = np.delete(x[i], 0) - y[i] = np.append(y[i], recursive_getattr(msg, subs_name[i])) - y[i] = np.delete(y[i], 0) - - lines[i].set_xdata(x[i]) - lines[i].set_ydata(y[i]) - - ax.relim() - ax.autoscale_view(True, scaley=True, scalex=True) - - fig.canvas.blit(ax.bbox) - fig.canvas.flush_events() - - # just a bit of wait to avoid 100% CPU usage - time.sleep(0.001) diff --git a/tools/replay/unlogger.py b/tools/replay/unlogger.py deleted file mode 100755 index ff0ab9bcf2..0000000000 --- a/tools/replay/unlogger.py +++ /dev/null @@ -1,500 +0,0 @@ -#!/usr/bin/env python3 -import argparse -import os -import sys -import zmq -import time -import signal -import multiprocessing -from uuid import uuid4 -from collections import namedtuple -from collections import deque -from datetime import datetime - -from cereal import log as capnp_log -from cereal.services import service_list -from cereal.messaging import pub_sock, MultiplePublishersError -from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error -from common import realtime -from common.transformations.camera import eon_f_frame_size, tici_f_frame_size - -from tools.lib.kbhit import KBHit -from tools.lib.logreader import MultiLogIterator -from tools.lib.route import Route -from tools.lib.framereader import rgb24toyuv420 -from tools.lib.route_framereader import RouteFrameReader - -# Commands. -SetRoute = namedtuple("SetRoute", ("name", "start_time", "data_dir")) -SeekAbsoluteTime = namedtuple("SeekAbsoluteTime", ("secs",)) -SeekRelativeTime = namedtuple("SeekRelativeTime", ("secs",)) -TogglePause = namedtuple("TogglePause", ()) -StopAndQuit = namedtuple("StopAndQuit", ()) -VIPC_RGB = "rgb" -VIPC_YUV = "yuv" - - -class UnloggerWorker(object): - def __init__(self): - self._frame_reader = None - self._cookie = None - self._readahead = deque() - - def run(self, commands_address, data_address, pub_types): - zmq.Context._instance = None - commands_socket = zmq.Context.instance().socket(zmq.PULL) - commands_socket.connect(commands_address) - - data_socket = zmq.Context.instance().socket(zmq.PUSH) - data_socket.connect(data_address) - - poller = zmq.Poller() - poller.register(commands_socket, zmq.POLLIN) - - # We can't publish frames without roadEncodeIdx, so add when it's missing. - if "roadCameraState" in pub_types: - pub_types["roadEncodeIdx"] = None - - # gc.set_debug(gc.DEBUG_LEAK | gc.DEBUG_OBJECTS | gc.DEBUG_STATS | gc.DEBUG_SAVEALL | - # gc.DEBUG_UNCOLLECTABLE) - - # TODO: WARNING pycapnp leaks memory all over the place after unlogger runs for a while, gc - # pauses become huge because there are so many tracked objects solution will be to switch to new - # cython capnp - try: - route = None - while True: - while poller.poll(0.) or route is None: - cookie, cmd = commands_socket.recv_pyobj() - route = self._process_commands(cmd, route, pub_types) - - # **** get message **** - self._read_logs(cookie, pub_types) - self._send_logs(data_socket) - finally: - if self._frame_reader is not None: - self._frame_reader.close() - data_socket.close() - commands_socket.close() - - def _read_logs(self, cookie, pub_types): - fullHEVC = capnp_log.EncodeIndex.Type.fullHEVC - lr = self._lr - while len(self._readahead) < 1000: - route_time = lr.tell() - msg = next(lr) - typ = msg.which() - if typ not in pub_types: - continue - - # **** special case certain message types **** - if typ == "roadEncodeIdx" and msg.roadEncodeIdx.type == fullHEVC: - # this assumes the roadEncodeIdx always comes before the frame - self._frame_id_lookup[ - msg.roadEncodeIdx.frameId] = msg.roadEncodeIdx.segmentNum, msg.roadEncodeIdx.segmentId - #print "encode", msg.roadEncodeIdx.frameId, len(self._readahead), route_time - self._readahead.appendleft((typ, msg, route_time, cookie)) - - def _send_logs(self, data_socket): - while len(self._readahead) > 500: - typ, msg, route_time, cookie = self._readahead.pop() - smsg = msg.as_builder() - - if typ == "roadCameraState": - frame_id = msg.roadCameraState.frameId - - # Frame exists, make sure we have a framereader. - # load the frame readers as needed - s1 = time.time() - try: - img = self._frame_reader.get(frame_id, pix_fmt="rgb24") - except Exception: - img = None - - fr_time = time.time() - s1 - if fr_time > 0.05: - print("FRAME(%d) LAG -- %.2f ms" % (frame_id, fr_time*1000.0)) - - if img is not None: - - extra = (smsg.roadCameraState.frameId, smsg.roadCameraState.timestampSof, smsg.roadCameraState.timestampEof) - - # send YUV frame - if os.getenv("YUV") is not None: - img_yuv = rgb24toyuv420(img) - data_socket.send_pyobj((cookie, VIPC_YUV, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE) - data_socket.send(img_yuv.flatten().tobytes(), copy=False) - - img = img[:, :, ::-1] # Convert RGB to BGR, which is what the camera outputs - img = img.flatten() - bts = img.tobytes() - - smsg.roadCameraState.image = bts - - # send RGB frame - data_socket.send_pyobj((cookie, VIPC_RGB, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE) - data_socket.send(bts, copy=False) - - data_socket.send_pyobj((cookie, typ, msg.logMonoTime, route_time), flags=zmq.SNDMORE) - data_socket.send(smsg.to_bytes(), copy=False) - - def _process_commands(self, cmd, route, pub_types): - seek_to = None - if route is None or (isinstance(cmd, SetRoute) and route.name != cmd.name): - seek_to = cmd.start_time - route = Route(cmd.name, cmd.data_dir) - self._lr = MultiLogIterator(route.log_paths(), wraparound=True) - if self._frame_reader is not None: - self._frame_reader.close() - if "roadCameraState" in pub_types or "roadEncodeIdx" in pub_types: - # reset frames for a route - self._frame_id_lookup = {} - self._frame_reader = RouteFrameReader( - route.camera_paths(), None, self._frame_id_lookup, readahead=True) - - # always reset this on a seek - if isinstance(cmd, SeekRelativeTime): - seek_to = self._lr.tell() + cmd.secs - elif isinstance(cmd, SeekAbsoluteTime): - seek_to = cmd.secs - elif isinstance(cmd, StopAndQuit): - exit() - - if seek_to is not None: - print("seeking", seek_to) - if not self._lr.seek(seek_to): - print("Can't seek: time out of bounds") - else: - next(self._lr) # ignore one - return route - -def _get_address_send_func(address): - sock = pub_sock(address) - return sock.send - -def _get_vipc_server(length): - sizes = {3 * w * h: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]} # RGB - sizes.update({(3 * w * h) / 2: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]}) # YUV - - w, h = sizes[length] - - vipc_server = VisionIpcServer("camerad") - vipc_server.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, w, h) - vipc_server.create_buffers(VisionStreamType.VISION_STREAM_YUV_BACK, 40, False, w, h) - vipc_server.start_listener() - return vipc_server - -def unlogger_thread(command_address, forward_commands_address, data_address, run_realtime, - address_mapping, publish_time_length, bind_early, no_loop, no_visionipc): - # Clear context to avoid problems with multiprocessing. - zmq.Context._instance = None - context = zmq.Context.instance() - - command_sock = context.socket(zmq.PULL) - command_sock.bind(command_address) - - forward_commands_socket = context.socket(zmq.PUSH) - forward_commands_socket.bind(forward_commands_address) - - data_socket = context.socket(zmq.PULL) - data_socket.bind(data_address) - - # Set readahead to a reasonable number. - data_socket.setsockopt(zmq.RCVHWM, 10000) - - poller = zmq.Poller() - poller.register(command_sock, zmq.POLLIN) - poller.register(data_socket, zmq.POLLIN) - - if bind_early: - send_funcs = { - typ: _get_address_send_func(address) - for typ, address in address_mapping.items() - } - - # Give subscribers a chance to connect. - time.sleep(0.1) - else: - send_funcs = {} - - start_time = float("inf") - printed_at = 0 - generation = 0 - paused = False - reset_time = True - prev_msg_time = None - vipc_server = None - - while True: - evts = dict(poller.poll()) - if command_sock in evts: - cmd = command_sock.recv_pyobj() - if isinstance(cmd, TogglePause): - paused = not paused - if paused: - poller.modify(data_socket, 0) - else: - poller.modify(data_socket, zmq.POLLIN) - else: - # Forward the command the the log data thread. - # TODO: Remove everything on data_socket. - generation += 1 - forward_commands_socket.send_pyobj((generation, cmd)) - if isinstance(cmd, StopAndQuit): - return - - reset_time = True - elif data_socket in evts: - msg_generation, typ, msg_time, route_time, *extra = data_socket.recv_pyobj(flags=zmq.RCVMORE) - msg_bytes = data_socket.recv() - if msg_generation < generation: - # Skip packets. - continue - - if no_loop and prev_msg_time is not None and prev_msg_time > msg_time + 1e9: - generation += 1 - forward_commands_socket.send_pyobj((generation, StopAndQuit())) - return - prev_msg_time = msg_time - - msg_time_seconds = msg_time * 1e-9 - if reset_time: - msg_start_time = msg_time_seconds - real_start_time = realtime.sec_since_boot() - start_time = min(start_time, msg_start_time) - reset_time = False - - if publish_time_length and msg_time_seconds - start_time > publish_time_length: - generation += 1 - forward_commands_socket.send_pyobj((generation, StopAndQuit())) - return - - # Print time. - if abs(printed_at - route_time) > 5.: - print("at", route_time) - printed_at = route_time - - if typ not in send_funcs and typ not in [VIPC_RGB, VIPC_YUV]: - if typ in address_mapping: - # Remove so we don't keep printing warnings. - address = address_mapping.pop(typ) - try: - print("binding", typ) - send_funcs[typ] = _get_address_send_func(address) - except Exception as e: - print("couldn't replay {}: {}".format(typ, e)) - continue - else: - # Skip messages that we are not registered to publish. - continue - - # Sleep as needed for real time playback. - if run_realtime: - msg_time_offset = msg_time_seconds - msg_start_time - real_time_offset = realtime.sec_since_boot() - real_start_time - lag = msg_time_offset - real_time_offset - if lag > 0 and lag < 30: # a large jump is OK, likely due to an out of order segment - if lag > 1: - print("sleeping for", lag) - time.sleep(lag) - elif lag < -1: - # Relax the real time schedule when we slip far behind. - reset_time = True - - # Send message. - try: - if typ in [VIPC_RGB, VIPC_YUV]: - if not no_visionipc: - if vipc_server is None: - vipc_server = _get_vipc_server(len(msg_bytes)) - - i, sof, eof = extra[0] - stream = VisionStreamType.VISION_STREAM_RGB_BACK if typ == VIPC_RGB else VisionStreamType.VISION_STREAM_YUV_BACK - vipc_server.send(stream, msg_bytes, i, sof, eof) - else: - send_funcs[typ](msg_bytes) - except MultiplePublishersError: - del send_funcs[typ] - -def timestamp_to_s(tss): - return time.mktime(datetime.strptime(tss, '%Y-%m-%d--%H-%M-%S').timetuple()) - -def absolute_time_str(s, start_time): - try: - # first try if it's a float - return float(s) - except ValueError: - # now see if it's a timestamp - return timestamp_to_s(s) - start_time - -def _get_address_mapping(args): - if args.min is not None: - services_to_mock = [ - 'deviceState', 'can', 'pandaState', 'sensorEvents', 'gpsNMEA', 'roadCameraState', 'roadEncodeIdx', - 'modelV2', 'liveLocation', - ] - elif args.enabled is not None: - services_to_mock = args.enabled - else: - services_to_mock = service_list.keys() - - address_mapping = {service_name: service_name for service_name in services_to_mock} - address_mapping.update(dict(args.address_mapping)) - - for k in args.disabled: - address_mapping.pop(k, None) - - non_services = set(address_mapping) - set(service_list) - if non_services: - print("WARNING: Unknown services {}".format(list(non_services))) - - return address_mapping - -def keyboard_controller_thread(q, route_start_time): - print("keyboard waiting for input") - kb = KBHit() - while 1: - c = kb.getch() - if c == 'm': # Move forward by 1m - q.send_pyobj(SeekRelativeTime(60)) - elif c == 'M': # Move backward by 1m - q.send_pyobj(SeekRelativeTime(-60)) - elif c == 's': # Move forward by 10s - q.send_pyobj(SeekRelativeTime(10)) - elif c == 'S': # Move backward by 10s - q.send_pyobj(SeekRelativeTime(-10)) - elif c == 'G': # Move backward by 10s - q.send_pyobj(SeekAbsoluteTime(0.)) - elif c == "\x20": # Space bar. - q.send_pyobj(TogglePause()) - elif c == "\n": - try: - seek_time_input = input('time: ') - seek_time = absolute_time_str(seek_time_input, route_start_time) - - # If less than 60, assume segment number - if seek_time < 60: - seek_time *= 60 - - q.send_pyobj(SeekAbsoluteTime(seek_time)) - except Exception as e: - print("Time not understood: {}".format(e)) - -def get_arg_parser(): - parser = argparse.ArgumentParser( - description="Mock openpilot components by publishing logged messages.", - formatter_class=argparse.ArgumentDefaultsHelpFormatter) - - parser.add_argument("route_name", type=(lambda x: x.replace("#", "|")), nargs="?", - help="The route whose messages will be published.") - parser.add_argument("data_dir", nargs='?', default=os.getenv('UNLOGGER_DATA_DIR'), - help="Path to directory in which log and camera files are located.") - - parser.add_argument("--no-loop", action="store_true", help="Stop at the end of the replay.") - - def key_value_pair(x): - return x.split("=") - - parser.add_argument("address_mapping", nargs="*", type=key_value_pair, - help="Pairs = to publish on .") - - def comma_list(x): - return x.split(",") - - to_mock_group = parser.add_mutually_exclusive_group() - to_mock_group.add_argument("--min", action="store_true", default=os.getenv("MIN")) - to_mock_group.add_argument("--enabled", default=os.getenv("ENABLED"), type=comma_list) - - parser.add_argument("--disabled", type=comma_list, default=os.getenv("DISABLED") or ()) - - parser.add_argument( - "--tl", dest="publish_time_length", type=float, default=None, - help="Length of interval in event time for which messages should be published.") - - parser.add_argument( - "--no-realtime", dest="realtime", action="store_false", default=True, - help="Publish messages as quickly as possible instead of realtime.") - - parser.add_argument( - "--no-interactive", dest="interactive", action="store_false", default=True, - help="Disable interactivity.") - - parser.add_argument( - "--bind-early", action="store_true", default=False, - help="Bind early to avoid dropping messages.") - - parser.add_argument( - "--no-visionipc", action="store_true", default=False, - help="Do not output video over visionipc") - - parser.add_argument( - "--start-time", type=float, default=0., - help="Seek to this absolute time (in seconds) upon starting playback.") - - return parser - -def main(argv): - args = get_arg_parser().parse_args(sys.argv[1:]) - - command_address = "ipc:///tmp/{}".format(uuid4()) - forward_commands_address = "ipc:///tmp/{}".format(uuid4()) - data_address = "ipc:///tmp/{}".format(uuid4()) - - address_mapping = _get_address_mapping(args) - - command_sock = zmq.Context.instance().socket(zmq.PUSH) - command_sock.connect(command_address) - - if args.route_name is not None: - route_name_split = args.route_name.split("|") - if len(route_name_split) > 1: - route_start_time = timestamp_to_s(route_name_split[1]) - else: - route_start_time = 0 - command_sock.send_pyobj( - SetRoute(args.route_name, args.start_time, args.data_dir)) - else: - print("waiting for external command...") - route_start_time = 0 - - subprocesses = {} - try: - subprocesses["data"] = multiprocessing.Process( - target=UnloggerWorker().run, - args=(forward_commands_address, data_address, address_mapping.copy())) - - subprocesses["control"] = multiprocessing.Process( - target=unlogger_thread, - args=(command_address, forward_commands_address, data_address, args.realtime, - _get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop, args.no_visionipc)) - - subprocesses["data"].start() - subprocesses["control"].start() - - # Exit if any of the children die. - def exit_if_children_dead(*_): - for _, p in subprocesses.items(): - if not p.is_alive(): - [p.terminate() for p in subprocesses.values()] - exit() - signal.signal(signal.SIGCHLD, signal.SIG_IGN) - signal.signal(signal.SIGCHLD, exit_if_children_dead) - - if args.interactive: - keyboard_controller_thread(command_sock, route_start_time) - else: - # Wait forever for children. - while True: - time.sleep(10000.) - finally: - for p in subprocesses.values(): - if p.is_alive(): - try: - p.join(3.) - except multiprocessing.TimeoutError: - p.terminate() - continue - return 0 - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:]))