From 930fa182998c4b458f05ed43224e4ec8f092dc2e Mon Sep 17 00:00:00 2001 From: Dean Lee Date: Fri, 30 Aug 2024 03:49:31 +0800 Subject: [PATCH] bridge: drain each socket up to a fixed number (#33400) drain each socket up to a fixed number --- cereal/messaging/msgq_to_zmq.cc | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/cereal/messaging/msgq_to_zmq.cc b/cereal/messaging/msgq_to_zmq.cc index 87ce02ac4d..a130f7dd04 100644 --- a/cereal/messaging/msgq_to_zmq.cc +++ b/cereal/messaging/msgq_to_zmq.cc @@ -6,6 +6,9 @@ extern ExitHandler do_exit; +// Max messages to process per socket per poll +constexpr int MAX_MESSAGES_PER_SOCKET = 50; + static std::string recv_zmq_msg(void *sock) { zmq_msg_t msg; zmq_msg_init(&msg); @@ -22,7 +25,7 @@ void MsgqToZmq::run(const std::vector &endpoints, const std::string msgq_context = std::make_unique(); // Create ZMQPubSockets for each endpoint - for (auto endpoint : endpoints) { + for (const auto &endpoint : endpoints) { auto &socket_pair = socket_pairs.emplace_back(); socket_pair.endpoint = endpoint; socket_pair.pub_sock = std::make_unique(); @@ -36,6 +39,7 @@ void MsgqToZmq::run(const std::vector &endpoints, const std::string // Start ZMQ monitoring thread to monitor socket events std::thread thread(&MsgqToZmq::zmqMonitorThread, this); + // Main loop for processing messages while (!do_exit) { { std::unique_lock lk(mutex); @@ -43,9 +47,14 @@ void MsgqToZmq::run(const std::vector &endpoints, const std::string if (do_exit) break; for (auto sub_sock : msgq_poller->poll(100)) { - std::unique_ptr msg(sub_sock->receive(true)); - while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { - if (errno != EINTR) break; + // Process messages for each socket + for (int i = 0; i < MAX_MESSAGES_PER_SOCKET; ++i) { + auto msg = std::unique_ptr(sub_sock->receive(true)); + if (!msg) break; + + while (sub2pub[sub_sock]->sendMessage(msg.get()) == -1) { + if (errno != EINTR) break; + } } } }