|
|
@ -77,9 +77,9 @@ class ReplayContext: |
|
|
|
def wait_for_recv_called(self): |
|
|
|
def wait_for_recv_called(self): |
|
|
|
messaging.wait_for_one_event(self.all_recv_called_events) |
|
|
|
messaging.wait_for_one_event(self.all_recv_called_events) |
|
|
|
|
|
|
|
|
|
|
|
def wait_for_next_recv(self, end_of_cycle): |
|
|
|
def wait_for_next_recv(self, trigger_empty_recv): |
|
|
|
index = messaging.wait_for_one_event(self.all_recv_called_events) |
|
|
|
index = messaging.wait_for_one_event(self.all_recv_called_events) |
|
|
|
if self.drained_pub is not None and end_of_cycle: |
|
|
|
if self.drained_pub is not None and trigger_empty_recv: |
|
|
|
self.all_recv_called_events[index].clear() |
|
|
|
self.all_recv_called_events[index].clear() |
|
|
|
self.all_recv_ready_events[index].set() |
|
|
|
self.all_recv_ready_events[index].set() |
|
|
|
self.all_recv_called_events[index].wait() |
|
|
|
self.all_recv_called_events[index].wait() |
|
|
@ -419,12 +419,17 @@ def _replay_single_process(cfg, lr, fingerprint): |
|
|
|
for s in sockets.values(): |
|
|
|
for s in sockets.values(): |
|
|
|
messaging.recv_one_or_none(s) |
|
|
|
messaging.recv_one_or_none(s) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# empty recv on drained pub indicates the end of messages, only do that if there're any |
|
|
|
|
|
|
|
trigger_empty_recv = False |
|
|
|
|
|
|
|
if cfg.drained_pub: |
|
|
|
|
|
|
|
trigger_empty_recv = next((True for m in msg_queue if m.which() == cfg.drained_pub), False) |
|
|
|
|
|
|
|
|
|
|
|
for m in msg_queue: |
|
|
|
for m in msg_queue: |
|
|
|
pm.send(m.which(), m.as_builder()) |
|
|
|
pm.send(m.which(), m.as_builder()) |
|
|
|
msg_queue = [] |
|
|
|
msg_queue = [] |
|
|
|
|
|
|
|
|
|
|
|
rc.unlock_sockets() |
|
|
|
rc.unlock_sockets() |
|
|
|
rc.wait_for_next_recv(True) |
|
|
|
rc.wait_for_next_recv(trigger_empty_recv) |
|
|
|
|
|
|
|
|
|
|
|
for s in resp_sockets: |
|
|
|
for s in resp_sockets: |
|
|
|
ms = messaging.drain_sock(sockets[s]) |
|
|
|
ms = messaging.drain_sock(sockets[s]) |
|
|
|