import functools
import threading
import inspect
import sys
import select
import struct
from math import sqrt
from collections import OrderedDict, deque
from time import time

from tools.lib.pollable_queue import PollableQueue, Empty, Full, ExistentialError

# Unique marker object passed through the queues to signal "no more items".
EndSentinel = object()


def _sync_inner_generator(input_queue, *args, **kwargs):
    """Apply a function to every item read from ``input_queue``.

    The first positional argument after ``input_queue`` is the function to
    call; the remaining positional and keyword arguments are forwarded to it
    on every call.

    Items are expected to be ``(cookie, value)`` pairs.  For each one the
    generator yields ``(cookie, func(value, *args, **kwargs))``.  Iteration
    stops when ``EndSentinel`` is read from the queue.
    """
    func = args[0]
    args = args[1:]

    get = input_queue.get
    while True:
        item = get()
        if item is EndSentinel:
            return

        cookie, value = item
        yield cookie, func(value, *args, **kwargs)


def _async_streamer_async_inner(input_queue, output_queue, generator_func, args, kwargs):
    """Drive ``generator_func`` and forward its items to ``output_queue``.

    Every item produced by ``generator_func(input_queue, *args, **kwargs)`` is
    put on ``output_queue`` as ``(time(), item)``.

    When the generator finishes, or fails with any exception other than
    ``ExistentialError``, ``(None, EndSentinel)`` is put on ``output_queue``
    so the consumer can tell that this producer is done.  On
    ``ExistentialError`` nothing more is written to the queue.  Exceptions
    always propagate to the caller.
    """
    put = output_queue.put
    put_end = True
    try:
        g = generator_func(input_queue, *args, **kwargs)
        for item in g:
            put((time(), item))
        g.close()
    except ExistentialError:
        # Skip the end marker in this case and just re-raise.
        put_end = False
        raise
    finally:
        if put_end:
            put((None, EndSentinel))


def _running_mean_var(ltc_stats, x):
    """Update exponentially weighted mean/variance statistics with sample ``x``.

    ``ltc_stats`` is a ``(mean, var)`` pair.  The old value is weighted 0.98
    and the new sample 0.02.  The returned mean is capped at 600 and the
    returned variance is clamped to the range [0.1, 5].

    NOTE(review): presumably these are message latencies in seconds (the
    callers timestamp with ``time()``) -- confirm.

    Returns the updated ``(mean, var)`` pair.
    """
    old_mean, var = ltc_stats
    mean = min(600., 0.98 * old_mean + 0.02 * x)
    var = min(5., max(0.1, 0.98 * var + 0.02 * (mean - x) * (old_mean - x)))
    return mean, var


def _find_next_resend(sent_messages, ltc_stats):
    """Return the oldest sent message index and the time it should be resent.

    ``sent_messages`` is an ``OrderedDict`` mapping idx -> ``(send_time, input)``
    in insertion order; ``ltc_stats`` is the ``(mean, var)`` pair maintained by
    ``_running_mean_var``.

    Returns ``(oldest_sent_idx, next_resend_time)``, or ``(None, None)`` when
    there are no sent messages.
    """
    if not sent_messages:
        return None, None

    # The first key in iteration order is the oldest insertion.  This replaces
    # a lookup through the private ``_OrderedDict__root`` attribute, which only
    # exists on the pure-Python OrderedDict implementation.
    oldest_sent_idx = next(iter(sent_messages))
    send_time, _ = sent_messages[oldest_sent_idx]

    # Assume the message has been lost once it is overdue by more than the
    # mean plus 40 * sqrt(var).
    # NOTE(review): the previous comment said ">10 standard deviations" while
    # the code uses a factor of 40 -- confirm which one is intended.
    mean, var = ltc_stats
    next_resend_time = send_time + mean + 40. * sqrt(var)

    return oldest_sent_idx, next_resend_time


