bridge: implement MSGQ to ZMQ bridge with subscriber-based publishing (#32862)

implement MSGQ to ZMQ bridge with subscriber-based publishing

Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
old-commit-hash: 2faa08c2d6
pull/33384/head
Dean Lee 8 months ago committed by GitHub
parent d21917ffb3
commit 51fb5009f1
  1. 2
      cereal/SConscript
  2. 99
      cereal/messaging/bridge.cc
  3. 134
      cereal/messaging/msgq_to_zmq.cc
  4. 37
      cereal/messaging/msgq_to_zmq.h

@ -13,7 +13,7 @@ cereal = env.Library('cereal', [f'gen/cpp/{s}.c++' for s in schema_files])
# Build messaging # Build messaging
services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET') services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET')
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[msgq, common]) env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, common, 'pthread'])
socketmaster = env.Library('socketmaster', ['messaging/socketmaster.cc']) socketmaster = env.Library('socketmaster', ['messaging/socketmaster.cc'])

@ -1,25 +1,10 @@
#include <algorithm>
#include <cassert> #include <cassert>
#include <csignal>
#include <iostream>
#include <map>
#include <string>
typedef void (*sighandler_t)(int sig);
#include "cereal/messaging/msgq_to_zmq.h"
#include "cereal/services.h" #include "cereal/services.h"
#include "msgq/impl_msgq.h" #include "common/util.h"
#include "msgq/impl_zmq.h"
std::atomic<bool> do_exit = false; ExitHandler do_exit;
static void set_do_exit(int sig) {
do_exit = true;
}
void sigpipe_handler(int sig) {
assert(sig == SIGPIPE);
std::cout << "SIGPIPE received" << std::endl;
}
static std::vector<std::string> get_services(std::string whitelist_str, bool zmq_to_msgq) { static std::vector<std::string> get_services(std::string whitelist_str, bool zmq_to_msgq) {
std::vector<std::string> service_list; std::vector<std::string> service_list;
@ -34,41 +19,22 @@ static std::vector<std::string> get_services(std::string whitelist_str, bool zmq
return service_list; return service_list;
} }
int main(int argc, char** argv) { void msgq_to_zmq(const std::vector<std::string> &endpoints, const std::string &ip) {
signal(SIGPIPE, (sighandler_t)sigpipe_handler); MsgqToZmq bridge;
signal(SIGINT, (sighandler_t)set_do_exit); bridge.run(endpoints, ip);
signal(SIGTERM, (sighandler_t)set_do_exit); }
bool zmq_to_msgq = argc > 2; void zmq_to_msgq(const std::vector<std::string> &endpoints, const std::string &ip) {
std::string ip = zmq_to_msgq ? argv[1] : "127.0.0.1"; auto poller = std::make_unique<ZMQPoller>();
std::string whitelist_str = zmq_to_msgq ? std::string(argv[2]) : ""; auto pub_context = std::make_unique<MSGQContext>();
auto sub_context = std::make_unique<ZMQContext>();
std::map<SubSocket *, PubSocket *> sub2pub;
Poller *poller; for (auto endpoint : endpoints) {
Context *pub_context; auto pub_sock = new MSGQPubSocket();
Context *sub_context; auto sub_sock = new ZMQSubSocket();
if (zmq_to_msgq) { // republishes zmq debugging messages as msgq pub_sock->connect(pub_context.get(), endpoint);
poller = new ZMQPoller(); sub_sock->connect(sub_context.get(), endpoint, ip, false);
pub_context = new MSGQContext();
sub_context = new ZMQContext();
} else {
poller = new MSGQPoller();
pub_context = new ZMQContext();
sub_context = new MSGQContext();
}
std::map<SubSocket*, PubSocket*> sub2pub;
for (auto endpoint : get_services(whitelist_str, zmq_to_msgq)) {
PubSocket * pub_sock;
SubSocket * sub_sock;
if (zmq_to_msgq) {
pub_sock = new MSGQPubSocket();
sub_sock = new ZMQSubSocket();
} else {
pub_sock = new ZMQPubSocket();
sub_sock = new MSGQSubSocket();
}
pub_sock->connect(pub_context, endpoint);
sub_sock->connect(sub_context, endpoint, ip, false);
poller->registerSocket(sub_sock); poller->registerSocket(sub_sock);
sub2pub[sub_sock] = pub_sock; sub2pub[sub_sock] = pub_sock;
@ -76,17 +42,30 @@ int main(int argc, char** argv) {
while (!do_exit) { while (!do_exit) {
for (auto sub_sock : poller->poll(100)) { for (auto sub_sock : poller->poll(100)) {
Message * msg = sub_sock->receive(); std::unique_ptr<Message> msg(sub_sock->receive(true));
if (msg == NULL) continue; if (msg) {
int ret; sub2pub[sub_sock]->sendMessage(msg.get());
do { }
ret = sub2pub[sub_sock]->sendMessage(msg); }
} while (ret == -1 && errno == EINTR && !do_exit); }
assert(ret >= 0 || do_exit);
delete msg;
if (do_exit) break; // Clean up allocated sockets
for (auto &[sub_sock, pub_sock] : sub2pub) {
delete sub_sock;
delete pub_sock;
} }
}
int main(int argc, char **argv) {
bool is_zmq_to_msgq = argc > 2;
std::string ip = is_zmq_to_msgq ? argv[1] : "127.0.0.1";
std::string whitelist_str = is_zmq_to_msgq ? std::string(argv[2]) : "";
std::vector<std::string> endpoints = get_services(whitelist_str, is_zmq_to_msgq);
if (is_zmq_to_msgq) {
zmq_to_msgq(endpoints, ip);
} else {
msgq_to_zmq(endpoints, ip);
} }
return 0; return 0;
} }

@ -0,0 +1,134 @@
#include "cereal/messaging/msgq_to_zmq.h"
#include <cassert>
#include "common/util.h"
extern ExitHandler do_exit;
static std::string recv_zmq_msg(void *sock) {
zmq_msg_t msg;
zmq_msg_init(&msg);
std::string ret;
if (zmq_msg_recv(&msg, sock, 0) > 0) {
ret.assign((char *)zmq_msg_data(&msg), zmq_msg_size(&msg));
}
zmq_msg_close(&msg);
return ret;
}
void MsgqToZmq::run(const std::vector<std::string> &endpoints, const std::string &ip) {
zmq_context = std::make_unique<ZMQContext>();
msgq_context = std::make_unique<MSGQContext>();
// Create ZMQPubSockets for each endpoint
for (auto endpoint : endpoints) {
auto &socket_pair = socket_pairs.emplace_back();
socket_pair.endpoint = endpoint;
socket_pair.pub_sock = std::make_unique<ZMQPubSocket>();
int ret = socket_pair.pub_sock->connect(zmq_context.get(), endpoint);
if (ret != 0) {
printf("Failed to create ZMQ publisher for [%s]: %s\n", endpoint.c_str(), zmq_strerror(zmq_errno()));
return;
}
}
// Start ZMQ monitoring thread to monitor socket events
std::thread thread(&MsgqToZmq::zmqMonitorThread, this);
while (!do_exit) {
{
std::unique_lock lk(mutex);
cv.wait(lk, [this]() { return do_exit || !sub2pub.empty(); });
if (do_exit) break;
for (auto sub_sock : msgq_poller->poll(100)) {
std::unique_ptr<Message> msg(sub_sock->receive(true));
while (msg && sub2pub[sub_sock]->sendMessage(msg.get()) == -1) {
if (errno != EINTR) break;
}
}
}
util::sleep_for(1); // Give zmqMonitorThread a chance to acquire the mutex
}
thread.join();
}
void MsgqToZmq::zmqMonitorThread() {
std::vector<zmq_pollitem_t> pollitems;
// Set up ZMQ monitor for each pub socket
for (int i = 0; i < socket_pairs.size(); ++i) {
std::string addr = "inproc://op-bridge-monitor-" + std::to_string(i);
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, addr.c_str(), ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
void *monitor_socket = zmq_socket(zmq_context->getRawContext(), ZMQ_PAIR);
zmq_connect(monitor_socket, addr.c_str());
pollitems.emplace_back(zmq_pollitem_t{.socket = monitor_socket, .events = ZMQ_POLLIN});
}
while (!do_exit) {
int ret = zmq_poll(pollitems.data(), pollitems.size(), 1000);
if (ret < 0) {
if (errno == EINTR) {
// Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms)
// to reduce CPU usage during retry attempts.
util::sleep_for(200);
}
continue;
}
for (int i = 0; i < pollitems.size(); ++i) {
if (pollitems[i].revents & ZMQ_POLLIN) {
// First frame in message contains event number and value
std::string frame = recv_zmq_msg(pollitems[i].socket);
if (frame.empty()) continue;
uint16_t event_type = *(uint16_t *)(frame.data());
// Second frame in message contains event address
frame = recv_zmq_msg(pollitems[i].socket);
if (frame.empty()) continue;
std::unique_lock lk(mutex);
auto &pair = socket_pairs[i];
if (event_type & ZMQ_EVENT_ACCEPTED) {
printf("socket [%s] connected\n", pair.endpoint.c_str());
if (++pair.connected_clients == 1) {
// Create new MSGQ subscriber socket and map to ZMQ publisher
pair.sub_sock = std::make_unique<MSGQSubSocket>();
pair.sub_sock->connect(msgq_context.get(), pair.endpoint, "127.0.0.1");
sub2pub[pair.sub_sock.get()] = pair.pub_sock.get();
registerSockets();
}
} else if (event_type & ZMQ_EVENT_DISCONNECTED) {
printf("socket [%s] disconnected\n", pair.endpoint.c_str());
if (pair.connected_clients == 0 || --pair.connected_clients == 0) {
// Remove MSGQ subscriber socket from mapping and reset it
sub2pub.erase(pair.sub_sock.get());
pair.sub_sock.reset(nullptr);
registerSockets();
}
}
cv.notify_one();
}
}
}
// Clean up monitor sockets
for (int i = 0; i < pollitems.size(); ++i) {
zmq_socket_monitor(socket_pairs[i].pub_sock->sock, nullptr, 0);
zmq_close(pollitems[i].socket);
}
cv.notify_one();
}
void MsgqToZmq::registerSockets() {
msgq_poller = std::make_unique<MSGQPoller>();
for (const auto &socket_pair : socket_pairs) {
if (socket_pair.sub_sock) {
msgq_poller->registerSocket(socket_pair.sub_sock.get());
}
}
}

@ -0,0 +1,37 @@
#pragma once
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#define private public
#include "msgq/impl_msgq.h"
#include "msgq/impl_zmq.h"
class MsgqToZmq {
public:
MsgqToZmq() {}
void run(const std::vector<std::string> &endpoints, const std::string &ip);
protected:
void registerSockets();
void zmqMonitorThread();
struct SocketPair {
std::string endpoint;
std::unique_ptr<ZMQPubSocket> pub_sock;
std::unique_ptr<MSGQSubSocket> sub_sock;
int connected_clients = 0;
};
std::unique_ptr<MSGQContext> msgq_context;
std::unique_ptr<ZMQContext> zmq_context;
std::mutex mutex;
std::condition_variable cv;
std::unique_ptr<MSGQPoller> msgq_poller;
std::map<SubSocket *, ZMQPubSocket *> sub2pub;
std::vector<SocketPair> socket_pairs;
};
Loading…
Cancel
Save