You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							144 lines
						
					
					
						
							4.7 KiB
						
					
					
				
			
		
		
	
	
							144 lines
						
					
					
						
							4.7 KiB
						
					
					
				| #include "cereal/messaging/msgq_to_zmq.h"
 | |
| 
 | |
| #include <cassert>
 | |
| 
 | |
| #include "common/util.h"
 | |
| 
 | |
| extern ExitHandler do_exit;
 | |
| 
 | |
| // Max messages to process per socket per poll
 | |
| constexpr int MAX_MESSAGES_PER_SOCKET = 50;
 | |
| 
 | |
| static std::string recv_zmq_msg(void *sock) {
 | |
|   zmq_msg_t msg;
 | |
|   zmq_msg_init(&msg);
 | |
|   std::string ret;
 | |
|   if (zmq_msg_recv(&msg, sock, 0) > 0) {
 | |
|     ret.assign((char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
 | |
|   }
 | |
|   zmq_msg_close(&msg);
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string &ip) {
 | |
|   zmq_context = std::make_unique<ZMQContext>();
 | |
|   msgq_context = std::make_unique<MSGQContext>();
 | |
| 
 | |
|   // Create ZMQPubSockets for each endpoint
 | |
|   for (const auto &endpoint : endpoints) {
 | |
|     auto &socket_pair = socket_pairs.emplace_back();
 | |
|     socket_pair.endpoint = endpoint;
 | |
|     socket_pair.pub_sock = std::make_unique<ZMQPubSocket>();
 | |
|     int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint);
 | |
|     if (ret != 0) {
 | |
|       printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno()));
 | |
|       return;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Start ZMQ monitoring thread to monitor socket events
 | |
|   std::thread thread(&MsgqToZmq::zmqMonitorThread, this);
 | |
| 
 | |
|   // Main loop for processing messages
 | |
|   while (!do_exit) {
 | |
|     {
 | |
|       std::unique_lock lk(mutex);
 | |
|       cv.wait(lk, [this]() { return do_exit || !sub2pub.empty(); });
 | |
|       if (do_exit) break;
 | |
| 
 | |
|       for (auto sub_sock : msgq_poller->poll(100)) {
 | |
|         // Process messages for each socket
 | |
|         ZMQPubSocket *pub_sock = sub2pub.at(sub_sock);
 | |
|         for (int i = 0; i < MAX_MESSAGES_PER_SOCKET; ++i) {
 | |
|           auto msg = std::unique_ptr<Message>(sub_sock->receive(true));
 | |
|           if (!msg) break;
 | |
| 
 | |
|           while (pub_sock->sendMessage(msg.get()) == -1) {
 | |
|             if (errno != EINTR) break;
 | |
|           }
 | |
|         }
 | |
|       }
 | |
|     }
 | |
|     util::sleep_for(1);  // Give zmqMonitorThread a chance to acquire the mutex
 | |
|   }
 | |
| 
 | |
|   thread.join();
 | |
| }
 | |
| 
 | |
| void MsgqToZmq::zmqMonitorThread() {
 | |
|   std::vector<zmq_pollitem_t> pollitems;
 | |
| 
 | |
|   // Set up ZMQ monitor for each pub socket
 | |
|   for (int i = 0; i < socket_pairs.size(); ++i) {
 | |
|     std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i);
 | |
|     zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
 | |
| 
 | |
|     void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR);
 | |
|     zmq_connect(monitor_socket, addr.c_str());
 | |
|     pollitems.emplace_back(zmq_pollitem_t{.socket = monitor_socket, .events = ZMQ_POLLIN});
 | |
|   }
 | |
| 
 | |
|   while (!do_exit) {
 | |
|     int ret = zmq_poll(pollitems.data(), pollitems.size(), 1000);
 | |
|     if (ret < 0) {
 | |
|       if (errno == EINTR) {
 | |
|         // Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms)
 | |
|         // to reduce CPU usage during retry attempts.
 | |
|         util::sleep_for(200);
 | |
|       }
 | |
|       continue;
 | |
|     }
 | |
| 
 | |
|     for (int i = 0; i < pollitems.size(); ++i) {
 | |
|       if (pollitems[i].revents & ZMQ_POLLIN) {
 | |
|         // First frame in message contains event number and value
 | |
|         std::string frame = recv_zmq_msg(pollitems[i].socket);
 | |
|         if (frame.empty()) continue;
 | |
| 
 | |
|         uint16_t event_type = *(uint16_t *)(frame.data());
 | |
| 
 | |
|         // Second frame in message contains event address
 | |
|         frame = recv_zmq_msg(pollitems[i].socket);
 | |
|         if (frame.empty()) continue;
 | |
| 
 | |
|         std::unique_lock lk(mutex);
 | |
|         auto &pair = socket_pairs[i];
 | |
|         if (event_type & ZMQ_EVENT_ACCEPTED) {
 | |
|           printf("socket [%s] connected\n", pair.endpoint.c_str());
 | |
|           if (++pair.connected_clients == 1) {
 | |
|             // Create new MSGQ subscriber socket and map to ZMQ publisher
 | |
|             pair.sub_sock = std::make_unique<MSGQSubSocket>();
 | |
|             pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1");
 | |
|             sub2pub[pair.sub_sock.get()] = pair.pub_sock.get();
 | |
|             registerSockets();
 | |
|           }
 | |
|         } else if (event_type & ZMQ_EVENT_DISCONNECTED) {
 | |
|           printf("socket [%s] disconnected\n", pair.endpoint.c_str());
 | |
|           if (pair.connected_clients == 0 || --pair.connected_clients == 0) {
 | |
|             // Remove MSGQ subscriber socket from mapping and reset it
 | |
|             sub2pub.erase(pair.sub_sock.get());
 | |
|             pair.sub_sock.reset(nullptr);
 | |
|             registerSockets();
 | |
|           }
 | |
|         }
 | |
|         cv.notify_one();
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   // Clean up monitor sockets
 | |
|   for (int i = 0; i < pollitems.size(); ++i) {
 | |
|     zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0);
 | |
|     zmq_close(pollitems[i].socket);
 | |
|   }
 | |
|   cv.notify_one();
 | |
| }
 | |
| 
 | |
| void MsgqToZmq::registerSockets() {
 | |
|   msgq_poller = std::make_unique<MSGQPoller>();
 | |
|   for (const auto &socket_pair : socket_pairs) {
 | |
|     if (socket_pair.sub_sock) {
 | |
|       msgq_poller->registerSocket(socket_pair.sub_sock.get());
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 |