diff --git a/tools/lib/async_generator.py b/tools/lib/async_generator.py deleted file mode 100644 index ef535f27f5..0000000000 --- a/tools/lib/async_generator.py +++ /dev/null @@ -1,352 +0,0 @@ -import functools -import threading -import inspect -import sys -import select -import struct -from math import sqrt -from collections import OrderedDict, deque -from time import time - -from tools.lib.pollable_queue import PollableQueue, Empty, Full, ExistentialError - -EndSentinel = object() - - -def _sync_inner_generator(input_queue, *args, **kwargs): - func = args[0] - args = args[1:] - - get = input_queue.get - while True: - item = get() - if item is EndSentinel: - return - - cookie, value = item - yield cookie, func(value, *args, **kwargs) - - -def _async_streamer_async_inner(input_queue, output_queue, generator_func, args, kwargs): - put = output_queue.put - put_end = True - try: - g = generator_func(input_queue, *args, **kwargs) - for item in g: - put((time(), item)) - g.close() - except ExistentialError: - put_end = False - raise - finally: - if put_end: - put((None, EndSentinel)) - -def _running_mean_var(ltc_stats, x): - old_mean, var = ltc_stats - mean = min(600., 0.98 * old_mean + 0.02 * x) - var = min(5., max(0.1, 0.98 * var + 0.02 * (mean - x) * (old_mean - x))) - return mean, var - -def _find_next_resend(sent_messages, ltc_stats): - if not sent_messages: - return None, None - - oldest_sent_idx = sent_messages._OrderedDict__root[1][2] - send_time, _ = sent_messages[oldest_sent_idx] - - # Assume message has been lost if it is >10 standard deviations from mean. - mean, var = ltc_stats - next_resend_time = send_time + mean + 40. * sqrt(var) - - return oldest_sent_idx, next_resend_time - - -def _do_cleanup(input_queue, output_queue, num_workers, sentinels_received, num_outstanding): - input_fd = input_queue.put_fd() - output_fd = output_queue.get_fd() - - poller = select.epoll() - poller.register(input_fd, select.EPOLLOUT) - poller.register(output_fd, select.EPOLLIN) - - remaining_outputs = [] - end_sentinels_to_send = num_workers - sentinels_received - while sentinels_received < num_workers: - evts = dict(poller.poll(-1 if num_outstanding > 0 else 10.)) - if not evts: - # Workers aren't responding, crash. - break - - if output_fd in evts: - _, maybe_sentinel = output_queue.get() - if maybe_sentinel is EndSentinel: - sentinels_received += 1 - else: - remaining_outputs.append(maybe_sentinel[1]) - num_outstanding -= 1 - - if input_fd in evts: - if end_sentinels_to_send > 0: - input_queue.put_nowait(EndSentinel) - end_sentinels_to_send -= 1 - else: - poller.modify(input_fd, 0) - - # TODO: Raise an exception when a queue thread raises one. - assert sentinels_received == num_workers, (sentinels_received, num_workers) - assert output_queue.empty() - return remaining_outputs - -def _generate_results(input_stream, input_queue, worker_output_queue, output_queue, - num_workers, max_outstanding): - pack_cookie = struct.pack - - # Maps idx -> (send_time, input) - sent_messages = OrderedDict() - oldest_sent_idx = None - next_resend_time = None - ltc_stats = 5., 10. - - # Maps idx -> result - received_messages = {} - next_out = 0 - - # Start things off by pulling the first value. - next_in_item = next(input_stream, EndSentinel) - inputs_remain = next_in_item is not EndSentinel - sentinels_received = 0 - - input_fd = input_queue.put_fd() - worker_output_fd = worker_output_queue.get_fd() - output_fd = output_queue.put_fd() - - poller = select.epoll() - poller.register(input_fd, select.EPOLLOUT) - poller.register(worker_output_fd, select.EPOLLIN) - poller.register(output_fd, 0) - - # Keep sending/retrying until the input stream and sent messages are all done. - while sentinels_received < num_workers and (inputs_remain or sent_messages): - if max_outstanding: - can_send_new = (len(sent_messages) < max_outstanding and - len(received_messages) < max_outstanding and inputs_remain) - else: - can_send_new = inputs_remain - - if (next_resend_time and now >= next_resend_time) or can_send_new: - poller.modify(input_fd, select.EPOLLOUT) - else: - poller.modify(input_fd, 0) - - if next_resend_time: - t = max(0, next_resend_time - now) - evts = dict(poller.poll(t)) - else: - evts = dict(poller.poll()) - now = time() - - if output_fd in evts: - output_queue.put_nowait(received_messages.pop(next_out)) - next_out += 1 - - if next_out not in received_messages: - poller.modify(output_fd, 0) - - if worker_output_fd in evts: - for receive_time, maybe_sentinel in worker_output_queue.get_multiple_nowait(): - # Check for EndSentinel in case of worker crash. - if maybe_sentinel is EndSentinel: - sentinels_received += 1 - continue - idx_bytes, value = maybe_sentinel - idx = struct.unpack(" 10.0: - print("TOOK OVER 10 seconds to fetch %r %f %f" % (frame_info, get_time, get_time2)) - - return prefix, input_data, skip, count - -def _ffmpeg_fcamera_input_for_frame(pair): - cookie, frame_info = pair - try: - return cookie, _ffmpeg_fcamera_input_for_frame_info(frame_info) - except Exception as e: - # Let the caller handle exceptions. - return cookie, e - - -def _feed_ffmpeg_fcamera_input_work_loop(frames, proc_stdin, select_pipe_fd, cookie_queue): - last_prefix = None - """ - with ThreadPoolExecutor(64) as pool: - futures = [] - for f in frames: - futures.append(pool.submit(_ffmpeg_fcamera_input_for_frame, f)) - for f in as_completed(futures): - cookie, data = f.result() - if isinstance(data, Exception): - # Just print exceptions for now. - print(data) - continue - prefix, input_data, skip, count = data - cookie_queue.put((cookie, count)) - - # Write zeros for skipped frames, ones for keep frames. - os.write(select_pipe_fd, b"\x00" * skip + b"\x01" * count) - - if prefix != last_prefix: - proc_stdin.write(prefix) - last_prefix = prefix - - proc_stdin.write(input_data) - """ - num_threads = 64 - for cookie, data in async_generator( - num_threads, 8 * num_threads, 8 * num_threads, - reliable=False)(_ffmpeg_fcamera_input_for_frame)(frames): - if isinstance(data, Exception): - # Just print exceptions for now. - print(data) - continue - prefix, input_data, skip, count = data - cookie_queue.put((cookie, count)) - - # Write zeros for skipped frames, ones for keep frames. - os.write(select_pipe_fd, b"\x00" * skip + b"\x01" * count) - - if prefix != last_prefix: - proc_stdin.write(prefix) - last_prefix = prefix - - proc_stdin.write(input_data) - -_FCAMERA_FEED_SUCCESS = object() -def feed_ffmpeg_fcamera_input(frames, proc_stdin, select_pipe_fd, cookie_queue): - print("Feed started on {}".format(threading.current_thread().name)) - try: - _feed_ffmpeg_fcamera_input_work_loop(frames, proc_stdin, select_pipe_fd, cookie_queue) - cookie_queue.put((_FCAMERA_FEED_SUCCESS, None)) - finally: - # Always close ffmpeg input. - proc_stdin.close() - - def read_file_check_size(f, sz, cookie): buff = bytearray(sz) bytes_read = f.readinto(buff) diff --git a/tools/lib/pollable_queue.py b/tools/lib/pollable_queue.py deleted file mode 100644 index 9ef2db62b2..0000000000 --- a/tools/lib/pollable_queue.py +++ /dev/null @@ -1,107 +0,0 @@ -import os -import select -import fcntl -from itertools import count -from collections import deque - -Empty = OSError -Full = OSError -ExistentialError = OSError - -class PollableQueue(object): - """A Queue that you can poll(). - Only works with a single producer. - """ - def __init__(self, maxlen=None): - with open("/proc/sys/fs/pipe-max-size") as f: - max_maxlen = int(f.read().rstrip()) - - if maxlen is None: - maxlen = max_maxlen - else: - maxlen = min(maxlen, max_maxlen) - - self._maxlen = maxlen - self._q = deque() - self._get_fd, self._put_fd = os.pipe() - fcntl.fcntl(self._get_fd, fcntl.F_SETFL, os.O_NONBLOCK) - fcntl.fcntl(self._put_fd, fcntl.F_SETFL, os.O_NONBLOCK) - - fcntl.fcntl(self._get_fd, fcntl.F_SETLEASE + 7, self._maxlen) - fcntl.fcntl(self._put_fd, fcntl.F_SETLEASE + 7, self._maxlen) - - get_poller = select.epoll() - put_poller = select.epoll() - get_poller.register(self._get_fd, select.EPOLLIN) - put_poller.register(self._put_fd, select.EPOLLOUT) - - self._get_poll = get_poller.poll - self._put_poll = put_poller.poll - - - def get_fd(self): - return self._get_fd - - def put_fd(self): - return self._put_fd - - def put(self, item, block=True, timeout=None): - if block: - while self._put_poll(timeout if timeout is not None else -1): - try: - # TODO: This is broken for multiple push threads when the queue is full. - return self.put_nowait(item) - except OSError as e: - if e.errno != 11: - raise - - raise Full() - else: - return self.put_nowait(item) - - def put_nowait(self, item): - self._q.appendleft(item) - os.write(self._put_fd, b"\x00") - - def get(self, block=True, timeout=None): - if block: - while self._get_poll(timeout if timeout is not None else -1): - try: - return self.get_nowait() - except OSError as e: - if e.errno != 11: - raise - - raise Empty() - else: - return self.get_nowait() - - def get_nowait(self): - os.read(self._get_fd, 1) - return self._q.pop() - - def get_multiple(self, block=True, timeout=None): - if block: - if self._get_poll(timeout if timeout is not None else -1): - return self.get_multiple_nowait() - else: - raise Empty() - else: - return self.get_multiple_nowait() - - def get_multiple_nowait(self, max_messages=None): - num_read = len(os.read(self._get_fd, max_messages or self._maxlen)) - return [self._q.pop() for _ in range(num_read)] - - def empty(self): - return len(self._q) == 0 - - def full(self): - return len(self._q) >= self._maxlen - - def close(self): - os.close(self._get_fd) - os.close(self._put_fd) - - def __len__(self): - return len(self._q) \ No newline at end of file