bridge: drain each socket up to a fixed number (#33400)

drain each socket up to a fixed number
pull/33245/head
Dean Lee 1 year ago committed by GitHub
parent 5bfad23442
commit 930fa18299
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 17
      cereal/messaging/msgq_to_zmq.cc

@ -6,6 +6,9 @@
extern ExitHandler do_exit;
// Max messages to process per socket per poll
constexpr int MAX_MESSAGES_PER_SOCKET = 50;
static std::string recv_zmq_msg(void *sock) {
zmq_msg_t msg;
zmq_msg_init(&msg);
@ -22,7 +25,7 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string
msgq_context = std::make_unique<MSGQContext>();
// Create ZMQPubSockets for each endpoint
for (auto endpoint : endpoints) {
for (const auto &endpoint : endpoints) {
auto &socket_pair = socket_pairs.emplace_back();
socket_pair.endpoint = endpoint;
socket_pair.pub_sock = std::make_unique<ZMQPubSocket>();
@ -36,6 +39,7 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string
// Start ZMQ monitoring thread to monitor socket events
std::thread thread(&MsgqToZmq::zmqMonitorThread, this);
// Main loop for processing messages
while (!do_exit) {
{
std::unique_lock lk(mutex);
@ -43,9 +47,14 @@ void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string
if (do_exit) break;
for (auto sub_sock : msgq_poller->poll(100)) {
std::unique_ptr<Message> msg(sub_sock->receive(true));
while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) {
if (errno != EINTR) break;
// Process messages for each socket
for (int i = 0; i < MAX_MESSAGES_PER_SOCKET; ++i) {
auto msg = std::unique_ptr<Message>(sub_sock->receive(true));
if (!msg) break;
while (sub2pub[sub_sock]->sendMessage(msg.get()) == -1) {
if (errno != EINTR) break;
}
}
}
}

Loading…
Cancel
Save