Unlogger send video over VisionIPC (#20046)

* unlogger: flag to send images over vipc

* vipc default

* add todo

* populate frame extra data

* multiple frame sizes
old-commit-hash: 8b7c5503af
commatwo_master
Willem Melching 4 years ago committed by GitHub
parent e26656ddab
commit 9726154c52
  1. 44
      tools/replay/unlogger.py

@ -11,13 +11,12 @@ from collections import namedtuple
from collections import deque
from datetime import datetime
# strat 1: script to copy files
# strat 2: build pip packages around these
# could be its own pip package, which we'd need to build and release
from cereal import log as capnp_log
from cereal.services import service_list
from cereal.messaging import pub_sock, MultiplePublishersError
from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module
from common import realtime
from common.transformations.camera import eon_f_frame_size, tici_f_frame_size
from tools.lib.kbhit import KBHit
from tools.lib.logreader import MultiLogIterator
@ -30,6 +29,7 @@ SeekAbsoluteTime = namedtuple("SeekAbsoluteTime", ("secs",))
SeekRelativeTime = namedtuple("SeekRelativeTime", ("secs",))
TogglePause = namedtuple("TogglePause", ())
StopAndQuit = namedtuple("StopAndQuit", ())
VIPC_TYP = "vipc"
class UnloggerWorker(object):
@ -116,7 +116,13 @@ class UnloggerWorker(object):
if img is not None:
img = img[:, :, ::-1] # Convert RGB to BGR, which is what the camera outputs
img = img.flatten()
smsg.frame.image = img.tobytes()
bts = img.tobytes()
smsg.frame.image = bts
extra = (smsg.frame.frameId, smsg.frame.timestampSof, smsg.frame.timestampEof)
data_socket.send_pyobj((cookie, VIPC_TYP, msg.logMonoTime, route_time, extra), flags=zmq.SNDMORE)
data_socket.send(bts, copy=False)
data_socket.send_pyobj((cookie, typ, msg.logMonoTime, route_time), flags=zmq.SNDMORE)
data_socket.send(smsg.to_bytes(), copy=False)
@ -155,9 +161,16 @@ def _get_address_send_func(address):
sock = pub_sock(address)
return sock.send
def _get_vipc_server(length):
w, h = {3 * w * h: (w, h) for (w, h) in [tici_f_frame_size, eon_f_frame_size]}[length]
vipc_server = VisionIpcServer("camerad")
vipc_server.create_buffers(VisionStreamType.VISION_STREAM_RGB_BACK, 4, True, w, h)
vipc_server.start_listener()
return vipc_server
def unlogger_thread(command_address, forward_commands_address, data_address, run_realtime,
address_mapping, publish_time_length, bind_early, no_loop):
address_mapping, publish_time_length, bind_early, no_loop, no_visionipc):
# Clear context to avoid problems with multiprocessing.
zmq.Context._instance = None
context = zmq.Context.instance()
@ -195,6 +208,8 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run
paused = False
reset_time = True
prev_msg_time = None
vipc_server = None
while True:
evts = dict(poller.poll())
if command_sock in evts:
@ -215,7 +230,7 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run
reset_time = True
elif data_socket in evts:
msg_generation, typ, msg_time, route_time = data_socket.recv_pyobj(flags=zmq.RCVMORE)
msg_generation, typ, msg_time, route_time, *extra = data_socket.recv_pyobj(flags=zmq.RCVMORE)
msg_bytes = data_socket.recv()
if msg_generation < generation:
# Skip packets.
@ -244,7 +259,7 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run
print("at", route_time)
printed_at = route_time
if typ not in send_funcs:
if typ not in send_funcs and typ != 'vipc':
if typ in address_mapping:
# Remove so we don't keep printing warnings.
address = address_mapping.pop(typ)
@ -273,7 +288,14 @@ def unlogger_thread(command_address, forward_commands_address, data_address, run
# Send message.
try:
send_funcs[typ](msg_bytes)
if typ == VIPC_TYP and (not no_visionipc):
if vipc_server is None:
vipc_server = _get_vipc_server(len(msg_bytes))
i, sof, eof = extra[0]
vipc_server.send(VisionStreamType.VISION_STREAM_RGB_BACK, msg_bytes, i, sof, eof)
if typ != VIPC_TYP:
send_funcs[typ](msg_bytes)
except MultiplePublishersError:
del send_funcs[typ]
@ -384,6 +406,10 @@ def get_arg_parser():
"--bind-early", action="store_true", default=False,
help="Bind early to avoid dropping messages.")
parser.add_argument(
"--no-visionipc", action="store_true", default=False,
help="Do not output video over visionipc")
parser.add_argument(
"--start-time", type=float, default=0.,
help="Seek to this absolute time (in seconds) upon starting playback.")
@ -423,7 +449,7 @@ def main(argv):
subprocesses["control"] = multiprocessing.Process(
target=unlogger_thread,
args=(command_address, forward_commands_address, data_address, args.realtime,
_get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop))
_get_address_mapping(args), args.publish_time_length, args.bind_early, args.no_loop, args.no_visionipc))
subprocesses["data"].start()
subprocesses["control"].start()

Loading…
Cancel
Save