| 
						
						
							
								
							
						
						
					 | 
					 | 
					@ -6,6 +6,9 @@ | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					extern ExitHandler do_exit; | 
					 | 
					 | 
					 | 
					extern ExitHandler do_exit; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					// Max messages to process per socket per poll
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					constexpr int MAX_MESSAGES_PER_SOCKET = 50; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					static std::string recv_zmq_msg(void *sock) { | 
					 | 
					 | 
					 | 
					static std::string recv_zmq_msg(void *sock) { | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  zmq_msg_t msg; | 
					 | 
					 | 
					 | 
					  zmq_msg_t msg; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  zmq_msg_init(&msg); | 
					 | 
					 | 
					 | 
					  zmq_msg_init(&msg); | 
				
			
			
		
	
	
		
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
					 | 
					@ -22,7 +25,7 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  msgq_context = std::make_unique<MSGQContext>(); | 
					 | 
					 | 
					 | 
					  msgq_context = std::make_unique<MSGQContext>(); | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  // Create ZMQPubSockets for each endpoint
 | 
					 | 
					 | 
					 | 
					  // Create ZMQPubSockets for each endpoint
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  for (auto endpoint : endpoints) { | 
					 | 
					 | 
					 | 
					  for (const auto &endpoint : endpoints) { | 
				
			
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    auto &socket_pair = socket_pairs.emplace_back(); | 
					 | 
					 | 
					 | 
					    auto &socket_pair = socket_pairs.emplace_back(); | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    socket_pair.endpoint = endpoint; | 
					 | 
					 | 
					 | 
					    socket_pair.endpoint = endpoint; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    socket_pair.pub_sock = std::make_unique<ZMQPubSocket>(); | 
					 | 
					 | 
					 | 
					    socket_pair.pub_sock = std::make_unique<ZMQPubSocket>(); | 
				
			
			
		
	
	
		
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
					 | 
					@ -36,6 +39,7 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  // Start ZMQ monitoring thread to monitor socket events
 | 
					 | 
					 | 
					 | 
					  // Start ZMQ monitoring thread to monitor socket events
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  std::thread thread(&MsgqToZmq::zmqMonitorThread, this); | 
					 | 
					 | 
					 | 
					  std::thread thread(&MsgqToZmq::zmqMonitorThread, this); | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					  // Main loop for processing messages
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  while (!do_exit) { | 
					 | 
					 | 
					 | 
					  while (!do_exit) { | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    { | 
					 | 
					 | 
					 | 
					    { | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      std::unique_lock lk(mutex); | 
					 | 
					 | 
					 | 
					      std::unique_lock lk(mutex); | 
				
			
			
		
	
	
		
		
			
				
					| 
						
						
						
							
								
							
						
					 | 
					 | 
					@ -43,12 +47,17 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      if (do_exit) break; | 
					 | 
					 | 
					 | 
					      if (do_exit) break; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      for (auto sub_sock : msgq_poller->poll(100)) { | 
					 | 
					 | 
					 | 
					      for (auto sub_sock : msgq_poller->poll(100)) { | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        std::unique_ptr<Message> msg(sub_sock->receive(true)); | 
					 | 
					 | 
					 | 
					        // Process messages for each socket
 | 
				
			
			
				
				
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { | 
					 | 
					 | 
					 | 
					        for (int i = 0; i < MAX_MESSAGES_PER_SOCKET; ++i) { | 
				
			
			
				
				
			
		
	
		
		
	
		
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					          auto msg = std::unique_ptr<Message>(sub_sock->receive(true)); | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					          if (!msg) break; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					          while (sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					            if (errno != EINTR) break; | 
					 | 
					 | 
					 | 
					            if (errno != EINTR) break; | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					          } | 
					 | 
					 | 
					 | 
					          } | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					        } | 
					 | 
					 | 
					 | 
					        } | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					      } | 
					 | 
					 | 
					 | 
					      } | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					 | 
					    } | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					    util::sleep_for(1);  // Give zmqMonitorThread a chance to acquire the mutex
 | 
					 | 
					 | 
					 | 
					    util::sleep_for(1);  // Give zmqMonitorThread a chance to acquire the mutex
 | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					  } | 
					 | 
					 | 
					 | 
					  } | 
				
			
			
		
	
		
		
			
				
					
					 | 
					 | 
					 | 
					
 | 
					 | 
					 | 
					 | 
					
 | 
				
			
			
		
	
	
		
		
			
				
					| 
						
							
								
							
						
						
						
					 | 
					 | 
					
  |