diff --git a/selfdrive/test/process_replay/model_replay.py b/selfdrive/test/process_replay/model_replay.py index 201baf6b0e..6b89f07448 100755 --- a/selfdrive/test/process_replay/model_replay.py +++ b/selfdrive/test/process_replay/model_replay.py @@ -201,9 +201,9 @@ if __name__ == "__main__": # load logs lr = list(LogReader(get_url(TEST_ROUTE, SEGMENT, "rlog.zst"))) frs = { - 'roadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "fcamera.hevc"), readahead=True), - 'driverCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "dcamera.hevc"), readahead=True), - 'wideRoadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "ecamera.hevc"), readahead=True) + 'roadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "fcamera.hevc"), pix_fmt='nv12'), + 'driverCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "dcamera.hevc"), pix_fmt='nv12'), + 'wideRoadCameraState': FrameReader(get_url(TEST_ROUTE, SEGMENT, "ecamera.hevc"), pix_fmt='nv12') } log_msgs = [] diff --git a/selfdrive/test/process_replay/process_replay.py b/selfdrive/test/process_replay/process_replay.py index 4f8aa9d99d..7b2ecea517 100755 --- a/selfdrive/test/process_replay/process_replay.py +++ b/selfdrive/test/process_replay/process_replay.py @@ -27,7 +27,7 @@ from openpilot.selfdrive.test.process_replay.vision_meta import meta_from_camera from openpilot.selfdrive.test.process_replay.migration import migrate_all from openpilot.selfdrive.test.process_replay.capture import ProcessOutputCapture from openpilot.tools.lib.logreader import LogIterable -from openpilot.tools.lib.framereader import BaseFrameReader +from openpilot.tools.lib.framereader import FrameReader # Numpy gives different results based on CPU features after version 19 NUMPY_TOLERANCE = 1e-7 @@ -209,6 +209,7 @@ class ProcessContainer: streams_metas = available_streams(all_msgs) for meta in streams_metas: if meta.camera_state in self.cfg.vision_pubs: + assert frs[meta.camera_state].pix_fmt == 'nv12' frame_size = (frs[meta.camera_state].w, frs[meta.camera_state].h) vipc_server.create_buffers(meta.stream, 2, *frame_size) vipc_server.start_listener() @@ -224,7 +225,7 @@ class ProcessContainer: def start( self, params_config: dict[str, Any], environ_config: dict[str, Any], - all_msgs: LogIterable, frs: dict[str, BaseFrameReader] | None, + all_msgs: LogIterable, frs: dict[str, FrameReader] | None, fingerprint: str | None, capture_output: bool ): with self.prefix as p: @@ -266,7 +267,7 @@ class ProcessContainer: self.prefix.clean_dirs() self._clean_env() - def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, BaseFrameReader] | None) -> list[capnp._DynamicStructReader]: + def run_step(self, msg: capnp._DynamicStructReader, frs: dict[str, FrameReader] | None) -> list[capnp._DynamicStructReader]: assert self.rc and self.pm and self.sockets and self.process.proc output_msgs = [] @@ -296,7 +297,7 @@ class ProcessContainer: camera_state = getattr(m, m.which()) camera_meta = meta_from_camera_state(m.which()) assert frs is not None - img = frs[m.which()].get(camera_state.frameId, pix_fmt="nv12")[0] + img = frs[m.which()].get(camera_state.frameId)[0] self.vipc_server.send(camera_meta.stream, img.flatten().tobytes(), camera_state.frameId, camera_state.timestampSof, camera_state.timestampEof) self.msg_queue = [] @@ -652,7 +653,7 @@ def replay_process_with_name(name: str | Iterable[str], lr: LogIterable, *args, def replay_process( - cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] = None, + cfg: ProcessConfig | Iterable[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] = None, fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None, captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False ) -> list[capnp._DynamicStructReader]: @@ -680,7 +681,7 @@ def replay_process( def _replay_multi_process( - cfgs: list[ProcessConfig], lr: LogIterable, frs: dict[str, BaseFrameReader] | None, fingerprint: str | None, + cfgs: list[ProcessConfig], lr: LogIterable, frs: dict[str, FrameReader] | None, fingerprint: str | None, custom_params: dict[str, Any] | None, captured_output_store: dict[str, dict[str, str]] | None, disable_progress: bool ) -> list[capnp._DynamicStructReader]: if fingerprint is not None: diff --git a/selfdrive/test/process_replay/regen.py b/selfdrive/test/process_replay/regen.py index 273659c9ff..ec35a5c3ac 100755 --- a/selfdrive/test/process_replay/regen.py +++ b/selfdrive/test/process_replay/regen.py @@ -3,40 +3,17 @@ import os import argparse import time import capnp -import numpy as np from typing import Any from collections.abc import Iterable from openpilot.selfdrive.test.process_replay.process_replay import CONFIGS, FAKEDATA, ProcessConfig, replay_process, get_process_config, \ check_openpilot_enabled, check_most_messages_valid, get_custom_params_from_lr -from openpilot.selfdrive.test.process_replay.vision_meta import DRIVER_CAMERA_FRAME_SIZES from openpilot.selfdrive.test.update_ci_routes import upload_route -from openpilot.tools.lib.framereader import FrameReader, BaseFrameReader, FrameType +from openpilot.tools.lib.framereader import FrameReader from openpilot.tools.lib.logreader import LogReader, LogIterable, save_log from openpilot.tools.lib.openpilotci import get_url -class DummyFrameReader(BaseFrameReader): - def __init__(self, w: int, h: int, frame_count: int, pix_val: int): - self.pix_val = pix_val - self.w, self.h = w, h - self.frame_count = frame_count - self.frame_type = FrameType.raw - - def get(self, idx, count=1, pix_fmt="rgb24"): - if pix_fmt == "rgb24": - shape = (self.h, self.w, 3) - elif pix_fmt == "nv12" or pix_fmt == "yuv420p": - shape = (int((self.h * self.w) * 3 / 2),) - else: - raise NotImplementedError - - return [np.full(shape, self.pix_val, dtype=np.uint8) for _ in range(count)] - - @staticmethod - def zero_dcamera(): - return DummyFrameReader(*DRIVER_CAMERA_FRAME_SIZES[("tici", "ar0231")], 1200, 0) - def regen_segment( lr: LogIterable, frs: dict[str, Any] = None, @@ -64,7 +41,7 @@ def setup_data_readers( frs['wideRoadCameraState'] = FrameReader(get_url(route, str(sidx), "ecamera.hevc")) if needs_driver_cam: if dummy_driver_cam: - frs['driverCameraState'] = DummyFrameReader.zero_dcamera() + frs['driverCameraState'] = FrameReader(get_url(route, str(sidx), "fcamera.hevc")) # Use fcam as dummy else: device_type = next(str(msg.initData.deviceType) for msg in lr if msg.which() == "initData") assert device_type != "neo", "Driver camera not supported on neo segments. Use dummy dcamera." diff --git a/selfdrive/test/process_replay/test_regen.py b/selfdrive/test/process_replay/test_regen.py index 6c4b48c8d5..5f26daf786 100644 --- a/selfdrive/test/process_replay/test_regen.py +++ b/selfdrive/test/process_replay/test_regen.py @@ -1,6 +1,6 @@ from parameterized import parameterized -from openpilot.selfdrive.test.process_replay.regen import regen_segment, DummyFrameReader +from openpilot.selfdrive.test.process_replay.regen import regen_segment from openpilot.selfdrive.test.process_replay.process_replay import check_openpilot_enabled from openpilot.tools.lib.openpilotci import get_url from openpilot.tools.lib.logreader import LogReader @@ -18,7 +18,7 @@ def ci_setup_data_readers(route, sidx): lr = LogReader(get_url(route, sidx, "rlog.bz2")) frs = { 'roadCameraState': FrameReader(get_url(route, sidx, "fcamera.hevc")), - 'driverCameraState': DummyFrameReader.zero_dcamera() + 'driverCameraState': FrameReader(get_url(route, sidx, "fcamera.hevc")), } if next((True for m in lr if m.which() == "wideRoadCameraState"), False): frs["wideRoadCameraState"] = FrameReader(get_url(route, sidx, "ecamera.hevc")) diff --git a/selfdrive/ui/tests/test_ui/run.py b/selfdrive/ui/tests/test_ui/run.py index c830680aa6..2e0b771fb2 100755 --- a/selfdrive/ui/tests/test_ui/run.py +++ b/selfdrive/ui/tests/test_ui/run.py @@ -283,9 +283,9 @@ def create_screenshots(): driver_img = frames[2] else: with open(frames_cache, 'wb') as f: - road_img = FrameReader(route.camera_paths()[segnum]).get(0, pix_fmt="nv12")[0] - wide_road_img = FrameReader(route.ecamera_paths()[segnum]).get(0, pix_fmt="nv12")[0] - driver_img = FrameReader(route.dcamera_paths()[segnum]).get(0, pix_fmt="nv12")[0] + road_img = FrameReader(route.camera_paths()[segnum], pix_fmt="nv12").get(0)[0] + wide_road_img = FrameReader(route.ecamera_paths()[segnum], pix_fmt="nv12").get(0)[0] + driver_img = FrameReader(route.dcamera_paths()[segnum], pix_fmt="nv12").get(0)[0] pickle.dump([road_img, wide_road_img, driver_img], f) STREAMS.append((VisionStreamType.VISION_STREAM_ROAD, cam.fcam, road_img.flatten().tobytes())) diff --git a/tools/lib/framereader.py b/tools/lib/framereader.py index 0eddc88868..ab34273d26 100644 --- a/tools/lib/framereader.py +++ b/tools/lib/framereader.py @@ -1,62 +1,51 @@ -import json import os -import pickle -import struct import subprocess -import threading -from enum import IntEnum -from functools import wraps +import json +from collections.abc import Iterator import numpy as np from lru import LRU -import _io -from openpilot.tools.lib.cache import cache_path_for_file_path, DEFAULT_CACHE_DIR +from openpilot.tools.lib.filereader import FileReader, resolve_name from openpilot.tools.lib.exceptions import DataUnreadableError from openpilot.tools.lib.vidindex import hevc_index -from openpilot.common.file_helpers import atomic_write_in_dir -from openpilot.tools.lib.filereader import FileReader, resolve_name HEVC_SLICE_B = 0 HEVC_SLICE_P = 1 HEVC_SLICE_I = 2 -class GOPReader: - def get_gop(self, num): - # returns (start_frame_num, num_frames, frames_to_skip, gop_data) - raise NotImplementedError - - -class DoNothingContextManager: - def __enter__(self): - return self - - def __exit__(self, *x): - pass - - -class FrameType(IntEnum): - raw = 1 - h265_stream = 2 - - -def fingerprint_video(fn): +def assert_hvec(fn: str) -> None: with FileReader(fn) as f: header = f.read(4) if len(header) == 0: raise DataUnreadableError(f"{fn} is empty") - elif header == b"\x00\xc0\x12\x00": - return FrameType.raw elif header == b"\x00\x00\x00\x01": - if 'hevc' in fn: - return FrameType.h265_stream - else: + if 'hevc' not in fn: raise NotImplementedError(fn) - else: - raise NotImplementedError(fn) +def decompress_video_data(rawdat, w, h, pix_fmt="rgb24", vid_fmt='hevc') -> np.ndarray: + threads = os.getenv("FFMPEG_THREADS", "0") + args = ["ffmpeg", "-v", "quiet", + "-threads", threads, + "-c:v", "hevc", + "-vsync", "0", + "-f", vid_fmt, + "-flags2", "showall", + "-i", "-", + "-f", "rawvideo", + "-pix_fmt", pix_fmt, + "-"] + dat = subprocess.check_output(args, input=rawdat) + + if pix_fmt == "rgb24": + ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, h, w, 3) + elif pix_fmt in ["nv12", "yuv420p"]: + ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2)) + else: + raise NotImplementedError(f"Unsupported pixel format: {pix_fmt}") + return ret def ffprobe(fn, fmt=None): fn = resolve_name(fn) @@ -70,42 +59,21 @@ def ffprobe(fn, fmt=None): ffprobe_output = subprocess.check_output(cmd, input=f.read(4096)) except subprocess.CalledProcessError as e: raise DataUnreadableError(fn) from e - return json.loads(ffprobe_output) +def get_index_data(fn: str, index_data: dict|None = None): + if index_data is None: + index_data = get_video_index(fn) + if index_data is None: + raise DataUnreadableError(f"Failed to index {fn!r}") + stream = index_data["probe"]["streams"][0] + return index_data["index"], index_data["global_prefix"], stream["width"], stream["height"] -def cache_fn(func): - @wraps(func) - def cache_inner(fn, *args, **kwargs): - if kwargs.pop('no_cache', None): - cache_path = None - else: - cache_dir = kwargs.pop('cache_dir', DEFAULT_CACHE_DIR) - cache_path = cache_path_for_file_path(fn, cache_dir) - - if cache_path and os.path.exists(cache_path): - with open(cache_path, "rb") as cache_file: - cache_value = pickle.load(cache_file) - else: - cache_value = func(fn, *args, **kwargs) - if cache_path: - with atomic_write_in_dir(cache_path, mode="wb", overwrite=True) as cache_file: - pickle.dump(cache_value, cache_file, -1) - - return cache_value - - return cache_inner - - -@cache_fn -def index_stream(fn, ft): - if ft != FrameType.h265_stream: - raise NotImplementedError("Only h265 supported") - +def get_video_index(fn): + assert_hvec(fn) frame_types, dat_len, prefix = hevc_index(fn) index = np.array(frame_types + [(0xFFFFFFFF, dat_len)], dtype=np.uint32) probe = ffprobe(fn, "hevc") - return { 'index': index, 'global_prefix': prefix, @@ -113,425 +81,75 @@ def index_stream(fn, ft): } -def get_video_index(fn, frame_type, cache_dir=DEFAULT_CACHE_DIR): - return index_stream(fn, frame_type, cache_dir=cache_dir) - -def read_file_check_size(f, sz, cookie): - buff = bytearray(sz) - bytes_read = f.readinto(buff) - assert bytes_read == sz, (bytes_read, sz) - return buff - - -def rgb24toyuv(rgb): - yuv_from_rgb = np.array([[ 0.299 , 0.587 , 0.114 ], - [-0.14714119, -0.28886916, 0.43601035 ], - [ 0.61497538, -0.51496512, -0.10001026 ]]) - img = np.dot(rgb.reshape(-1, 3), yuv_from_rgb.T).reshape(rgb.shape) - - - - ys = img[:, :, 0] - us = (img[::2, ::2, 1] + img[1::2, ::2, 1] + img[::2, 1::2, 1] + img[1::2, 1::2, 1]) / 4 + 128 - vs = (img[::2, ::2, 2] + img[1::2, ::2, 2] + img[::2, 1::2, 2] + img[1::2, 1::2, 2]) / 4 + 128 - - return ys, us, vs - - -def rgb24toyuv420(rgb): - ys, us, vs = rgb24toyuv(rgb) - - y_len = rgb.shape[0] * rgb.shape[1] - uv_len = y_len // 4 - - yuv420 = np.empty(y_len + 2 * uv_len, dtype=rgb.dtype) - yuv420[:y_len] = ys.reshape(-1) - yuv420[y_len:y_len + uv_len] = us.reshape(-1) - yuv420[y_len + uv_len:y_len + 2 * uv_len] = vs.reshape(-1) - - return yuv420.clip(0, 255).astype('uint8') - - -def rgb24tonv12(rgb): - ys, us, vs = rgb24toyuv(rgb) - - y_len = rgb.shape[0] * rgb.shape[1] - uv_len = y_len // 4 - - nv12 = np.empty(y_len + 2 * uv_len, dtype=rgb.dtype) - nv12[:y_len] = ys.reshape(-1) - nv12[y_len::2] = us.reshape(-1) - nv12[y_len+1::2] = vs.reshape(-1) - - return nv12.clip(0, 255).astype('uint8') - - -def decompress_video_data(rawdat, vid_fmt, w, h, pix_fmt): - threads = os.getenv("FFMPEG_THREADS", "0") - cuda = os.getenv("FFMPEG_CUDA", "0") == "1" - args = ["ffmpeg", "-v", "quiet", - "-threads", threads, - "-hwaccel", "none" if not cuda else "cuda", - "-c:v", "hevc", - "-vsync", "0", - "-f", vid_fmt, - "-flags2", "showall", - "-i", "-", - "-threads", threads, - "-f", "rawvideo", - "-pix_fmt", pix_fmt, - "-"] - dat = subprocess.check_output(args, input=rawdat) - - if pix_fmt == "rgb24": - ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, h, w, 3) - elif pix_fmt == "nv12": - ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2)) - elif pix_fmt == "yuv420p": - ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, (h*w*3//2)) - elif pix_fmt == "yuv444p": - ret = np.frombuffer(dat, dtype=np.uint8).reshape(-1, 3, h, w) - else: - raise NotImplementedError - - return ret - - -class BaseFrameReader: - # properties: frame_type, frame_count, w, h - - def __enter__(self): - return self - - def __exit__(self, *args): - self.close() - - def close(self): - pass - - def get(self, num, count=1, pix_fmt="rgb24"): - raise NotImplementedError - - -def FrameReader(fn, cache_dir=DEFAULT_CACHE_DIR, readahead=False, readbehind=False, index_data=None): - frame_type = fingerprint_video(fn) - if frame_type == FrameType.raw: - return RawFrameReader(fn) - elif frame_type in (FrameType.h265_stream,): - if not index_data: - index_data = get_video_index(fn, frame_type, cache_dir) - return StreamFrameReader(fn, frame_type, index_data, readahead=readahead, readbehind=readbehind) - else: - raise NotImplementedError(frame_type) - - -class RawData: - def __init__(self, f): - self.f = _io.FileIO(f, 'rb') - self.lenn = struct.unpack("I", self.f.read(4))[0] - self.count = os.path.getsize(f) / (self.lenn+4) - - def read(self, i): - self.f.seek((self.lenn+4)*i + 4) - return self.f.read(self.lenn) - - -class RawFrameReader(BaseFrameReader): - def __init__(self, fn): - # raw camera +class FfmpegDecoder: + def __init__(self, fn: str, index_data: dict|None = None, + pix_fmt: str = "rgb24"): self.fn = fn - self.frame_type = FrameType.raw - self.rawfile = RawData(self.fn) - self.frame_count = self.rawfile.count - self.w, self.h = 640, 480 - - def load_and_debayer(self, img): - img = np.frombuffer(img, dtype='uint8').reshape(960, 1280) - cimg = np.dstack([img[0::2, 1::2], ((img[0::2, 0::2].astype("uint16") + img[1::2, 1::2].astype("uint16")) >> 1).astype("uint8"), img[1::2, 0::2]]) - return cimg - - def get(self, num, count=1, pix_fmt="yuv420p"): - assert self.frame_count is not None - assert num+count <= self.frame_count - - if pix_fmt not in ("nv12", "yuv420p", "rgb24"): - raise ValueError(f"Unsupported pixel format {pix_fmt!r}") - - app = [] - for i in range(num, num+count): - dat = self.rawfile.read(i) - rgb_dat = self.load_and_debayer(dat) - if pix_fmt == "rgb24": - app.append(rgb_dat) - elif pix_fmt == "nv12": - app.append(rgb24tonv12(rgb_dat)) - elif pix_fmt == "yuv420p": - app.append(rgb24toyuv420(rgb_dat)) - else: - raise NotImplementedError - - return app - - -class VideoStreamDecompressor: - def __init__(self, fn, vid_fmt, w, h, pix_fmt): - self.fn = fn - self.vid_fmt = vid_fmt - self.w = w - self.h = h + self.index, self.prefix, self.w, self.h = get_index_data(fn, index_data) + self.frame_count = len(self.index) - 1 # sentinel row at the end + self.iframes = np.where(self.index[:, 0] == HEVC_SLICE_I)[0] self.pix_fmt = pix_fmt - if pix_fmt in ("nv12", "yuv420p"): - self.out_size = w*h*3//2 # yuv420p - elif pix_fmt in ("rgb24", "yuv444p"): - self.out_size = w*h*3 - else: - raise NotImplementedError - - self.proc = None - self.t = threading.Thread(target=self.write_thread) - self.t.daemon = True - - def write_thread(self): - try: + def _gop_bounds(self, frame_idx: int): + f_b = frame_idx + while f_b > 0 and self.index[f_b, 0] != HEVC_SLICE_I: + f_b -= 1 + f_e = frame_idx + 1 + while f_e < self.frame_count and self.index[f_e, 0] != HEVC_SLICE_I: + f_e += 1 + return f_b, f_e, self.index[f_b, 1], self.index[f_e, 1] + + def _decode_gop(self, raw: bytes) -> Iterator[np.ndarray]: + yield from decompress_video_data(raw, self.w, self.h, self.pix_fmt) + + def get_gop_start(self, frame_idx: int): + return self.iframes[np.searchsorted(self.iframes, frame_idx, side="right") - 1] + + def get_iterator(self, start_fidx: int = 0, end_fidx: int|None = None, + frame_skip: int = 1) -> Iterator[tuple[int, np.ndarray]]: + end_fidx = end_fidx or self.frame_count + fidx = start_fidx + while fidx < end_fidx: + f_b, f_e, off_b, off_e = self._gop_bounds(fidx) with FileReader(self.fn) as f: - while True: - r = f.read(1024*1024) - if len(r) == 0: - break - self.proc.stdin.write(r) - except BrokenPipeError: - pass - finally: - self.proc.stdin.close() - - def read(self): - threads = os.getenv("FFMPEG_THREADS", "0") - cuda = os.getenv("FFMPEG_CUDA", "0") == "1" - cmd = [ - "ffmpeg", - "-threads", threads, - "-hwaccel", "none" if not cuda else "cuda", - "-c:v", "hevc", - # "-avioflags", "direct", - "-analyzeduration", "0", - "-probesize", "32", - "-flush_packets", "0", - # "-fflags", "nobuffer", - "-vsync", "0", - "-f", self.vid_fmt, - "-i", "pipe:0", - "-threads", threads, - "-f", "rawvideo", - "-pix_fmt", self.pix_fmt, - "pipe:1" - ] - self.proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) - try: - self.t.start() - - while True: - dat = self.proc.stdout.read(self.out_size) - if len(dat) == 0: - break - assert len(dat) == self.out_size - if self.pix_fmt == "rgb24": - ret = np.frombuffer(dat, dtype=np.uint8).reshape((self.h, self.w, 3)) - elif self.pix_fmt == "yuv420p": - ret = np.frombuffer(dat, dtype=np.uint8) - elif self.pix_fmt == "nv12": - ret = np.frombuffer(dat, dtype=np.uint8) - elif self.pix_fmt == "yuv444p": - ret = np.frombuffer(dat, dtype=np.uint8).reshape((3, self.h, self.w)) - else: - raise RuntimeError(f"unknown pix_fmt: {self.pix_fmt}") - yield ret - - result_code = self.proc.wait() - assert result_code == 0, result_code - finally: - self.proc.kill() - self.t.join() - -class StreamGOPReader(GOPReader): - def __init__(self, fn, frame_type, index_data): - assert frame_type == FrameType.h265_stream - - self.fn = fn - - self.frame_type = frame_type - self.frame_count = None - self.w, self.h = None, None - - self.prefix = None - self.index = None - - self.index = index_data['index'] - self.prefix = index_data['global_prefix'] - probe = index_data['probe'] - - self.prefix_frame_data = None - self.num_prefix_frames = 0 - self.vid_fmt = "hevc" - - i = 0 - while i < self.index.shape[0] and self.index[i, 0] != HEVC_SLICE_I: - i += 1 - self.first_iframe = i - - assert self.first_iframe == 0 - - self.frame_count = len(self.index) - 1 - - self.w = probe['streams'][0]['width'] - self.h = probe['streams'][0]['height'] - - def _lookup_gop(self, num): - frame_b = num - while frame_b > 0 and self.index[frame_b, 0] != HEVC_SLICE_I: - frame_b -= 1 - - frame_e = num + 1 - while frame_e < (len(self.index) - 1) and self.index[frame_e, 0] != HEVC_SLICE_I: - frame_e += 1 - - offset_b = self.index[frame_b, 1] - offset_e = self.index[frame_e, 1] - - return (frame_b, frame_e, offset_b, offset_e) - - def get_gop(self, num): - frame_b, frame_e, offset_b, offset_e = self._lookup_gop(num) - assert frame_b <= num < frame_e - - num_frames = frame_e - frame_b - - with FileReader(self.fn) as f: - f.seek(offset_b) - rawdat = f.read(offset_e - offset_b) - - if num < self.first_iframe: - assert self.prefix_frame_data - rawdat = self.prefix_frame_data + rawdat - - rawdat = self.prefix + rawdat - - skip_frames = 0 - if num < self.first_iframe: - skip_frames = self.num_prefix_frames - - return frame_b, num_frames, skip_frames, rawdat - - -class GOPFrameReader(BaseFrameReader): - #FrameReader with caching and readahead for formats that are group-of-picture based - - def __init__(self, readahead=False, readbehind=False): - self.open_ = True - - self.readahead = readahead - self.readbehind = readbehind - self.frame_cache = LRU(64) - - if self.readahead: - self.cache_lock = threading.RLock() - self.readahead_last = None - self.readahead_len = 30 - self.readahead_c = threading.Condition() - self.readahead_thread = threading.Thread(target=self._readahead_thread) - self.readahead_thread.daemon = True - self.readahead_thread.start() - else: - self.cache_lock = DoNothingContextManager() - - def close(self): - if not self.open_: - return - self.open_ = False - - if self.readahead: - self.readahead_c.acquire() - self.readahead_c.notify() - self.readahead_c.release() - self.readahead_thread.join() - - def _readahead_thread(self): - while True: - self.readahead_c.acquire() - try: - if not self.open_: - break - self.readahead_c.wait() - finally: - self.readahead_c.release() - if not self.open_: - break - assert self.readahead_last - num, pix_fmt = self.readahead_last - - if self.readbehind: - for k in range(num - 1, max(0, num - self.readahead_len), -1): - self._get_one(k, pix_fmt) - else: - for k in range(num, min(self.frame_count, num + self.readahead_len)): - self._get_one(k, pix_fmt) - - def _get_one(self, num, pix_fmt): - assert num < self.frame_count - - if (num, pix_fmt) in self.frame_cache: - return self.frame_cache[(num, pix_fmt)] - - with self.cache_lock: - if (num, pix_fmt) in self.frame_cache: - return self.frame_cache[(num, pix_fmt)] - - frame_b, num_frames, skip_frames, rawdat = self.get_gop(num) - - ret = decompress_video_data(rawdat, self.vid_fmt, self.w, self.h, pix_fmt) - ret = ret[skip_frames:] - assert ret.shape[0] == num_frames - - for i in range(ret.shape[0]): - self.frame_cache[(frame_b+i, pix_fmt)] = ret[i] - - return self.frame_cache[(num, pix_fmt)] - - def get(self, num, count=1, pix_fmt="rgb24"): - assert self.frame_count is not None - - if num + count > self.frame_count: - raise ValueError(f"{num + count} > {self.frame_count}") - - if pix_fmt not in ("nv12", "yuv420p", "rgb24", "yuv444p"): - raise ValueError(f"Unsupported pixel format {pix_fmt!r}") - - ret = [self._get_one(num + i, pix_fmt) for i in range(count)] - - if self.readahead: - self.readahead_last = (num+count, pix_fmt) - self.readahead_c.acquire() - self.readahead_c.notify() - self.readahead_c.release() - - return ret - - -class StreamFrameReader(StreamGOPReader, GOPFrameReader): - def __init__(self, fn, frame_type, index_data, readahead=False, readbehind=False): - StreamGOPReader.__init__(self, fn, frame_type, index_data) - GOPFrameReader.__init__(self, readahead, readbehind) - - -def GOPFrameIterator(gop_reader, pix_fmt='rgb24'): - dec = VideoStreamDecompressor(gop_reader.fn, gop_reader.vid_fmt, gop_reader.w, gop_reader.h, pix_fmt) - yield from dec.read() - + f.seek(off_b) + raw = self.prefix + f.read(off_e - off_b) + # number of frames to discard inside this GOP before the wanted one + for i, frm in enumerate(decompress_video_data(raw, self.w, self.h, self.pix_fmt)): + fidx = f_b + i + if fidx >= end_fidx: + return + elif fidx >= start_fidx and (fidx - start_fidx) % frame_skip == 0: + yield fidx, frm + fidx += 1 + +def FrameIterator(fn: str, index_data: dict|None=None, + pix_fmt: str = "rgb24", + start_fidx:int=0, end_fidx=None, frame_skip:int=1) -> Iterator[np.ndarray]: + dec = FfmpegDecoder(fn, pix_fmt=pix_fmt, index_data=index_data) + for _, frame in dec.get_iterator(start_fidx=start_fidx, end_fidx=end_fidx, frame_skip=frame_skip): + yield frame + +class FrameReader: + def __init__(self, fn: str, index_data: dict|None = None, + cache_size: int = 30, pix_fmt: str = "rgb24"): + self.decoder = FfmpegDecoder(fn, index_data, pix_fmt) + self.iframes = self.decoder.iframes + self._cache: LRU[int, np.ndarray] = LRU(cache_size) + self.w, self.h, self.frame_count, = self.decoder.w, self.decoder.h, self.decoder.frame_count + self.pix_fmt = pix_fmt -def FrameIterator(fn, pix_fmt='rgb24', **kwargs): - fr = FrameReader(fn, **kwargs) - if isinstance(fr, GOPReader): - yield from GOPFrameIterator(fr, pix_fmt) - else: - for i in range(fr.frame_count): - yield fr.get(i, pix_fmt=pix_fmt)[0] + self.it: Iterator[tuple[int, np.ndarray]] | None = None + self.fidx = -1 + + def get(self, fidx:int) -> list[np.ndarray]: + if fidx in self._cache: # If frame is cached, return it + return [self._cache[fidx]] + read_start = self.decoder.get_gop_start(fidx) + if not self.it or fidx < self.fidx or read_start != self.decoder.get_gop_start(self.fidx): # If the frame is in a different GOP, reset the iterator + self.it = self.decoder.get_iterator(read_start) + self.fidx = -1 + while self.fidx < fidx: + self.fidx, frame = next(self.it) + self._cache[self.fidx] = frame + return [self._cache[fidx]] # TODO: return just frame diff --git a/tools/replay/unlog_ci_segment.py b/tools/replay/unlog_ci_segment.py index adc0b19e9b..4906945c2e 100755 --- a/tools/replay/unlog_ci_segment.py +++ b/tools/replay/unlog_ci_segment.py @@ -47,7 +47,7 @@ def replay(route, segment, loop): if w == 'roadCameraState': try: - img = fr.get(frame_idx[msg.roadCameraState.frameId], pix_fmt="rgb24") + img = fr.get(frame_idx[msg.roadCameraState.frameId]) img = img[0][:, :, ::-1] # Convert RGB to BGR, which is what the camera outputs msg.roadCameraState.image = img.flatten().tobytes() except (KeyError, ValueError): diff --git a/tools/scripts/fetch_image_from_route.py b/tools/scripts/fetch_image_from_route.py index 521f1597ec..fd48ff7d50 100755 --- a/tools/scripts/fetch_image_from_route.py +++ b/tools/scripts/fetch_image_from_route.py @@ -37,7 +37,7 @@ fr = FrameReader(segments[segment]) if frame >= fr.frame_count: raise Exception("frame {frame} not found, got {fr.frame_count} frames") -im = Image.fromarray(fr.get(frame, count=1, pix_fmt="rgb24")[0]) +im = Image.fromarray(fr.get(frame)[0]) fn = f"uxxx_{route.replace('|', '_')}_{segment}_{frame}.png" im.save(fn) print(f"saved {fn}")