import json import os import pickle import struct import subprocess import threading from enum import IntEnum from functools import wraps import numpy as np from lru import LRU import _io from openpilot.tools.lib.cache import cache_path_for_file_path, DEFAULT_CACHE_DIR from openpilot.tools.lib.exceptions import DataUnreadableError from openpilot.tools.lib.vidindex import hevc_index from openpilot.common.file_helpers import atomic_write_in_dir from openpilot.tools.lib.filereader import FileReader, resolve_name HEVC_SLICE_B = 0 HEVC_SLICE_P = 1 HEVC_SLICE_I = 2 class GOPReader: def get_gop(self, num): # returns (start_frame_num, num_frames, frames_to_skip, gop_data) raise NotImplementedError class DoNothingContextManager: def __enter__(self): return self def __exit__(self, *x): pass class FrameType(IntEnum): raw = 1 h265_stream = 2 def fingerprint_video(fn): with FileReader(fn) as f: header = f.read(4) if len(header) == 0: raise DataUnreadableError(f"{fn} is empty") elif header == b"\x00\xc0\x12\x00": return FrameType.raw elif header == b"\x00\x00\x00\x01": if 'hevc' in fn: return FrameType.h265_stream else: raise NotImplementedError(fn) else: raise NotImplementedError(fn) def ffprobe(fn, fmt=None): fn = resolve_name(fn) cmd = ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams"] if fmt: cmd += ["-f", fmt] cmd += ["-i", "-"] try: with FileReader(fn) as f: ffprobe_output = subprocess.check_output(cmd, input=f.read(4096)) except subprocess.CalledProcessError as e: raise DataUnreadableError(fn) from e return json.loads(ffprobe_output) def cache_fn(func): @wraps(func) def cache_inner(fn, *args, **kwargs): if kwargs.pop('no_cache', None): cache_path = None else: cache_dir = kwargs.pop('cache_dir', DEFAULT_CACHE_DIR) cache_path = cache_path_for_file_path(fn, cache_dir) if cache_path and os.path.exists(cache_path): with open(cache_path, "rb") as cache_file: cache_value = pickle.load(cache_file) else: cache_value = func(fn, *args, **kwargs) if cache_path: with atomic_write_in_dir(cache_path, mode="wb", overwrite=True) as cache_file: pickle.dump(cache_value, cache_file, -1) return cache_value return cache_inner @cache_fn def index_stream(fn, ft): if ft != FrameType.h265_stream: raise NotImplementedError("Only h265 supported") frame_types, dat_len, prefix = hevc_index(fn) index = np.array(frame_types + [(0xFFFFFFFF, dat_len)], dtype=np.uint32) probe = ffprobe(fn, "hevc") return { 'index': index, 'global_prefix': prefix, 'probe': probe } def get_video_index(fn, frame_type, cache_dir=DEFAULT_CACHE_DIR): return index_stream(fn, frame_type, cache_dir=cache_dir) def read_file_check_size(f, sz, cookie): buff = bytearray(sz) bytes_read = f.readinto(buff) assert bytes_read == sz, (bytes_read, sz) return buff def rgb24toyuv(rgb): yuv_from_rgb = np.array([[ 0.299 , 0.587 , 0.114 ], [-0.14714119, -0.28886916, 0.43601035 ], [ 0.61497538, -0.51496512, -0.10001026 ]]) img = np.dot(rgb.reshape(-1, 3), yuv_from_rgb.T).reshape(rgb.shape) ys = img[:, :, 0] us = (img[::2, ::2, 1] + img[1::2, ::2, 1] + img[::2, 1::2, 1] + img[1::2, 1::2, 1]) / 4 + 128 vs = (img[::2, ::2, 2] + img[1::2, ::2, 2] + img[::2, 1::2, 2] + img[1::2, 1::2, 2]) / 4 + 128 return ys, us, vs def rgb24toyuv420(rgb): ys, us, vs = rgb24toyuv(rgb) y_len = rgb.shape[0] * rgb.shape[1] uv_len = y_len // 4 yuv420 = np.empty(y_len + 2 * uv_len, dtype=rgb.dtype) yuv420[:y_len] = ys.reshape(-1) yuv420[y_len:y_len + uv_len] = us.reshape(-1) yuv420[y_len + uv_len:y_len + 2 * uv_len] = vs.reshape(-1) return yuv420.clip(0, 255).astype('uint8') def rgb24tonv12(rgb): ys, us, vs = rgb24toyuv(rgb) y_len = rgb.shape[0] * rgb.shape[1] uv_len = y_len // 4 nv12 = np.empty(y_len + 2 * uv_len, dtype=rgb.dtype) nv12[:y_len] = ys.reshape(-1) nv12[y_len::2] = us.reshape(-1) nv12[y_len+1::2] = vs.reshape(-1) return nv12.clip(0, 255).astype('uint8') def decompress_video_data(rawdat, vid_fmt, w, h, pix_fmt): threads = os.getenv("FFMPEG_THREADS", "0") cuda = os.getenv("FFMPEG_CUDA", "0") == "1" args = ["ffmpeg", "-v", "quiet", "-threads", threads, "-hwaccel", "none" if not cuda else "cuda", "-c:v", "hevc", "-vsync", "0", "-f", vid_fmt, "-flags2", "showall", "-i", "-", "-threads", threads, "-f", "rawvideo", "-pix_fmt", pix_fmt, "-"] dat = subprocess.check_output(args, input=rawdat) if pix_fmt == "rgb24": ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, h, w, 3) elif pix_fmt == "nv12": ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2)) elif pix_fmt == "yuv420p": ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2)) elif pix_fmt == "yuv444p": ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, 3, h, w) else: raise NotImplementedError return ret class BaseFrameReader: # properties: frame_type, frame_count, w, h def __enter__(self): return self def __exit__(self, *args): self.close() def close(self): pass def get(self, num, count=1, pix_fmt="yuv420p"): raise NotImplementedError def FrameReader(fn, cache_dir=DEFAULT_CACHE_DIR, readahead=False, readbehind=False, index_data=None): frame_type = fingerprint_video(fn) if frame_type == FrameType.raw: return RawFrameReader(fn) elif frame_type in (FrameType.h265_stream,): if not index_data: index_data = get_video_index(fn, frame_type, cache_dir) return StreamFrameReader(fn, frame_type, index_data, readahead=readahead, readbehind=readbehind) else: raise NotImplementedError(frame_type) class RawData: def __init__(self, f): self.f = _io.FileIO(f, 'rb') self.lenn = struct.unpack("I", self.f.read(4))[0] self.count = os.path.getsize(f) / (self.lenn+4) def read(self, i): self.f.seek((self.lenn+4)*i + 4) return self.f.read(self.lenn) class RawFrameReader(BaseFrameReader): def __init__(self, fn): # raw camera self.fn = fn self.frame_type = FrameType.raw self.rawfile = RawData(self.fn) self.frame_count = self.rawfile.count self.w, self.h = 640, 480 def load_and_debayer(self, img): img = np.frombuffer(img, dtype='uint8').reshape(960, 1280) cimg = np.dstack([img[0::2, 1::2], ((img[0::2, 0::2].astype("uint16") + img[1::2, 1::2].astype("uint16")) >> 1).astype("uint8"), img[1::2, 0::2]]) return cimg def get(self, num, count=1, pix_fmt="yuv420p"): assert self.frame_count is not None assert num+count <= self.frame_count if pix_fmt not in ("nv12", "yuv420p", "rgb24"): raise ValueError(f"Unsupported pixel format {pix_fmt!r}") app = [] for i in range(num, num+count): dat = self.rawfile.read(i) rgb_dat = self.load_and_debayer(dat) if pix_fmt == "rgb24": app.append(rgb_dat) elif pix_fmt == "nv12": app.append(rgb24tonv12(rgb_dat)) elif pix_fmt == "yuv420p": app.append(rgb24toyuv420(rgb_dat)) else: raise NotImplementedError return app class VideoStreamDecompressor: def __init__(self, fn, vid_fmt, w, h, pix_fmt): self.fn = fn self.vid_fmt = vid_fmt self.w = w self.h = h self.pix_fmt = pix_fmt if pix_fmt in ("nv12", "yuv420p"): self.out_size = w*h*3//2 # yuv420p elif pix_fmt in ("rgb24", "yuv444p"): self.out_size = w*h*3 else: raise NotImplementedError self.proc = None self.t = threading.Thread(target=self.write_thread) self.t.daemon = True def write_thread(self): try: with FileReader(self.fn) as f: while True: r = f.read(1024*1024) if len(r) == 0: break self.proc.stdin.write(r) except BrokenPipeError: pass finally: self.proc.stdin.close() def read(self): threads = os.getenv("FFMPEG_THREADS", "0") cuda = os.getenv("FFMPEG_CUDA", "0") == "1" cmd = [ "ffmpeg", "-threads", threads, "-hwaccel", "none" if not cuda else "cuda", "-c:v", "hevc", # "-avioflags", "direct", "-analyzeduration", "0", "-probesize", "32", "-flush_packets", "0", # "-fflags", "nobuffer", "-vsync", "0", "-f", self.vid_fmt, "-i", "pipe:0", "-threads", threads, "-f", "rawvideo", "-pix_fmt", self.pix_fmt, "pipe:1" ] self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) try: self.t.start() while True: dat = self.proc.stdout.read(self.out_size) if len(dat) == 0: break assert len(dat) == self.out_size if self.pix_fmt == "rgb24": ret = np.frombuffer(dat, dtype=np.uint8).reshape((self.h, self.w, 3)) elif self.pix_fmt == "yuv420p": ret = np.frombuffer(dat, dtype=np.uint8) elif self.pix_fmt == "nv12": ret = np.frombuffer(dat, dtype=np.uint8) elif self.pix_fmt == "yuv444p": ret = np.frombuffer(dat, dtype=np.uint8).reshape((3, self.h, self.w)) else: raise RuntimeError(f"unknown pix_fmt: {self.pix_fmt}") yield ret result_code = self.proc.wait() assert result_code == 0, result_code finally: self.proc.kill() self.t.join() class StreamGOPReader(GOPReader): def __init__(self, fn, frame_type, index_data): assert frame_type == FrameType.h265_stream self.fn = fn self.frame_type = frame_type self.frame_count = None self.w, self.h = None, None self.prefix = None self.index = None self.index = index_data['index'] self.prefix = index_data['global_prefix'] probe = index_data['probe'] self.prefix_frame_data = None self.num_prefix_frames = 0 self.vid_fmt = "hevc" i = 0 while i < self.index.shape[0] and self.index[i, 0] != HEVC_SLICE_I: i += 1 self.first_iframe = i assert self.first_iframe == 0 self.frame_count = len(self.index) - 1 self.w = probe['streams'][0]['width'] self.h = probe['streams'][0]['height'] def _lookup_gop(self, num): frame_b = num while frame_b > 0 and self.index[frame_b, 0] != HEVC_SLICE_I: frame_b -= 1 frame_e = num + 1 while frame_e < (len(self.index) - 1) and self.index[frame_e, 0] != HEVC_SLICE_I: frame_e += 1 offset_b = self.index[frame_b, 1] offset_e = self.index[frame_e, 1] return (frame_b, frame_e, offset_b, offset_e) def get_gop(self, num): frame_b, frame_e, offset_b, offset_e = self._lookup_gop(num) assert frame_b <= num < frame_e num_frames = frame_e - frame_b with FileReader(self.fn) as f: f.seek(offset_b) rawdat = f.read(offset_e - offset_b) if num < self.first_iframe: assert self.prefix_frame_data rawdat = self.prefix_frame_data + rawdat rawdat = self.prefix + rawdat skip_frames = 0 if num < self.first_iframe: skip_frames = self.num_prefix_frames return frame_b, num_frames, skip_frames, rawdat class GOPFrameReader(BaseFrameReader): #FrameReader with caching and readahead for formats that are group-of-picture based def __init__(self, readahead=False, readbehind=False): self.open_ = True self.readahead = readahead self.readbehind = readbehind self.frame_cache = LRU(64) if self.readahead: self.cache_lock = threading.RLock() self.readahead_last = None self.readahead_len = 30 self.readahead_c = threading.Condition() self.readahead_thread = threading.Thread(target=self._readahead_thread) self.readahead_thread.daemon = True self.readahead_thread.start() else: self.cache_lock = DoNothingContextManager() def close(self): if not self.open_: return self.open_ = False if self.readahead: self.readahead_c.acquire() self.readahead_c.notify() self.readahead_c.release() self.readahead_thread.join() def _readahead_thread(self): while True: self.readahead_c.acquire() try: if not self.open_: break self.readahead_c.wait() finally: self.readahead_c.release() if not self.open_: break assert self.readahead_last num, pix_fmt = self.readahead_last if self.readbehind: for k in range(num - 1, max(0, num - self.readahead_len), -1): self._get_one(k, pix_fmt) else: for k in range(num, min(self.frame_count, num + self.readahead_len)): self._get_one(k, pix_fmt) def _get_one(self, num, pix_fmt): assert num < self.frame_count if (num, pix_fmt) in self.frame_cache: return self.frame_cache[(num, pix_fmt)] with self.cache_lock: if (num, pix_fmt) in self.frame_cache: return self.frame_cache[(num, pix_fmt)] frame_b, num_frames, skip_frames, rawdat = self.get_gop(num) ret = decompress_video_data(rawdat, self.vid_fmt, self.w, self.h, pix_fmt) ret = ret[skip_frames:] assert ret.shape[0] == num_frames for i in range(ret.shape[0]): self.frame_cache[(frame_b+i, pix_fmt)] = ret[i] return self.frame_cache[(num, pix_fmt)] def get(self, num, count=1, pix_fmt="yuv420p"): assert self.frame_count is not None if num + count > self.frame_count: raise ValueError(f"{num + count} > {self.frame_count}") if pix_fmt not in ("nv12", "yuv420p", "rgb24", "yuv444p"): raise ValueError(f"Unsupported pixel format {pix_fmt!r}") ret = [self._get_one(num + i, pix_fmt) for i in range(count)] if self.readahead: self.readahead_last = (num+count, pix_fmt) self.readahead_c.acquire() self.readahead_c.notify() self.readahead_c.release() return ret class StreamFrameReader(StreamGOPReader, GOPFrameReader): def __init__(self, fn, frame_type, index_data, readahead=False, readbehind=False): StreamGOPReader.__init__(self, fn, frame_type, index_data) GOPFrameReader.__init__(self, readahead, readbehind) def GOPFrameIterator(gop_reader, pix_fmt): dec = VideoStreamDecompressor(gop_reader.fn, gop_reader.vid_fmt, gop_reader.w, gop_reader.h, pix_fmt) yield from dec.read() def FrameIterator(fn, pix_fmt, **kwargs): fr = FrameReader(fn, **kwargs) if isinstance(fr, GOPReader): yield from GOPFrameIterator(fr, pix_fmt) else: for i in range(fr.frame_count): yield fr.get(i, pix_fmt=pix_fmt)[0] class NumpyFrameReader: def __init__(self, name, w, h, cache_size): self.name = name self.pos = -1 self.frames = None self.w = w self.h = h self.cache_size = cache_size def close(self): pass def get(self, num, count=1, pix_fmt="nv12"): num -= 1 q = num // self.cache_size if q != self.pos: del self.frames self.pos = q self.frames = np.load(f'{self.name}_{self.pos}.npy') return [self.frames[num % self.cache_size]]