# include "cereal/messaging/msgq_to_zmq.h"
# include <cassert>
# include "common/util.h"
extern ExitHandler do_exit ;
// Max messages to process per socket per poll
constexpr int MAX_MESSAGES_PER_SOCKET = 50 ;
static std : : string recv_zmq_msg ( void * sock ) {
zmq_msg_t msg ;
zmq_msg_init ( & msg ) ;
std : : string ret ;
if ( zmq_msg_recv ( & msg , sock , 0 ) > 0 ) {
ret . assign ( ( char * ) zmq_msg_data ( & msg ) , zmq_msg_size ( & msg ) ) ;
}
zmq_msg_close ( & msg ) ;
return ret ;
}
void MsgqToZmq : : run ( const std : : vector < std : : string > & endpoints , const std : : string & ip ) {
zmq_context = std : : make_unique < ZMQContext > ( ) ;
msgq_context = std : : make_unique < MSGQContext > ( ) ;
// Create ZMQPubSockets for each endpoint
for ( const auto & endpoint : endpoints ) {
auto & socket_pair = socket_pairs . emplace_back ( ) ;
socket_pair . endpoint = endpoint ;
socket_pair . pub_sock = std : : make_unique < ZMQPubSocket > ( ) ;
int ret = socket_pair . pub_sock - > connect ( zmq_context . get ( ) , endpoint ) ;
if ( ret ! = 0 ) {
printf ( " Failed to create ZMQ publisher for [%s]: %s \n " , endpoint . c_str ( ) , zmq_strerror ( zmq_errno ( ) ) ) ;
return ;
}
}
// Start ZMQ monitoring thread to monitor socket events
std : : thread thread ( & MsgqToZmq : : zmqMonitorThread , this ) ;
// Main loop for processing messages
while ( ! do_exit ) {
{
std : : unique_lock lk ( mutex ) ;
cv . wait ( lk , [ this ] ( ) { return do_exit | | ! sub2pub . empty ( ) ; } ) ;
if ( do_exit ) break ;
for ( auto sub_sock : msgq_poller - > poll ( 100 ) ) {
// Process messages for each socket
ZMQPubSocket * pub_sock = sub2pub . at ( sub_sock ) ;
for ( int i = 0 ; i < MAX_MESSAGES_PER_SOCKET ; + + i ) {
auto msg = std : : unique_ptr < Message > ( sub_sock - > receive ( true ) ) ;
if ( ! msg ) break ;
while ( pub_sock - > sendMessage ( msg . get ( ) ) = = - 1 ) {
if ( errno ! = EINTR ) break ;
}
}
}
}
util : : sleep_for ( 1 ) ; // Give zmqMonitorThread a chance to acquire the mutex
}
thread . join ( ) ;
}
void MsgqToZmq : : zmqMonitorThread ( ) {
std : : vector < zmq_pollitem_t > pollitems ;
// Set up ZMQ monitor for each pub socket
for ( int i = 0 ; i < socket_pairs . size ( ) ; + + i ) {
std : : string addr = " inproc://op-bridge-monitor- " + std : : to_string ( i ) ;
zmq_socket_monitor ( socket_pairs [ i ] . pub_sock - > sock , addr . c_str ( ) , ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED ) ;
void * monitor_socket = zmq_socket ( zmq_context - > getRawContext ( ) , ZMQ_PAIR ) ;
zmq_connect ( monitor_socket , addr . c_str ( ) ) ;
pollitems . emplace_back ( zmq_pollitem_t { . socket = monitor_socket , . events = ZMQ_POLLIN } ) ;
}
while ( ! do_exit ) {
int ret = zmq_poll ( pollitems . data ( ) , pollitems . size ( ) , 1000 ) ;
if ( ret < 0 ) {
if ( errno = = EINTR ) {
// Due to frequent EINTR signals from msgq, introduce a brief delay (200 ms)
// to reduce CPU usage during retry attempts.
util : : sleep_for ( 200 ) ;
}
continue ;
}
for ( int i = 0 ; i < pollitems . size ( ) ; + + i ) {
if ( pollitems [ i ] . revents & ZMQ_POLLIN ) {
// First frame in message contains event number and value
std : : string frame = recv_zmq_msg ( pollitems [ i ] . socket ) ;
if ( frame . empty ( ) ) continue ;
uint16_t event_type = * ( uint16_t * ) ( frame . data ( ) ) ;
// Second frame in message contains event address
frame = recv_zmq_msg ( pollitems [ i ] . socket ) ;
if ( frame . empty ( ) ) continue ;
std : : unique_lock lk ( mutex ) ;
auto & pair = socket_pairs [ i ] ;
if ( event_type & ZMQ_EVENT_ACCEPTED ) {
printf ( " socket [%s] connected \n " , pair . endpoint . c_str ( ) ) ;
if ( + + pair . connected_clients = = 1 ) {
// Create new MSGQ subscriber socket and map to ZMQ publisher
pair . sub_sock = std : : make_unique < MSGQSubSocket > ( ) ;
pair . sub_sock - > connect ( msgq_context . get ( ) , pair . endpoint , " 127.0.0.1 " ) ;
sub2pub [ pair . sub_sock . get ( ) ] = pair . pub_sock . get ( ) ;
registerSockets ( ) ;
}
} else if ( event_type & ZMQ_EVENT_DISCONNECTED ) {
printf ( " socket [%s] disconnected \n " , pair . endpoint . c_str ( ) ) ;
if ( pair . connected_clients = = 0 | | - - pair . connected_clients = = 0 ) {
// Remove MSGQ subscriber socket from mapping and reset it
sub2pub . erase ( pair . sub_sock . get ( ) ) ;
pair . sub_sock . reset ( nullptr ) ;
registerSockets ( ) ;
}
}
cv . notify_one ( ) ;
}
}
}
// Clean up monitor sockets
for ( int i = 0 ; i < pollitems . size ( ) ; + + i ) {
zmq_socket_monitor ( socket_pairs [ i ] . pub_sock - > sock , nullptr , 0 ) ;
zmq_close ( pollitems [ i ] . socket ) ;
}
cv . notify_one ( ) ;
}
void MsgqToZmq : : registerSockets ( ) {
msgq_poller = std : : make_unique < MSGQPoller > ( ) ;
for ( const auto & socket_pair : socket_pairs ) {
if ( socket_pair . sub_sock ) {
msgq_poller - > registerSocket ( socket_pair . sub_sock . get ( ) ) ;
}
}
}