You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
152 lines
3.3 KiB
152 lines
3.3 KiB
5 years ago
|
# distutils: language = c++
|
||
|
# cython: c_string_encoding=ascii, language_level=3
|
||
|
|
||
|
import sys
|
||
|
from libcpp.string cimport string
|
||
|
from libcpp cimport bool
|
||
|
from libc cimport errno
|
||
|
|
||
|
|
||
|
from messaging cimport Context as cppContext
|
||
|
from messaging cimport SubSocket as cppSubSocket
|
||
|
from messaging cimport PubSocket as cppPubSocket
|
||
|
from messaging cimport Poller as cppPoller
|
||
|
from messaging cimport Message as cppMessage
|
||
|
|
||
|
|
||
|
class MessagingError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class MultiplePublishersError(MessagingError):
|
||
|
pass
|
||
|
|
||
|
|
||
|
cdef class Context:
|
||
|
cdef cppContext * context
|
||
|
|
||
|
def __cinit__(self):
|
||
|
self.context = cppContext.create()
|
||
|
|
||
|
def term(self):
|
||
|
del self.context
|
||
|
self.context = NULL
|
||
|
|
||
|
def __dealloc__(self):
|
||
|
pass
|
||
|
# Deleting the context will hang if sockets are still active
|
||
|
# TODO: Figure out a way to make sure the context is closed last
|
||
|
# del self.context
|
||
|
|
||
|
|
||
|
cdef class Poller:
|
||
|
cdef cppPoller * poller
|
||
|
cdef list sub_sockets
|
||
|
|
||
|
def __cinit__(self):
|
||
|
self.sub_sockets = []
|
||
|
self.poller = cppPoller.create()
|
||
|
|
||
|
def __dealloc__(self):
|
||
|
del self.poller
|
||
|
|
||
|
def registerSocket(self, SubSocket socket):
|
||
|
self.sub_sockets.append(socket)
|
||
|
self.poller.registerSocket(socket.socket)
|
||
|
|
||
|
def poll(self, timeout):
|
||
|
sockets = []
|
||
|
cdef int t = timeout
|
||
|
|
||
|
with nogil:
|
||
|
result = self.poller.poll(t)
|
||
|
|
||
|
for s in result:
|
||
|
socket = SubSocket()
|
||
|
socket.setPtr(s)
|
||
|
sockets.append(socket)
|
||
|
|
||
|
return sockets
|
||
|
|
||
|
cdef class SubSocket:
|
||
|
cdef cppSubSocket * socket
|
||
|
cdef bool is_owner
|
||
|
|
||
|
def __cinit__(self):
|
||
|
self.socket = cppSubSocket.create()
|
||
|
self.is_owner = True
|
||
|
|
||
|
if self.socket == NULL:
|
||
|
raise MessagingError
|
||
|
|
||
|
def __dealloc__(self):
|
||
|
if self.is_owner:
|
||
|
del self.socket
|
||
|
|
||
|
cdef setPtr(self, cppSubSocket * ptr):
|
||
|
if self.is_owner:
|
||
|
del self.socket
|
||
|
|
||
|
self.is_owner = False
|
||
|
self.socket = ptr
|
||
|
|
||
|
def connect(self, Context context, string endpoint, string address=b"127.0.0.1", bool conflate=False):
|
||
|
r = self.socket.connect(context.context, endpoint, address, conflate)
|
||
|
|
||
|
if r != 0:
|
||
|
if errno.errno == errno.EADDRINUSE:
|
||
|
raise MultiplePublishersError
|
||
|
else:
|
||
|
raise MessagingError
|
||
|
|
||
|
def setTimeout(self, int timeout):
|
||
|
self.socket.setTimeout(timeout)
|
||
|
|
||
|
def receive(self, bool non_blocking=False):
|
||
|
msg = self.socket.receive(non_blocking)
|
||
|
|
||
|
if msg == NULL:
|
||
|
# If a blocking read returns no message check errno if SIGINT was caught in the C++ code
|
||
|
if errno.errno == errno.EINTR:
|
||
|
print("SIGINT received, exiting")
|
||
|
sys.exit(1)
|
||
|
|
||
|
return None
|
||
|
else:
|
||
|
sz = msg.getSize()
|
||
|
m = msg.getData()[:sz]
|
||
|
del msg
|
||
|
|
||
|
return m
|
||
|
|
||
|
|
||
|
cdef class PubSocket:
|
||
|
cdef cppPubSocket * socket
|
||
|
|
||
|
def __cinit__(self):
|
||
|
self.socket = cppPubSocket.create()
|
||
|
if self.socket == NULL:
|
||
|
raise MessagingError
|
||
|
|
||
|
def __dealloc__(self):
|
||
|
del self.socket
|
||
|
|
||
|
def connect(self, Context context, string endpoint):
|
||
|
r = self.socket.connect(context.context, endpoint)
|
||
|
|
||
|
if r != 0:
|
||
|
if errno.errno == errno.EADDRINUSE:
|
||
|
raise MultiplePublishersError
|
||
|
else:
|
||
|
raise MessagingError
|
||
|
|
||
|
def send(self, string data):
|
||
|
length = len(data)
|
||
|
r = self.socket.send(<char*>data.c_str(), length)
|
||
|
|
||
|
if r != length:
|
||
|
if errno.errno == errno.EADDRINUSE:
|
||
|
raise MultiplePublishersError
|
||
|
else:
|
||
|
raise MessagingError
|