From 9726154c527aababe22fdf018ee7b466ce78f425 Mon Sep 17 00:00:00 2001 From: Willem Melching Date: Tue, 9 Feb 2021 19:48:08 +0100 Subject: [PATCH] Unlogger send video over VisionIPC (#20046) * unlogger: flag to send images over vipc * vipc default * add todo * populate frame extra data * multiple frame sizes old-commit-hash: 8b7c5503afbfb18b4f84c98323a6f2d9b2f51777 --- tools/replay/unlogger.py | 44 ++++++++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/tools/replay/unlogger.py b/tools/replay/unlogger.py index 586b71c492..117e69dd6c 100755 --- a/tools/replay/unlogger.py +++ b/tools/replay/unlogger.py @@ -11,13 +11,12 @@ from collections import namedtuple from collections import deque from datetime import datetime -# strat 1: script to copy files -# strat 2: build pip packages around these -# could be its own pip package, which we'd need to build and release from cereal import log as capnp_log from cereal.services import service_list from cereal.messaging import pub_sock, MultiplePublishersError +from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module from common import realtime +from common.transformations.camera import eon_f_frame_size, tici_f_frame_size from tools.lib.kbhit import KBHit from tools.lib.logreader import MultiLogIterator @@ -30,6 +29,7 @@ SeekAbsoluteTime = namedtuple("SeekAbsoluteTime", ("secs",)) SeekRelativeTime = namedtuple("SeekRelativeTime", ("secs",)) TogglePause = namedtuple("TogglePause", ()) StopAndQuit = namedtuple("StopAndQuit", ()) +VIPC_TYP = "vipc" class UnloggerWorker(object): @@ -116,7 +116,13 @@ class UnloggerWorker(object): if img is not None: img = img[:, :, ::-1] # Convert RGB to BGR, which is what the camera outputs img = img.flatten() - smsg.frame.image = img.tobytes() + bts = img.tobytes() + + smsg.frame.image = bts + + extra = (smsg.frame.frameId, smsg.frame.timestampSof, smsg.frame.timestampEof) + data_socket.send_pyobj((cookie, VIPC_TYP, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE) + data_socket.send(bts, copy=False) data_socket.send_pyobj((cookie, typ, msg.logMonoTime, route_time), flags=zmq.SNDMORE) data_socket.send(smsg.to_bytes(), copy=False) @@ -155,9 +161,16 @@ def _get_address_send_func(address): sock = pub_sock(address) return sock.send +def _get_vipc_server(length): + w, h = {3 * w * h: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]}[length] + + vipc_server = VisionIpcServer("camerad") + vipc_server.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, w, h) + vipc_server.start_listener() + return vipc_server def unlogger_thread(command_address, forward_commands_address, data_address, run_realtime, - address_mapping, publish_time_length, bind_early, no_loop): + address_mapping, publish_time_length, bind_early, no_loop, no_visionipc): # Clear context to avoid problems with multiprocessing. zmq.Context._instance = None context = zmq.Context.instance() @@ -195,6 +208,8 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run paused = False reset_time = True prev_msg_time = None + vipc_server = None + while True: evts = dict(poller.poll()) if command_sock in evts: @@ -215,7 +230,7 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run reset_time = True elif data_socket in evts: - msg_generation, typ, msg_time, route_time = data_socket.recv_pyobj(flags=zmq.RCVMORE) + msg_generation, typ, msg_time, route_time, *extra = data_socket.recv_pyobj(flags=zmq.RCVMORE) msg_bytes = data_socket.recv() if msg_generation < generation: # Skip packets. @@ -244,7 +259,7 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run print("at", route_time) printed_at = route_time - if typ not in send_funcs: + if typ not in send_funcs and typ != 'vipc': if typ in address_mapping: # Remove so we don't keep printing warnings. address = address_mapping.pop(typ) @@ -273,7 +288,14 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run # Send message. try: - send_funcs[typ](msg_bytes) + if typ == VIPC_TYP and (not no_visionipc): + if vipc_server is None: + vipc_server = _get_vipc_server(len(msg_bytes)) + + i, sof, eof = extra[0] + vipc_server.send(VisionStreamType.VISION_STREAM_RGB_BACK, msg_bytes, i, sof, eof) + if typ != VIPC_TYP: + send_funcs[typ](msg_bytes) except MultiplePublishersError: del send_funcs[typ] @@ -384,6 +406,10 @@ def get_arg_parser(): "--bind-early", action="store_true", default=False, help="Bind early to avoid dropping messages.") + parser.add_argument( + "--no-visionipc", action="store_true", default=False, + help="Do not output video over visionipc") + parser.add_argument( "--start-time", type=float, default=0., help="Seek to this absolute time (in seconds) upon starting playback.") @@ -423,7 +449,7 @@ def main(argv): subprocesses["control"] = multiprocessing.Process( target=unlogger_thread, args=(command_address, forward_commands_address, data_address, args.realtime, - _get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop)) + _get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop, args.no_visionipc)) subprocesses["data"].start() subprocesses["control"].start()