|  |  |  | @ -445,31 +445,32 @@ def cpp_replay_process(cfg, lr, fingerprint=None): | 
			
		
	
		
			
				
					|  |  |  |  |   managed_processes[cfg.proc_name].prepare() | 
			
		
	
		
			
				
					|  |  |  |  |   managed_processes[cfg.proc_name].start() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   with Timeout(TIMEOUT): | 
			
		
	
		
			
				
					|  |  |  |  |     while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): | 
			
		
	
		
			
				
					|  |  |  |  |       time.sleep(0) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     # Make sure all subscribers are connected | 
			
		
	
		
			
				
					|  |  |  |  |     sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} | 
			
		
	
		
			
				
					|  |  |  |  |     for s in sub_sockets: | 
			
		
	
		
			
				
					|  |  |  |  |       messaging.recv_one_or_none(sockets[s]) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |     for i, msg in enumerate(tqdm(pub_msgs, disable=False)): | 
			
		
	
		
			
				
					|  |  |  |  |       pm.send(msg.which(), msg.as_builder()) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) | 
			
		
	
		
			
				
					|  |  |  |  |       for s in resp_sockets: | 
			
		
	
		
			
				
					|  |  |  |  |         response = messaging.recv_one(sockets[s]) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         if response is None: | 
			
		
	
		
			
				
					|  |  |  |  |           print(f"Warning, no response received {i}") | 
			
		
	
		
			
				
					|  |  |  |  |         else: | 
			
		
	
		
			
				
					|  |  |  |  |           log_msgs.append(response) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       if not len(resp_sockets):  # We only need to wait if we didn't already wait for a response | 
			
		
	
		
			
				
					|  |  |  |  |         while not pm.all_readers_updated(msg.which()): | 
			
		
	
		
			
				
					|  |  |  |  |           time.sleep(0) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |   try: | 
			
		
	
		
			
				
					|  |  |  |  |     with Timeout(TIMEOUT): | 
			
		
	
		
			
				
					|  |  |  |  |       while not all(pm.all_readers_updated(s) for s in cfg.pub_sub.keys()): | 
			
		
	
		
			
				
					|  |  |  |  |         time.sleep(0) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       # Make sure all subscribers are connected | 
			
		
	
		
			
				
					|  |  |  |  |       sockets = {s: messaging.sub_sock(s, timeout=2000) for s in sub_sockets} | 
			
		
	
		
			
				
					|  |  |  |  |       for s in sub_sockets: | 
			
		
	
		
			
				
					|  |  |  |  |         messaging.recv_one_or_none(sockets[s]) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |       for i, msg in enumerate(tqdm(pub_msgs, disable=False)): | 
			
		
	
		
			
				
					|  |  |  |  |         pm.send(msg.which(), msg.as_builder()) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         resp_sockets = cfg.pub_sub[msg.which()] if cfg.should_recv_callback is None else cfg.should_recv_callback(msg) | 
			
		
	
		
			
				
					|  |  |  |  |         for s in resp_sockets: | 
			
		
	
		
			
				
					|  |  |  |  |           response = messaging.recv_one(sockets[s]) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |           if response is None: | 
			
		
	
		
			
				
					|  |  |  |  |             print(f"Warning, no response received {i}") | 
			
		
	
		
			
				
					|  |  |  |  |           else: | 
			
		
	
		
			
				
					|  |  |  |  |             log_msgs.append(response) | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
		
			
				
					|  |  |  |  |         if not len(resp_sockets):  # We only need to wait if we didn't already wait for a response | 
			
		
	
		
			
				
					|  |  |  |  |           while not pm.all_readers_updated(msg.which()): | 
			
		
	
		
			
				
					|  |  |  |  |             time.sleep(0) | 
			
		
	
		
			
				
					|  |  |  |  |   finally: | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes[cfg.proc_name].signal(signal.SIGKILL) | 
			
		
	
		
			
				
					|  |  |  |  |     managed_processes[cfg.proc_name].stop() | 
			
		
	
		
			
				
					|  |  |  |  | 
 | 
			
		
	
	
		
			
				
					|  |  |  | 
 |