import os
import sys
import json
import struct
import tempfile
import threading
import xml . etree . ElementTree as ET
import numpy as np
if sys . version_info > = ( 3 , 0 ) :
import queue
import pickle
from io import BytesIO as StringIO
else :
import Queue as queue
import cPickle as pickle
from cStringIO import StringIO
import subprocess
from aenum import Enum
from lru import LRU
from functools import wraps
from tools . lib . cache import cache_path_for_file_path
from tools . lib . exceptions import DataUnreadableError
try :
from xx . chffr . lib . filereader import FileReader
except ImportError :
from tools . lib . filereader import FileReader
from tools . lib . file_helpers import atomic_write_in_dir
from tools . lib . mkvparse import mkvindex
from tools . lib . route import Route
H264_SLICE_P = 0
H264_SLICE_B = 1
H264_SLICE_I = 2
HEVC_SLICE_B = 0
HEVC_SLICE_P = 1
HEVC_SLICE_I = 2
SLICE_I = 2 # hevc and h264 are the same :)
class FrameType ( Enum ) :
raw = 1
h265_stream = 2
h264_mp4 = 3
h264_pstream = 4
ffv1_mkv = 5
ffvhuff_mkv = 6
def fingerprint_video ( fn ) :
with FileReader ( fn ) as f :
header = f . read ( 4 )
if len ( header ) == 0 :
raise DataUnreadableError ( " %s is empty " % fn )
elif header == b " \x00 \xc0 \x12 \x00 " :
return FrameType . raw
elif header == b " \x00 \x00 \x00 \x01 " :
if ' hevc ' in fn :
return FrameType . h265_stream
elif os . path . basename ( fn ) in ( " camera " , " acamera " ) :
return FrameType . h264_pstream
else :
raise NotImplementedError ( fn )
elif header == b " \x00 \x00 \x00 \x1c " :
return FrameType . h264_mp4
elif header == b " \x1a \x45 \xdf \xa3 " :
return FrameType . ffv1_mkv
else :
raise NotImplementedError ( fn )
def ffprobe ( fn , fmt = None ) :
cmd = [ " ffprobe " ,
" -v " , " quiet " ,
" -print_format " , " json " ,
" -show_format " , " -show_streams " ]
if fmt :
cmd + = [ " -f " , fmt ]
cmd + = [ fn ]
try :
ffprobe_output = subprocess . check_output ( cmd )
except subprocess . CalledProcessError as e :
raise DataUnreadableError ( fn )
return json . loads ( ffprobe_output )
def vidindex ( fn , typ ) :
vidindex_dir = os . path . join ( os . path . dirname ( os . path . realpath ( __file__ ) ) , " vidindex " )
vidindex = os . path . join ( vidindex_dir , " vidindex " )
subprocess . check_call ( [ " make " ] , cwd = vidindex_dir , stdout = open ( " /dev/null " , " w " ) )
with tempfile . NamedTemporaryFile ( ) as prefix_f , \
tempfile . NamedTemporaryFile ( ) as index_f :
try :
subprocess . check_call ( [ vidindex , typ , fn , prefix_f . name , index_f . name ] )
except subprocess . CalledProcessError as e :
raise DataUnreadableError ( " vidindex failed on file %s " % fn )
with open ( index_f . name , " rb " ) as f :
index = f . read ( )
with open ( prefix_f . name , " rb " ) as f :
prefix = f . read ( )
index = np . frombuffer ( index , np . uint32 ) . reshape ( - 1 , 2 )
assert index [ - 1 , 0 ] == 0xFFFFFFFF
assert index [ - 1 , 1 ] == os . path . getsize ( fn )
return index , prefix
def cache_fn ( func ) :
@wraps ( func )
def cache_inner ( fn , * args , * * kwargs ) :
if kwargs . pop ( ' no_cache ' , None ) :
cache_path = None
else :
cache_prefix = kwargs . pop ( ' cache_prefix ' , None )
cache_path = cache_path_for_file_path ( fn , cache_prefix )
if cache_path and os . path . exists ( cache_path ) :
with open ( cache_path , " rb " ) as cache_file :
cache_value = pickle . load ( cache_file )
else :
cache_value = func ( fn , * args , * * kwargs )
if cache_path :
with atomic_write_in_dir ( cache_path , mode = " wb " , overwrite = True ) as cache_file :
pickle . dump ( cache_value , cache_file , - 1 )
return cache_value
return cache_inner
@cache_fn
def index_stream ( fn , typ ) :
assert typ in ( " hevc " , " h264 " )
with FileReader ( fn ) as f :
assert os . path . exists ( f . name ) , fn
index , prefix = vidindex ( f . name , typ )
probe = ffprobe ( f . name , typ )
return {
' index ' : index ,
' global_prefix ' : prefix ,
' probe ' : probe
}
@cache_fn
def index_mp4 ( fn ) :
with FileReader ( fn ) as f :
return vidindex_mp4 ( f . name )
@cache_fn
def index_mkv ( fn ) :
with FileReader ( fn ) as f :
probe = ffprobe ( f . name , " matroska " )
with open ( f . name , " rb " ) as d_f :
config_record , index = mkvindex . mkvindex ( d_f )
return {
' probe ' : probe ,
' config_record ' : config_record ,
' index ' : index
}
def index_videos ( camera_paths , cache_prefix = None ) :
""" Requires that paths in camera_paths are contiguous and of the same type. """
if len ( camera_paths ) < 1 :
raise ValueError ( " must provide at least one video to index " )
frame_type = fingerprint_video ( camera_paths [ 0 ] )
if frame_type == FrameType . h264_pstream :
index_pstream ( camera_paths , " h264 " , cache_prefix )
else :
for fn in camera_paths :
index_video ( fn , frame_type , cache_prefix )
def index_video ( fn , frame_type = None , cache_prefix = None ) :
cache_path = cache_path_for_file_path ( fn , cache_prefix )
if os . path . exists ( cache_path ) :
return
if frame_type is None :
frame_type = fingerprint_video ( fn [ 0 ] )
if frame_type == FrameType . h264_pstream :
#hack: try to index the whole route now
route = Route . from_file_path ( fn )
camera_paths = route . camera_paths ( )
if fn not in camera_paths :
raise DataUnreadableError ( " Not a contiguous route camera file: {} " . format ( fn ) )
print ( " no pstream cache for %s , indexing route %s now " % ( fn , route . name ) )
index_pstream ( route . camera_paths ( ) , " h264 " , cache_prefix )
elif frame_type == FrameType . h265_stream :
index_stream ( fn , " hevc " , cache_prefix = cache_prefix )
elif frame_type == FrameType . h264_mp4 :
index_mp4 ( fn , cache_prefix = cache_prefix )
def get_video_index ( fn , frame_type , cache_prefix = None ) :
cache_path = cache_path_for_file_path ( fn , cache_prefix )
if not os . path . exists ( cache_path ) :
index_video ( fn , frame_type , cache_prefix )
if not os . path . exists ( cache_path ) :
return None
with open ( cache_path , " rb " ) as cache_file :
return pickle . load ( cache_file )
def pstream_predecompress ( fns , probe , indexes , global_prefix , cache_prefix , multithreaded = False ) :
assert len ( fns ) == len ( indexes )
out_fns = [ cache_path_for_file_path ( fn , cache_prefix , extension = " .predecom.mkv " ) for fn in fns ]
out_exists = map ( os . path . exists , out_fns )
if all ( out_exists ) :
return
w = probe [ ' streams ' ] [ 0 ] [ ' width ' ]
h = probe [ ' streams ' ] [ 0 ] [ ' height ' ]
frame_size = w * h * 3 / 2 # yuv420p
decompress_proc = subprocess . Popen (
[ " ffmpeg " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -vsync " , " 0 " ,
" -f " , " h264 " ,
" -i " , " pipe:0 " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -f " , " rawvideo " ,
" -pix_fmt " , " yuv420p " ,
" pipe:1 " ] ,
stdin = subprocess . PIPE , stdout = subprocess . PIPE , stderr = open ( " /dev/null " , " wb " ) )
def write_thread ( ) :
for fn in fns :
with FileReader ( fn ) as f :
decompress_proc . stdin . write ( f . read ( ) )
decompress_proc . stdin . close ( )
def read_frame ( ) :
frame = None
try :
frame = decompress_proc . stdout . read ( frame_size )
except ( IOError , ValueError ) :
pass
if frame is None or frame == " " or len ( frame ) != frame_size :
raise DataUnreadableError ( " pre-decompression failed for %s " % fn )
return frame
t = threading . Thread ( target = write_thread )
t . daemon = True
t . start ( )
try :
for fn , out_fn , out_exist , index in zip ( fns , out_fns , out_exists , indexes ) :
if out_exist :
for fi in range ( index . shape [ 0 ] - 1 ) :
read_frame ( )
continue
with atomic_write_in_dir ( out_fn , mode = " w+b " , overwrite = True ) as out_tmp :
compress_proc = subprocess . Popen (
[ " ffmpeg " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -y " ,
" -vsync " , " 0 " ,
" -f " , " rawvideo " ,
" -pix_fmt " , " yuv420p " ,
" -s " , " %d x %d " % ( w , h ) ,
" -i " , " pipe:0 " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -f " , " matroska " ,
" -vcodec " , " ffv1 " ,
" -g " , " 0 " ,
out_tmp . name ] ,
stdin = subprocess . PIPE , stderr = open ( " /dev/null " , " wb " ) )
try :
for fi in range ( index . shape [ 0 ] - 1 ) :
frame = read_frame ( )
compress_proc . stdin . write ( frame )
compress_proc . stdin . close ( )
except :
compress_proc . kill ( )
raise
assert compress_proc . wait ( ) == 0
cache_path = cache_path_for_file_path ( fn , cache_prefix )
with atomic_write_in_dir ( cache_path , mode = " wb " , overwrite = True ) as cache_file :
pickle . dump ( {
' predecom ' : os . path . basename ( out_fn ) ,
' index ' : index ,
' probe ' : probe ,
' global_prefix ' : global_prefix ,
} , cache_file , - 1 )
except :
decompress_proc . kill ( )
raise
finally :
t . join ( )
rc = decompress_proc . wait ( )
if rc != 0 :
raise DataUnreadableError ( fns [ 0 ] )
def index_pstream ( fns , typ , cache_prefix = None ) :
if typ != " h264 " :
raise NotImplementedError ( typ )
if not fns :
raise DataUnreadableError ( " chffr h264 requires contiguous files " )
out_fns = [ cache_path_for_file_path ( fn , cache_prefix ) for fn in fns ]
out_exists = map ( os . path . exists , out_fns )
if all ( out_exists ) : return
# load existing index files to avoid re-doing work
existing_indexes = [ ]
for out_fn , exists in zip ( out_fns , out_exists ) :
existing = None
if exists :
with open ( out_fn , " rb " ) as cache_file :
existing = pickle . load ( cache_file )
existing_indexes . append ( existing )
# probe the first file
if existing_indexes [ 0 ] :
probe = existing_indexes [ 0 ] [ ' probe ' ]
else :
with FileReader ( fns [ 0 ] ) as f :
probe = ffprobe ( f . name , typ )
global_prefix = None
# get the video index of all the segments in this stream
indexes = [ ]
for i , fn in enumerate ( fns ) :
if existing_indexes [ i ] :
index = existing_indexes [ i ] [ ' index ' ]
prefix = existing_indexes [ i ] [ ' global_prefix ' ]
else :
with FileReader ( fn ) as f :
index , prefix = vidindex ( f . name , typ )
if i == 0 :
# assert prefix
if not prefix :
raise DataUnreadableError ( " vidindex failed for %s " % fn )
global_prefix = prefix
indexes . append ( index )
assert global_prefix
if np . sum ( indexes [ 0 ] [ : , 0 ] == H264_SLICE_I ) < = 1 :
print ( " pstream %s is unseekable. pre-decompressing all the segments... " % ( fns [ 0 ] ) )
pstream_predecompress ( fns , probe , indexes , global_prefix , cache_prefix )
return
# generate what's required to make each segment self-contained
# (the partial GOP from the end of each segments are put asside to add
# to the start of the following segment)
prefix_data = [ " " for _ in fns ]
prefix_index = [ [ ] for _ in fns ]
for i in range ( len ( fns ) - 1 ) :
if indexes [ i + 1 ] [ 0 , 0 ] == H264_SLICE_I and indexes [ i + 1 ] [ 0 , 1 ] < = 1 :
# next file happens to start with a i-frame, dont need use this file's end
continue
index = indexes [ i ]
if i == 0 and np . sum ( index [ : , 0 ] == H264_SLICE_I ) < = 1 :
raise NotImplementedError ( " No I-frames in pstream. " )
# find the last GOP in the index
frame_b = len ( index ) - 1
while frame_b > 0 and index [ frame_b , 0 ] != H264_SLICE_I :
frame_b - = 1
assert frame_b > = 0
assert index [ frame_b , 0 ] == H264_SLICE_I
end_len = len ( index ) - frame_b
with FileReader ( fns [ i ] ) as vid :
vid . seek ( index [ frame_b , 1 ] )
end_data = vid . read ( )
prefix_data [ i + 1 ] = end_data
prefix_index [ i + 1 ] = index [ frame_b : - 1 ]
# indexes[i] = index[:frame_b]
for i , fn in enumerate ( fns ) :
cache_path = out_fns [ i ]
if os . path . exists ( cache_path ) :
continue
segment_index = {
' index ' : indexes [ i ] ,
' global_prefix ' : global_prefix ,
' probe ' : probe ,
' prefix_frame_data ' : prefix_data [ i ] , # data to prefix the first GOP with
' num_prefix_frames ' : len ( prefix_index [ i ] ) , # number of frames to skip in the first GOP
}
with atomic_write_in_dir ( cache_path , mode = " wb " , overwrite = True ) as cache_file :
pickle . dump ( segment_index , cache_file , - 1 )
def read_file_check_size ( f , sz , cookie ) :
buff = bytearray ( sz )
bytes_read = f . readinto ( buff )
assert bytes_read == sz , ( bytes_read , sz )
return buff
import signal
import ctypes
def _set_pdeathsig ( sig = signal . SIGTERM ) :
def f ( ) :
libc = ctypes . CDLL ( ' libc.so.6 ' )
return libc . prctl ( 1 , sig )
return f
def vidindex_mp4 ( fn ) :
try :
xmls = subprocess . check_output ( [ " MP4Box " , fn , " -diso " , " -out " , " /dev/stdout " ] )
except subprocess . CalledProcessError as e :
raise DataUnreadableError ( fn )
tree = ET . fromstring ( xmls )
def parse_content ( s ) :
assert s . startswith ( " data:application/octet-string, " )
return s [ len ( " data:application/octet-string, " ) : ] . decode ( " hex " )
avc_element = tree . find ( " .//AVCSampleEntryBox " )
width = int ( avc_element . attrib [ ' Width ' ] )
height = int ( avc_element . attrib [ ' Height ' ] )
sps_element = avc_element . find ( " .//AVCDecoderConfigurationRecord/SequenceParameterSet " )
pps_element = avc_element . find ( " .//AVCDecoderConfigurationRecord/PictureParameterSet " )
sps = parse_content ( sps_element . attrib [ ' content ' ] )
pps = parse_content ( pps_element . attrib [ ' content ' ] )
media_header = tree . find ( " MovieBox/TrackBox/MediaBox/MediaHeaderBox " )
time_scale = int ( media_header . attrib [ ' TimeScale ' ] )
sample_sizes = [
int ( entry . attrib [ ' Size ' ] ) for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleSizeBox/SampleSizeEntry " )
]
sample_dependency = [
entry . attrib [ ' dependsOnOther ' ] == " yes " for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleDependencyTypeBox/SampleDependencyEntry " )
]
assert len ( sample_sizes ) == len ( sample_dependency )
chunk_offsets = [
int ( entry . attrib [ ' offset ' ] ) for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/ChunkOffsetBox/ChunkEntry " )
]
sample_chunk_table = [
( int ( entry . attrib [ ' FirstChunk ' ] ) - 1 , int ( entry . attrib [ ' SamplesPerChunk ' ] ) ) for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/SampleToChunkBox/SampleToChunkEntry " )
]
sample_offsets = [ None for _ in sample_sizes ]
sample_i = 0
for i , ( first_chunk , samples_per_chunk ) in enumerate ( sample_chunk_table ) :
if i == len ( sample_chunk_table ) - 1 :
last_chunk = len ( chunk_offsets ) - 1
else :
last_chunk = sample_chunk_table [ i + 1 ] [ 0 ] - 1
for k in range ( first_chunk , last_chunk + 1 ) :
sample_offset = chunk_offsets [ k ]
for _ in range ( samples_per_chunk ) :
sample_offsets [ sample_i ] = sample_offset
sample_offset + = sample_sizes [ sample_i ]
sample_i + = 1
assert sample_i == len ( sample_sizes )
pts_offset_table = [
( int ( entry . attrib [ ' CompositionOffset ' ] ) , int ( entry . attrib [ ' SampleCount ' ] ) ) for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/CompositionOffsetBox/CompositionOffsetEntry " )
]
sample_pts_offset = [ 0 for _ in sample_sizes ]
sample_i = 0
for dt , count in pts_offset_table :
for _ in range ( count ) :
sample_pts_offset [ sample_i ] = dt
sample_i + = 1
sample_time_table = [
( int ( entry . attrib [ ' SampleDelta ' ] ) , int ( entry . attrib [ ' SampleCount ' ] ) ) for entry in tree . findall (
" MovieBox/TrackBox/MediaBox/MediaInformationBox/SampleTableBox/TimeToSampleBox/TimeToSampleEntry " )
]
sample_time = [ None for _ in sample_sizes ]
cur_ts = 0
sample_i = 0
for dt , count in sample_time_table :
for _ in range ( count ) :
sample_time [ sample_i ] = ( cur_ts + sample_pts_offset [ sample_i ] ) * 1000 / time_scale
cur_ts + = dt
sample_i + = 1
sample_time . sort ( ) # because we ony decode GOPs in PTS order
return {
' width ' : width ,
' height ' : height ,
' sample_offsets ' : sample_offsets ,
' sample_sizes ' : sample_sizes ,
' sample_dependency ' : sample_dependency ,
' sample_time ' : sample_time ,
' sps ' : sps ,
' pps ' : pps
}
class BaseFrameReader ( object ) :
# properties: frame_type, frame_count, w, h
def __enter__ ( self ) :
return self
def __exit__ ( self , * args ) :
self . close ( )
def close ( self ) :
pass
def get ( self , num , count = 1 , pix_fmt = " yuv420p " ) :
raise NotImplementedError
def FrameReader ( fn , cache_prefix = None , readahead = False , readbehind = False , multithreaded = True , index_data = None ) :
frame_type = fingerprint_video ( fn )
if frame_type == FrameType . raw :
return RawFrameReader ( fn )
elif frame_type in ( FrameType . h265_stream , FrameType . h264_pstream ) :
if not index_data :
index_data = get_video_index ( fn , frame_type , cache_prefix )
if index_data is not None and " predecom " in index_data :
cache_path = cache_path_for_file_path ( fn , cache_prefix )
return MKVFrameReader (
os . path . join ( os . path . dirname ( cache_path ) , index_data [ " predecom " ] ) )
else :
return StreamFrameReader ( fn , frame_type , index_data ,
readahead = readahead , readbehind = readbehind , multithreaded = multithreaded )
elif frame_type == FrameType . h264_mp4 :
return MP4FrameReader ( fn , readahead = readahead )
elif frame_type == FrameType . ffv1_mkv :
return MKVFrameReader ( fn )
else :
raise NotImplementedError ( frame_type )
def rgb24toyuv420 ( rgb ) :
yuv_from_rgb = np . array ( [ [ 0.299 , 0.587 , 0.114 ] ,
[ - 0.14714119 , - 0.28886916 , 0.43601035 ] ,
[ 0.61497538 , - 0.51496512 , - 0.10001026 ] ] )
img = np . dot ( rgb . reshape ( - 1 , 3 ) , yuv_from_rgb . T ) . reshape ( rgb . shape )
y_len = img . shape [ 0 ] * img . shape [ 1 ]
uv_len = y_len / 4
ys = img [ : , : , 0 ]
us = ( img [ : : 2 , : : 2 , 1 ] + img [ 1 : : 2 , : : 2 , 1 ] + img [ : : 2 , 1 : : 2 , 1 ] + img [ 1 : : 2 , 1 : : 2 , 1 ] ) / 4 + 128
vs = ( img [ : : 2 , : : 2 , 2 ] + img [ 1 : : 2 , : : 2 , 2 ] + img [ : : 2 , 1 : : 2 , 2 ] + img [ 1 : : 2 , 1 : : 2 , 2 ] ) / 4 + 128
yuv420 = np . empty ( y_len + 2 * uv_len , dtype = img . dtype )
yuv420 [ : y_len ] = ys . reshape ( - 1 )
yuv420 [ y_len : y_len + uv_len ] = us . reshape ( - 1 )
yuv420 [ y_len + uv_len : y_len + 2 * uv_len ] = vs . reshape ( - 1 )
return yuv420 . clip ( 0 , 255 ) . astype ( ' uint8 ' )
class RawData ( object ) :
def __init__ ( self , f ) :
self . f = _io . FileIO ( f , ' rb ' )
self . lenn = struct . unpack ( " I " , self . f . read ( 4 ) ) [ 0 ]
self . count = os . path . getsize ( f ) / ( self . lenn + 4 )
def read ( self , i ) :
self . f . seek ( ( self . lenn + 4 ) * i + 4 )
return self . f . read ( self . lenn )
class RawFrameReader ( BaseFrameReader ) :
def __init__ ( self , fn ) :
# raw camera
self . fn = fn
self . frame_type = FrameType . raw
self . rawfile = RawData ( self . fn )
self . frame_count = self . rawfile . count
self . w , self . h = 640 , 480
def load_and_debayer ( self , img ) :
img = np . frombuffer ( img , dtype = ' uint8 ' ) . reshape ( 960 , 1280 )
cimg = np . dstack ( [ img [ 0 : : 2 , 1 : : 2 ] , (
( img [ 0 : : 2 , 0 : : 2 ] . astype ( " uint16 " ) + img [ 1 : : 2 , 1 : : 2 ] . astype ( " uint16 " ) )
>> 1 ) . astype ( " uint8 " ) , img [ 1 : : 2 , 0 : : 2 ] ] )
return cimg
def get ( self , num , count = 1 , pix_fmt = " yuv420p " ) :
assert self . frame_count is not None
assert num + count < = self . frame_count
if pix_fmt not in ( " yuv420p " , " rgb24 " ) :
raise ValueError ( " Unsupported pixel format %r " % pix_fmt )
app = [ ]
for i in range ( num , num + count ) :
dat = self . rawfile . read ( i )
rgb_dat = self . load_and_debayer ( dat )
if pix_fmt == " rgb24 " :
app . append ( rgb_dat )
elif pix_fmt == " yuv420p " :
app . append ( rgb24toyuv420 ( rgb_dat ) )
else :
raise NotImplementedError
return app
def decompress_video_data ( rawdat , vid_fmt , w , h , pix_fmt , multithreaded = False ) :
# using a tempfile is much faster than proc.communicate for some reason
with tempfile . TemporaryFile ( ) as tmpf :
tmpf . write ( rawdat )
tmpf . seek ( 0 )
proc = subprocess . Popen (
[ " ffmpeg " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -vsync " , " 0 " ,
" -f " , vid_fmt ,
" -flags2 " , " showall " ,
" -i " , " pipe:0 " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -f " , " rawvideo " ,
" -pix_fmt " , pix_fmt ,
" pipe:1 " ] ,
stdin = tmpf , stdout = subprocess . PIPE , stderr = open ( " /dev/null " ) )
# dat = proc.communicate()[0]
dat = proc . stdout . read ( )
if proc . wait ( ) != 0 :
raise DataUnreadableError ( " ffmpeg failed " )
if pix_fmt == " rgb24 " :
ret = np . frombuffer ( dat , dtype = np . uint8 ) . reshape ( - 1 , h , w , 3 )
elif pix_fmt == " yuv420p " :
ret = np . frombuffer ( dat , dtype = np . uint8 ) . reshape ( - 1 , ( h * w * 3 / / 2 ) )
elif pix_fmt == " yuv444p " :
ret = np . frombuffer ( dat , dtype = np . uint8 ) . reshape ( - 1 , 3 , h , w )
else :
raise NotImplementedError
return ret
class VideoStreamDecompressor ( object ) :
def __init__ ( self , vid_fmt , w , h , pix_fmt , multithreaded = False ) :
self . vid_fmt = vid_fmt
self . w = w
self . h = h
self . pix_fmt = pix_fmt
if pix_fmt == " yuv420p " :
self . out_size = w * h * 3 / / 2 # yuv420p
elif pix_fmt in ( " rgb24 " , " yuv444p " ) :
self . out_size = w * h * 3
else :
raise NotImplementedError
self . out_q = queue . Queue ( )
self . proc = subprocess . Popen (
[ " ffmpeg " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
# "-avioflags", "direct",
" -analyzeduration " , " 0 " ,
" -probesize " , " 32 " ,
" -flush_packets " , " 0 " ,
# "-fflags", "nobuffer",
" -vsync " , " 0 " ,
" -f " , vid_fmt ,
" -i " , " pipe:0 " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -f " , " rawvideo " ,
" -pix_fmt " , pix_fmt ,
" pipe:1 " ] ,
stdin = subprocess . PIPE , stdout = subprocess . PIPE , stderr = open ( " /dev/null " , " wb " ) )
def read_thread ( ) :
while True :
r = self . proc . stdout . read ( self . out_size )
if len ( r ) == 0 :
break
assert len ( r ) == self . out_size
self . out_q . put ( r )
self . t = threading . Thread ( target = read_thread )
self . t . daemon = True
self . t . start ( )
def __enter__ ( self ) :
return self
def __exit__ ( self , * args ) :
self . close ( )
def write ( self , rawdat ) :
self . proc . stdin . write ( rawdat )
self . proc . stdin . flush ( )
def read ( self ) :
dat = self . out_q . get ( block = True )
if self . pix_fmt == " rgb24 " :
ret = np . frombuffer ( dat , dtype = np . uint8 ) . reshape ( ( self . h , self . w , 3 ) )
elif self . pix_fmt == " yuv420p " :
ret = np . frombuffer ( dat , dtype = np . uint8 )
elif self . pix_fmt == " yuv444p " :
ret = np . frombuffer ( dat , dtype = np . uint8 ) . reshape ( ( 3 , self . h , self . w ) )
else :
assert False
return ret
def eos ( self ) :
self . proc . stdin . close ( )
def close ( self ) :
self . proc . stdin . close ( )
self . t . join ( )
self . proc . wait ( )
assert self . proc . wait ( ) == 0
class MKVFrameReader ( BaseFrameReader ) :
def __init__ ( self , fn ) :
self . fn = fn
#print("MKVFrameReader", fn)
index_data = index_mkv ( fn )
stream = index_data [ ' probe ' ] [ ' streams ' ] [ 0 ]
self . w = stream [ ' width ' ]
self . h = stream [ ' height ' ]
if stream [ ' codec_name ' ] == ' ffv1 ' :
self . frame_type = FrameType . ffv1_mkv
elif stream [ ' codec_name ' ] == ' ffvhuff ' :
self . frame_type = FrameType . ffvhuff_mkv
else :
raise NotImplementedError
self . config_record = index_data [ ' config_record ' ]
self . index = index_data [ ' index ' ]
self . frame_count = len ( self . index )
def get ( self , num , count = 1 , pix_fmt = " yuv420p " ) :
assert 0 < num + count < = self . frame_count
frame_dats = [ ]
with FileReader ( self . fn ) as f :
for i in range ( num , num + count ) :
pos , length , _ = self . index [ i ]
f . seek ( pos )
frame_dats . append ( f . read ( length ) )
of = StringIO ( )
mkvindex . simple_gen ( of , self . config_record , self . w , self . h , frame_dats )
r = decompress_video_data ( of . getvalue ( ) , " matroska " , self . w , self . h , pix_fmt )
assert len ( r ) == count
return r
class GOPReader ( object ) :
def get_gop ( self , num ) :
# returns (start_frame_num, num_frames, frames_to_skip, gop_data)
raise NotImplementedError
class DoNothingContextManager ( object ) :
def __enter__ ( self ) : return self
def __exit__ ( * x ) : pass
class GOPFrameReader ( BaseFrameReader ) :
#FrameReader with caching and readahead for formats that are group-of-picture based
def __init__ ( self , readahead = False , readbehind = False , multithreaded = True ) :
self . open_ = True
self . multithreaded = multithreaded
self . readahead = readahead
self . readbehind = readbehind
self . frame_cache = LRU ( 64 )
if self . readahead :
self . cache_lock = threading . RLock ( )
self . readahead_last = None
self . readahead_len = 30
self . readahead_c = threading . Condition ( )
self . readahead_thread = threading . Thread ( target = self . _readahead_thread )
self . readahead_thread . daemon = True
self . readahead_thread . start ( )
else :
self . cache_lock = DoNothingContextManager ( )
def close ( self ) :
if not self . open_ :
return
self . open_ = False
if self . readahead :
self . readahead_c . acquire ( )
self . readahead_c . notify ( )
self . readahead_c . release ( )
self . readahead_thread . join ( )
def _readahead_thread ( self ) :
while True :
self . readahead_c . acquire ( )
try :
if not self . open_ :
break
self . readahead_c . wait ( )
finally :
self . readahead_c . release ( )
if not self . open_ :
break
assert self . readahead_last
num , pix_fmt = self . readahead_last
if self . readbehind :
for k in range ( num - 1 , max ( 0 , num - self . readahead_len ) , - 1 ) :
self . _get_one ( k , pix_fmt )
else :
for k in range ( num , min ( self . frame_count , num + self . readahead_len ) ) :
self . _get_one ( k , pix_fmt )
def _get_one ( self , num , pix_fmt ) :
assert num < self . frame_count
if ( num , pix_fmt ) in self . frame_cache :
return self . frame_cache [ ( num , pix_fmt ) ]
with self . cache_lock :
if ( num , pix_fmt ) in self . frame_cache :
return self . frame_cache [ ( num , pix_fmt ) ]
frame_b , num_frames , skip_frames , rawdat = self . get_gop ( num )
ret = decompress_video_data ( rawdat , self . vid_fmt , self . w , self . h , pix_fmt ,
multithreaded = self . multithreaded )
ret = ret [ skip_frames : ]
assert ret . shape [ 0 ] == num_frames
for i in range ( ret . shape [ 0 ] ) :
self . frame_cache [ ( frame_b + i , pix_fmt ) ] = ret [ i ]
return self . frame_cache [ ( num , pix_fmt ) ]
def get ( self , num , count = 1 , pix_fmt = " yuv420p " ) :
assert self . frame_count is not None
if num + count > self . frame_count :
raise ValueError ( " {} > {} " . format ( num + count , self . frame_count ) )
if pix_fmt not in ( " yuv420p " , " rgb24 " , " yuv444p " ) :
raise ValueError ( " Unsupported pixel format %r " % pix_fmt )
ret = [ self . _get_one ( num + i , pix_fmt ) for i in range ( count ) ]
if self . readahead :
self . readahead_last = ( num + count , pix_fmt )
self . readahead_c . acquire ( )
self . readahead_c . notify ( )
self . readahead_c . release ( )
return ret
class MP4GOPReader ( GOPReader ) :
def __init__ ( self , fn ) :
self . fn = fn
self . frame_type = FrameType . h264_mp4
self . index = index_mp4 ( fn )
self . w = self . index [ ' width ' ]
self . h = self . index [ ' height ' ]
self . sample_sizes = self . index [ ' sample_sizes ' ]
self . sample_offsets = self . index [ ' sample_offsets ' ]
self . sample_dependency = self . index [ ' sample_dependency ' ]
self . vid_fmt = " h264 "
self . frame_count = len ( self . sample_sizes )
self . prefix = " \x00 \x00 \x00 \x01 " + self . index [ ' sps ' ] + " \x00 \x00 \x00 \x01 " + self . index [ ' pps ' ]
def _lookup_gop ( self , num ) :
frame_b = num
while frame_b > 0 and self . sample_dependency [ frame_b ] :
frame_b - = 1
frame_e = num + 1
while frame_e < ( len ( self . sample_dependency ) - 1 ) and self . sample_dependency [ frame_e ] :
frame_e + = 1
return ( frame_b , frame_e )
def get_gop ( self , num ) :
frame_b , frame_e = self . _lookup_gop ( num )
assert frame_b < = num < frame_e
num_frames = frame_e - frame_b
with FileReader ( self . fn ) as f :
rawdat = [ ]
sample_i = frame_b
while sample_i < frame_e :
size = self . sample_sizes [ sample_i ]
start_offset = self . sample_offsets [ sample_i ]
# try to read contiguously because a read could actually be a http request
sample_i + = 1
while sample_i < frame_e and size < 10000000 and start_offset + size == self . sample_offsets [ sample_i ] :
size + = self . sample_sizes [ sample_i ]
sample_i + = 1
f . seek ( start_offset )
sampledat = f . read ( size )
# read length-prefixed NALUs and output in Annex-B
i = 0
while i < len ( sampledat ) :
nal_len , = struct . unpack ( " >I " , sampledat [ i : i + 4 ] )
rawdat . append ( " \x00 \x00 \x00 \x01 " + sampledat [ i + 4 : i + 4 + nal_len ] )
i = i + 4 + nal_len
assert i == len ( sampledat )
rawdat = self . prefix + ' ' . join ( rawdat )
return frame_b , num_frames , 0 , rawdat
class MP4FrameReader ( MP4GOPReader , GOPFrameReader ) :
def __init__ ( self , fn , readahead = False ) :
MP4GOPReader . __init__ ( self , fn )
GOPFrameReader . __init__ ( self , readahead )
class StreamGOPReader ( GOPReader ) :
def __init__ ( self , fn , frame_type , index_data ) :
self . fn = fn
self . frame_type = frame_type
self . frame_count = None
self . w , self . h = None , None
self . prefix = None
self . index = None
self . index = index_data [ ' index ' ]
self . prefix = index_data [ ' global_prefix ' ]
probe = index_data [ ' probe ' ]
if self . frame_type == FrameType . h265_stream :
self . prefix_frame_data = None
self . num_prefix_frames = 0
self . vid_fmt = " hevc "
elif self . frame_type == FrameType . h264_pstream :
self . prefix_frame_data = index_data [ ' prefix_frame_data ' ]
self . num_prefix_frames = index_data [ ' num_prefix_frames ' ]
self . vid_fmt = " h264 "
i = 0
while i < self . index . shape [ 0 ] and self . index [ i , 0 ] != SLICE_I :
i + = 1
self . first_iframe = i
if self . frame_type == FrameType . h265_stream :
assert self . first_iframe == 0
self . frame_count = len ( self . index ) - 1
self . w = probe [ ' streams ' ] [ 0 ] [ ' width ' ]
self . h = probe [ ' streams ' ] [ 0 ] [ ' height ' ]
def _lookup_gop ( self , num ) :
frame_b = num
while frame_b > 0 and self . index [ frame_b , 0 ] != SLICE_I :
frame_b - = 1
frame_e = num + 1
while frame_e < ( len ( self . index ) - 1 ) and self . index [ frame_e , 0 ] != SLICE_I :
frame_e + = 1
offset_b = self . index [ frame_b , 1 ]
offset_e = self . index [ frame_e , 1 ]
return ( frame_b , frame_e , offset_b , offset_e )
def get_gop ( self , num ) :
frame_b , frame_e , offset_b , offset_e = self . _lookup_gop ( num )
assert frame_b < = num < frame_e
num_frames = frame_e - frame_b
with FileReader ( self . fn ) as f :
f . seek ( offset_b )
rawdat = f . read ( offset_e - offset_b )
if num < self . first_iframe :
assert self . prefix_frame_data
rawdat = self . prefix_frame_data + rawdat
rawdat = self . prefix + rawdat
skip_frames = 0
if num < self . first_iframe :
skip_frames = self . num_prefix_frames
return frame_b , num_frames , skip_frames , rawdat
class StreamFrameReader ( StreamGOPReader , GOPFrameReader ) :
def __init__ ( self , fn , frame_type , index_data , readahead = False , readbehind = False , multithreaded = False ) :
StreamGOPReader . __init__ ( self , fn , frame_type , index_data )
GOPFrameReader . __init__ ( self , readahead , readbehind , multithreaded )
def GOPFrameIterator ( gop_reader , pix_fmt , multithreaded = True ) :
# this is really ugly. ill think about how to refactor it when i can think good
IN_FLIGHT_GOPS = 6 # should be enough that the stream decompressor starts returning data
with VideoStreamDecompressor (
gop_reader . vid_fmt , gop_reader . w , gop_reader . h , pix_fmt , multithreaded ) as dec :
read_work = [ ]
def readthing ( ) :
# print read_work, dec.out_q.qsize()
outf = dec . read ( )
read_thing = read_work [ 0 ]
if read_thing [ 0 ] > 0 :
read_thing [ 0 ] - = 1
else :
assert read_thing [ 1 ] > 0
yield outf
read_thing [ 1 ] - = 1
if read_thing [ 1 ] == 0 :
read_work . pop ( 0 )
i = 0
while i < gop_reader . frame_count :
frame_b , num_frames , skip_frames , gop_data = gop_reader . get_gop ( i )
dec . write ( gop_data )
i + = num_frames
read_work . append ( [ skip_frames , num_frames ] )
while len ( read_work ) > = IN_FLIGHT_GOPS :
for v in readthing ( ) : yield v
dec . eos ( )
while read_work :
for v in readthing ( ) : yield v
def FrameIterator ( fn , pix_fmt , * * kwargs ) :
fr = FrameReader ( fn , * * kwargs )
if isinstance ( fr , GOPReader ) :
for v in GOPFrameIterator ( fr , pix_fmt , kwargs . get ( " multithreaded " , True ) ) : yield v
else :
for i in range ( fr . frame_count ) :
yield fr . get ( i , pix_fmt = pix_fmt ) [ 0 ]
def FrameWriter ( ofn , frames , vid_fmt = FrameType . ffvhuff_mkv , pix_fmt = " rgb24 " , framerate = 20 , multithreaded = False ) :
if pix_fmt not in ( " rgb24 " , " yuv420p " ) :
raise NotImplementedError
if vid_fmt == FrameType . ffv1_mkv :
assert ofn . endswith ( " .mkv " )
vcodec = " ffv1 "
elif vid_fmt == FrameType . ffvhuff_mkv :
assert ofn . endswith ( " .mkv " )
vcodec = " ffvhuff "
else :
raise NotImplementedError
frame_gen = iter ( frames )
first_frame = next ( frame_gen )
# assert len(frames) > 1
if pix_fmt == " rgb24 " :
h , w = first_frame . shape [ : 2 ]
elif pix_fmt == " yuv420p " :
w = first_frame . shape [ 1 ]
h = 2 * first_frame . shape [ 0 ] / / 3
else :
raise NotImplementedError
compress_proc = subprocess . Popen (
[ " ffmpeg " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -y " ,
" -framerate " , str ( framerate ) ,
" -vsync " , " 0 " ,
" -f " , " rawvideo " ,
" -pix_fmt " , pix_fmt ,
" -s " , " %d x %d " % ( w , h ) ,
" -i " , " pipe:0 " ,
" -threads " , " 0 " if multithreaded else " 1 " ,
" -f " , " matroska " ,
" -vcodec " , vcodec ,
" -g " , " 0 " ,
ofn ] ,
stdin = subprocess . PIPE , stderr = open ( " /dev/null " , " wb " ) )
try :
compress_proc . stdin . write ( first_frame . tobytes ( ) )
for frame in frame_gen :
compress_proc . stdin . write ( frame . tobytes ( ) )
compress_proc . stdin . close ( )
except :
compress_proc . kill ( )
raise
assert compress_proc . wait ( ) == 0
if __name__ == " __main__ " :
fn = " cd:/1c79456b0c90f15a/2017-05-10--08-17-00/2/fcamera.hevc "
f = FrameReader ( fn )
# print f.get(0, 1).shape
# print f.get(15, 1).shape
for v in GOPFrameIterator ( f , " yuv420p " ) :
print ( v )