update compressed vipc to function on packets

pull/24477/head
George Hotz 3 years ago
parent 4354f7cd28
commit dded24662f
      tools/camerastream/compressed_vipc.py

@@ -1,120 +1,101 @@
 #!/usr/bin/env python3
 import os
 import sys
+import argparse
 import numpy as np
 import multiprocessing
+import time
 
+import cereal.messaging as messaging
 from cereal.visionipc.visionipc_pyx import VisionIpcServer, VisionStreamType # pylint: disable=no-name-in-module, import-error
 
 W, H = 1928, 1208
 V4L2_BUF_FLAG_KEYFRAME = 8
 
-def writer(fn, addr, sock_name):
-  import cereal.messaging as messaging
-  fifo_file = open(fn, "wb")
+def decoder(addr, sock_name, vipc_server, vst, nvidia):
+  print("start decoder for %s" % sock_name)
+  if nvidia:
+    sys.path += os.environ["LD_LIBRARY_PATH"].split(":")
+    import PyNvCodec as nvc # pylint: disable=import-error
+
+    nvDec = nvc.PyNvDecoder(W, H, nvc.PixelFormat.NV12, nvc.CudaVideoCodec.HEVC, 0)
+    cc1 = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
+    conv_yuv = nvc.PySurfaceConverter(W, H, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, 0)
+    nvDwn_yuv = nvc.PySurfaceDownloader(W, H, nvc.PixelFormat.YUV420, 0)
+    img_yuv = np.ndarray((H*W//2*3), dtype=np.uint8)
+  else:
+    import av # pylint: disable=import-error
+    codec = av.CodecContext.create("hevc", "r")
 
   os.environ["ZMQ"] = "1"
   messaging.context = messaging.Context()
   sock = messaging.sub_sock(sock_name, None, addr=addr, conflate=False)
-
+  cnt = 0
   last_idx = -1
   seen_iframe = False
+
+  time_q = []
   while 1:
     msgs = messaging.drain_sock(sock, wait_for_one=True)
     for evt in msgs:
       evta = getattr(evt, evt.which())
-      lat = ((evt.logMonoTime/1e9) - (evta.idx.timestampEof/1e9))*1000
-      print("%2d %4d %.3f %.3f latency %.2f ms" % (len(msgs), evta.idx.encodeId, evt.logMonoTime/1e9, evta.idx.timestampEof/1e6, lat), len(evta.data), sock_name)
       if evta.idx.encodeId != 0 and evta.idx.encodeId != (last_idx+1):
-        print("DROP!")
+        print("DROP PACKET!")
       last_idx = evta.idx.encodeId
-      if evta.idx.flags & V4L2_BUF_FLAG_KEYFRAME:
-        fifo_file.write(evta.header)
-        seen_iframe = True
-      if not seen_iframe:
+      if not seen_iframe and not (evta.idx.flags & V4L2_BUF_FLAG_KEYFRAME):
         print("waiting for iframe")
         continue
-      fifo_file.write(evta.data)
-
-FFMPEG_OPTIONS = {"probesize": "32", "flags": "low_delay"}
-
-def decoder_nvidia(fn, vipc_server, vst, yuv=True, rgb=False):
-  sys.path.append("/raid.dell2/PyNvCodec")
-  import PyNvCodec as nvc # pylint: disable=import-error
-  decoder = nvc.PyNvDecoder(fn, 0, FFMPEG_OPTIONS)
-  cc1 = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
-  if rgb:
-    conv = nvc.PySurfaceConverter(W, H, nvc.PixelFormat.NV12, nvc.PixelFormat.BGR, 0)
-    nvDwn = nvc.PySurfaceDownloader(W, H, nvc.PixelFormat.BGR, 0)
-    img = np.ndarray((H,W,3), dtype=np.uint8)
-  if yuv:
-    conv_yuv = nvc.PySurfaceConverter(W, H, nvc.PixelFormat.NV12, nvc.PixelFormat.YUV420, 0)
-    nvDwn_yuv = nvc.PySurfaceDownloader(W, H, nvc.PixelFormat.YUV420, 0)
-    img_yuv = np.ndarray((H*W//2*3), dtype=np.uint8)
-
-  cnt = 0
-  while 1:
-    rawSurface = decoder.DecodeSingleSurface()
-    if rawSurface.Empty():
-      continue
-    if rgb:
-      convSurface = conv.Execute(rawSurface, cc1)
-      nvDwn.DownloadSingleSurface(convSurface, img)
-      vipc_server.send(vst, img.flatten().data, cnt, 0, 0)
-    if yuv:
-      convSurface = conv_yuv.Execute(rawSurface, cc1)
-      nvDwn_yuv.DownloadSingleSurface(convSurface, img_yuv)
-      vipc_server.send(vst+3, img_yuv.flatten().data, cnt, 0, 0)
-    cnt += 1
-
-def decoder_ffmpeg(fn, vipc_server, vst, yuv=True, rgb=False):
-  import av # pylint: disable=import-error
-  container = av.open(fn, options=FFMPEG_OPTIONS)
-  cnt = 0
-  for frame in container.decode(video=0):
-    if rgb:
-      img = frame.to_ndarray(format=av.video.format.VideoFormat('bgr24'))
-      vipc_server.send(vst, img.flatten().data, cnt, 0, 0)
-    if yuv:
-      img_yuv = frame.to_ndarray(format=av.video.format.VideoFormat('yuv420p'))
-      vipc_server.send(vst+3, img_yuv.flatten().data, cnt, 0, 0)
-    cnt += 1
-
-import argparse
+      time_q.append(time.monotonic())
+      latency = ((evt.logMonoTime/1e9) - (evta.idx.timestampEof/1e9))*1000
+
+      # put in header (first)
+      if not seen_iframe:
+        if nvidia:
+          nvDec.DecodeSurfaceFromPacket(np.frombuffer(evta.header, dtype=np.uint8))
+        else:
+          codec.decode(av.packet.Packet(evta.header))
+        seen_iframe = True
+
+      if nvidia:
+        rawSurface = nvDec.DecodeSurfaceFromPacket(np.frombuffer(evta.data, dtype=np.uint8))
+        if rawSurface.Empty():
+          print("DROP SURFACE")
+          continue
+        convSurface = conv_yuv.Execute(rawSurface, cc1)
+        nvDwn_yuv.DownloadSingleSurface(convSurface, img_yuv)
+      else:
+        frames = codec.decode(av.packet.Packet(evta.data))
+        if len(frames) == 0:
+          print("DROP SURFACE")
+          continue
+        assert len(frames) == 1
+        img_yuv = frames[0].to_ndarray(format=av.video.format.VideoFormat('yuv420p'))
+
+      vipc_server.send(vst, img_yuv.flatten().data, cnt, 0, 0)
+      cnt += 1
+
+      pc_latency = (time.monotonic()-time_q[0])*1000
+      time_q = time_q[1:]
+      print("%2d %4d %.3f %.3f latency %6.2fms + %6.2f ms" % (len(msgs), evta.idx.encodeId, evt.logMonoTime/1e9, evta.idx.timestampEof/1e6, latency, pc_latency), len(evta.data), sock_name)
+
 if __name__ == "__main__":
   parser = argparse.ArgumentParser(description='Decode video streams and broacast on VisionIPC')
   parser.add_argument("addr", help="Address of comma 3")
-  parser.add_argument('--pipes', action='store_true', help='Only create pipes')
   parser.add_argument('--nvidia', action='store_true', help='Use nvidia instead of ffmpeg')
-  parser.add_argument('--rgb', action='store_true', help='Also broadcast RGB')
   parser.add_argument("--cams", default="0,1,2", help="Cameras to decode")
   args = parser.parse_args()
 
   all_cams = [
-    ("roadEncodeData", VisionStreamType.VISION_STREAM_RGB_ROAD),
-    ("wideRoadEncodeData", VisionStreamType.VISION_STREAM_RGB_WIDE_ROAD),
-    ("driverEncodeData", VisionStreamType.VISION_STREAM_RGB_DRIVER),
+    ("roadEncodeData", VisionStreamType.VISION_STREAM_ROAD),
+    ("wideRoadEncodeData", VisionStreamType.VISION_STREAM_WIDE_ROAD),
+    ("driverEncodeData", VisionStreamType.VISION_STREAM_DRIVER),
   ]
   cams = dict([all_cams[int(x)] for x in args.cams.split(",")])
 
   vipc_server = VisionIpcServer("camerad")
   for vst in cams.values():
-    if args.rgb:
-      vipc_server.create_buffers(vst, 4, True, W, H)
-    vipc_server.create_buffers(vst+3, 4, False, W, H)
+    vipc_server.create_buffers(vst, 4, False, W, H)
   vipc_server.start_listener()
 
   for k,v in cams.items():
-    FIFO_NAME = "/tmp/decodepipe_"+k
-    if os.path.exists(FIFO_NAME):
-      os.unlink(FIFO_NAME)
-    os.mkfifo(FIFO_NAME)
-    multiprocessing.Process(target=writer, args=(FIFO_NAME, sys.argv[1], k)).start()
-    if args.pipes:
-      print("connect to", FIFO_NAME)
-    elif args.nvidia:
-      multiprocessing.Process(target=decoder_nvidia, args=(FIFO_NAME, vipc_server, v, True, args.rgb)).start()
-    else:
-      multiprocessing.Process(target=decoder_ffmpeg, args=(FIFO_NAME, vipc_server, v, True, args.rgb)).start()
+    multiprocessing.Process(target=decoder, args=(args.addr, k, vipc_server, v, args.nvidia)).start()
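After this change the script decodes the encodeData packets itself and is run directly against a device, e.g. "compressed_vipc.py <comma three addr> --nvidia --cams 0,1,2" per the argparse block above, serving decoded YUV420 frames over VisionIPC under the "camerad" name. A minimal consumer sketch follows; it assumes the cereal Python bindings expose VisionIpcClient with this constructor and connect()/recv() signatures (these may differ across cereal versions), and the server/stream names simply mirror the ones created in the diff.

#!/usr/bin/env python3
# Sketch of a VisionIPC consumer for the frames published by compressed_vipc.py.
# Assumption: VisionIpcClient is exposed by the same cereal bindings the script
# already imports; constructor/connect/recv signatures may vary by version.
import time
from cereal.visionipc.visionipc_pyx import VisionIpcClient, VisionStreamType  # pylint: disable=no-name-in-module, import-error

if __name__ == "__main__":
  # connect to the "camerad" server created by compressed_vipc.py
  client = VisionIpcClient("camerad", VisionStreamType.VISION_STREAM_ROAD, True)
  while not client.connect(False):
    time.sleep(0.1)

  while True:
    buf = client.recv()  # YUV420 buffer (W*H*3//2 bytes) when a frame is ready
    if buf is None:
      continue
    print("got %dx%d frame" % (buf.width, buf.height))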