def _do_cleanup(input_queue, output_queue, num_workers, sentinels_received, num_outstanding):
    """Shut the workers down and collect whatever output is still queued.

    One ``EndSentinel`` is put on ``input_queue`` for every worker that has
    not yet reported an ``EndSentinel`` on ``output_queue``, and
    ``output_queue`` is drained until ``num_workers`` sentinels have been seen
    in total.

    While ``num_outstanding`` is positive the poll waits indefinitely;
    otherwise it gives up after a 10 second timeout with no events.

    Returns the list of remaining outputs (element ``[1]`` of every
    non-sentinel payload read from ``output_queue``).

    Raises ``AssertionError`` if not every worker reported its sentinel (for
    example after the poll timed out) or if ``output_queue`` is not empty
    afterwards.
    """
    input_fd = input_queue.put_fd()
    output_fd = output_queue.get_fd()

    remaining_outputs = []
    end_sentinels_to_send = num_workers - sentinels_received

    poller = select.epoll()
    try:
        poller.register(input_fd, select.EPOLLOUT)
        poller.register(output_fd, select.EPOLLIN)

        while sentinels_received < num_workers:
            evts = dict(poller.poll(-1 if num_outstanding > 0 else 10.))
            if not evts:
                # Workers aren't responding, crash.
                break

            if output_fd in evts:
                _, maybe_sentinel = output_queue.get()
                if maybe_sentinel is EndSentinel:
                    sentinels_received += 1
                else:
                    remaining_outputs.append(maybe_sentinel[1])
                    num_outstanding -= 1

            if input_fd in evts:
                if end_sentinels_to_send > 0:
                    input_queue.put_nowait(EndSentinel)
                    end_sentinels_to_send -= 1
                else:
                    # Every sentinel has been sent; stop asking for
                    # writability events on the input queue.
                    poller.modify(input_fd, 0)
    finally:
        # Release the epoll file descriptor explicitly instead of relying on
        # garbage collection.
        poller.close()

    # TODO: Raise an exception when a queue thread raises one.
    assert sentinels_received == num_workers, (sentinels_received, num_workers)
    assert output_queue.empty()
    return remaining_outputs


def _generate_results(input_stream, input_queue, worker_output_queue, output_queue,
                      num_workers, max_outstanding):
    pack_cookie = struct.pack

    # Maps idx -> (send_time, input)
    sent_messages = OrderedDict()
    oldest_sent_idx = None
    next_resend_time = None
    ltc_stats = 5., 10.

    # Maps idx -> result
    received_messages = {}
    next_out = 0

    # Start things off by pulling the first value.
    next_in_item = next(input_stream, EndSentinel)
    inputs_remain = next_in_item is not EndSentinel
    sentinels_received = 0

    input_fd = input_queue.put_fd()
    worker_output_fd = worker_output_queue.get_fd()
    output_fd = output_queue.put_fd()

    poller = select.epoll()
    poller.register(input_fd, select.EPOLLOUT)
    poller.register(worker_output_fd, select.EPOLLIN)
    poller.register(output_fd, 0)

    # Keep sending/retrying until the input stream and sent messages are all done.
while sentinels_received < num_workers and (inputs_remain or sent_messages): if max_outstanding: can_send_new = (len(sent_messages) < max_outstanding and len(received_messages) < max_outstanding and inputs_remain) else: can_send_new = inputs_remain if (next_resend_time and now >= next_resend_time) or can_send_new: poller.modify(input_fd, select.EPOLLOUT) else: poller.modify(input_fd, 0) if next_resend_time: t = max(0, next_resend_time - now) evts = dict(poller.poll(t)) else: evts = dict(poller.poll()) now = time() if output_fd in evts: output_queue.put_nowait(received_messages.pop(next_out)) next_out += 1 if next_out not in received_messages: poller.modify(output_fd, 0) if worker_output_fd in evts: for receive_time, maybe_sentinel in worker_output_queue.get_multiple_nowait(): # Check for EndSentinel in case of worker crash. if maybe_sentinel is EndSentinel: sentinels_received += 1 continue idx_bytes, value = maybe_sentinel idx = struct.unpack("