|
|
|
@ -2,7 +2,6 @@ |
|
|
|
|
import json |
|
|
|
|
import os |
|
|
|
|
import pickle |
|
|
|
|
import queue |
|
|
|
|
import struct |
|
|
|
|
import subprocess |
|
|
|
|
import tempfile |
|
|
|
@ -326,7 +325,8 @@ class RawFrameReader(BaseFrameReader): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class VideoStreamDecompressor: |
|
|
|
|
def __init__(self, vid_fmt, w, h, pix_fmt): |
|
|
|
|
def __init__(self, fn, vid_fmt, w, h, pix_fmt): |
|
|
|
|
self.fn = fn |
|
|
|
|
self.vid_fmt = vid_fmt |
|
|
|
|
self.w = w |
|
|
|
|
self.h = h |
|
|
|
@ -339,12 +339,26 @@ class VideoStreamDecompressor: |
|
|
|
|
else: |
|
|
|
|
raise NotImplementedError |
|
|
|
|
|
|
|
|
|
self.out_q = queue.Queue() |
|
|
|
|
self.proc = None |
|
|
|
|
self.t = threading.Thread(target=self.write_thread) |
|
|
|
|
self.t.daemon = True |
|
|
|
|
|
|
|
|
|
def write_thread(self): |
|
|
|
|
try: |
|
|
|
|
with FileReader(self.fn) as f: |
|
|
|
|
while True: |
|
|
|
|
r = f.read(1024*1024) |
|
|
|
|
if len(r) == 0: |
|
|
|
|
break |
|
|
|
|
self.proc.stdin.write(r) |
|
|
|
|
finally: |
|
|
|
|
self.proc.stdin.close() |
|
|
|
|
|
|
|
|
|
def read(self): |
|
|
|
|
threads = os.getenv("FFMPEG_THREADS", "0") |
|
|
|
|
cuda = os.getenv("FFMPEG_CUDA", "0") == "1" |
|
|
|
|
self.proc = subprocess.Popen( |
|
|
|
|
["ffmpeg", |
|
|
|
|
cmd = [ |
|
|
|
|
"ffmpeg", |
|
|
|
|
"-threads", threads, |
|
|
|
|
"-hwaccel", "none" if not cuda else "cuda", |
|
|
|
|
"-c:v", "hevc", |
|
|
|
@ -354,39 +368,22 @@ class VideoStreamDecompressor: |
|
|
|
|
"-flush_packets", "0", |
|
|
|
|
# "-fflags", "nobuffer", |
|
|
|
|
"-vsync", "0", |
|
|
|
|
"-f", vid_fmt, |
|
|
|
|
"-f", self.vid_fmt, |
|
|
|
|
"-i", "pipe:0", |
|
|
|
|
"-threads", threads, |
|
|
|
|
"-f", "rawvideo", |
|
|
|
|
"-pix_fmt", pix_fmt, |
|
|
|
|
"pipe:1"], |
|
|
|
|
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=open("/dev/null", "wb")) |
|
|
|
|
"-pix_fmt", self.pix_fmt, |
|
|
|
|
"pipe:1" |
|
|
|
|
] |
|
|
|
|
self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) |
|
|
|
|
try: |
|
|
|
|
self.t.start() |
|
|
|
|
|
|
|
|
|
def read_thread(): |
|
|
|
|
while True: |
|
|
|
|
r = self.proc.stdout.read(self.out_size) |
|
|
|
|
if len(r) == 0: |
|
|
|
|
dat = self.proc.stdout.read(self.out_size) |
|
|
|
|
if len(dat) == 0: |
|
|
|
|
break |
|
|
|
|
assert len(r) == self.out_size |
|
|
|
|
self.out_q.put(r) |
|
|
|
|
|
|
|
|
|
self.t = threading.Thread(target=read_thread) |
|
|
|
|
self.t.daemon = True |
|
|
|
|
self.t.start() |
|
|
|
|
|
|
|
|
|
def __enter__(self): |
|
|
|
|
return self |
|
|
|
|
|
|
|
|
|
def __exit__(self, *args): |
|
|
|
|
self.close() |
|
|
|
|
|
|
|
|
|
def write(self, rawdat): |
|
|
|
|
self.proc.stdin.write(rawdat) |
|
|
|
|
self.proc.stdin.flush() |
|
|
|
|
|
|
|
|
|
def read(self): |
|
|
|
|
dat = self.out_q.get(block=True) |
|
|
|
|
|
|
|
|
|
assert len(dat) == self.out_size |
|
|
|
|
if self.pix_fmt == "rgb24": |
|
|
|
|
ret = np.frombuffer(dat, dtype=np.uint8).reshape((self.h, self.w, 3)) |
|
|
|
|
elif self.pix_fmt == "yuv420p": |
|
|
|
@ -395,18 +392,13 @@ class VideoStreamDecompressor: |
|
|
|
|
ret = np.frombuffer(dat, dtype=np.uint8).reshape((3, self.h, self.w)) |
|
|
|
|
else: |
|
|
|
|
assert False |
|
|
|
|
yield ret |
|
|
|
|
|
|
|
|
|
return ret |
|
|
|
|
|
|
|
|
|
def eos(self): |
|
|
|
|
self.proc.stdin.close() |
|
|
|
|
|
|
|
|
|
def close(self): |
|
|
|
|
self.proc.stdin.close() |
|
|
|
|
result_code = self.proc.wait() |
|
|
|
|
assert result_code == 0, result_code |
|
|
|
|
finally: |
|
|
|
|
self.proc.kill() |
|
|
|
|
self.t.join() |
|
|
|
|
self.proc.wait() |
|
|
|
|
assert self.proc.wait() == 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class StreamGOPReader(GOPReader): |
|
|
|
|
def __init__(self, fn, frame_type, index_data): |
|
|
|
@ -579,43 +571,9 @@ class StreamFrameReader(StreamGOPReader, GOPFrameReader): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def GOPFrameIterator(gop_reader, pix_fmt): |
|
|
|
|
# this is really ugly. ill think about how to refactor it when i can think good |
|
|
|
|
|
|
|
|
|
IN_FLIGHT_GOPS = 6 # should be enough that the stream decompressor starts returning data |
|
|
|
|
|
|
|
|
|
with VideoStreamDecompressor(gop_reader.vid_fmt, gop_reader.w, gop_reader.h, pix_fmt) as dec: |
|
|
|
|
read_work = [] |
|
|
|
|
|
|
|
|
|
def readthing(): |
|
|
|
|
# print read_work, dec.out_q.qsize() |
|
|
|
|
outf = dec.read() |
|
|
|
|
read_thing = read_work[0] |
|
|
|
|
if read_thing[0] > 0: |
|
|
|
|
read_thing[0] -= 1 |
|
|
|
|
else: |
|
|
|
|
assert read_thing[1] > 0 |
|
|
|
|
yield outf |
|
|
|
|
read_thing[1] -= 1 |
|
|
|
|
|
|
|
|
|
if read_thing[1] == 0: |
|
|
|
|
read_work.pop(0) |
|
|
|
|
|
|
|
|
|
i = 0 |
|
|
|
|
while i < gop_reader.frame_count: |
|
|
|
|
frame_b, num_frames, skip_frames, gop_data = gop_reader.get_gop(i) |
|
|
|
|
dec.write(gop_data) |
|
|
|
|
i += num_frames |
|
|
|
|
read_work.append([skip_frames, num_frames]) |
|
|
|
|
|
|
|
|
|
while len(read_work) >= IN_FLIGHT_GOPS: |
|
|
|
|
for v in readthing(): |
|
|
|
|
yield v |
|
|
|
|
|
|
|
|
|
dec.eos() |
|
|
|
|
|
|
|
|
|
while read_work: |
|
|
|
|
for v in readthing(): |
|
|
|
|
yield v |
|
|
|
|
dec = VideoStreamDecompressor(gop_reader.fn, gop_reader.vid_fmt, gop_reader.w, gop_reader.h, pix_fmt) |
|
|
|
|
for frame in dec.read(): |
|
|
|
|
yield frame |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def FrameIterator(fn, pix_fmt, **kwargs): |
|
|
|
|