commit
5087c7684e
35 changed files with 19871 additions and 71 deletions
@ -0,0 +1 @@ |
||||
.sconsign.dblite |
@ -0,0 +1,19 @@ |
||||
from ubuntu:16.04 |
||||
|
||||
RUN apt-get update && apt-get install -y libzmq3-dev clang wget git autoconf libtool curl make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl |
||||
|
||||
RUN curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash |
||||
ENV PATH="/root/.pyenv/bin:/root/.pyenv/shims:${PATH}" |
||||
RUN pyenv install 3.7.3 |
||||
RUN pyenv global 3.7.3 |
||||
RUN pyenv rehash |
||||
RUN pip3 install pyyaml==5.1.2 Cython==0.29.14 scons==3.1.1 pycapnp==0.6.4 |
||||
|
||||
WORKDIR /project/cereal |
||||
COPY install_capnp.sh . |
||||
RUN ./install_capnp.sh |
||||
|
||||
ENV PYTHONPATH=/project |
||||
|
||||
COPY . . |
||||
RUN scons -c && scons -j$(nproc) |
@ -1,62 +0,0 @@ |
||||
PWD := $(shell pwd)
|
||||
|
||||
SRCS := log.capnp car.capnp
|
||||
|
||||
GENS := gen/cpp/car.capnp.c++ gen/cpp/log.capnp.c++
|
||||
JS := gen/js/car.capnp.js gen/js/log.capnp.js
|
||||
|
||||
UNAME_M ?= $(shell uname -m)
|
||||
|
||||
GENS += gen/c/car.capnp.c gen/c/log.capnp.c gen/c/include/c++.capnp.h gen/c/include/java.capnp.h
|
||||
|
||||
ifeq ($(UNAME_M),x86_64) |
||||
|
||||
ifneq (, $(shell which capnpc-java)) |
||||
GENS += gen/java/Car.java gen/java/Log.java
|
||||
else |
||||
$(warning capnpc-java not found, skipping java build) |
||||
endif |
||||
|
||||
endif |
||||
|
||||
|
||||
ifeq ($(UNAME_M),aarch64) |
||||
CAPNPC=PATH=$(PWD)/../phonelibs/capnp-cpp/aarch64/bin/:$$PATH capnpc
|
||||
else |
||||
CAPNPC=capnpc
|
||||
endif |
||||
|
||||
.PHONY: all |
||||
all: $(GENS) |
||||
js: $(JS) |
||||
|
||||
.PHONY: clean |
||||
clean: |
||||
rm -rf gen
|
||||
rm -rf node_modules
|
||||
rm -rf package-lock.json
|
||||
|
||||
gen/c/%.capnp.c: %.capnp |
||||
@echo "[ CAPNPC C ] $@"
|
||||
mkdir -p gen/c/
|
||||
$(CAPNPC) '$<' -o c:gen/c/
|
||||
|
||||
gen/js/%.capnp.js: %.capnp |
||||
@echo "[ CAPNPC JavaScript ] $@"
|
||||
mkdir -p gen/js/
|
||||
sh ./generate_javascript.sh
|
||||
|
||||
gen/cpp/%.capnp.c++: %.capnp |
||||
@echo "[ CAPNPC C++ ] $@"
|
||||
mkdir -p gen/cpp/
|
||||
$(CAPNPC) '$<' -o c++:gen/cpp/
|
||||
|
||||
gen/java/Car.java gen/java/Log.java: $(SRCS) |
||||
@echo "[ CAPNPC java ] $@"
|
||||
mkdir -p gen/java/
|
||||
$(CAPNPC) $^ -o java:gen/java
|
||||
|
||||
# c-capnproto needs some empty headers
|
||||
gen/c/include/c++.capnp.h gen/c/include/java.capnp.h: |
||||
mkdir -p gen/c/include
|
||||
touch '$@'
|
@ -0,0 +1,68 @@ |
||||
Import('env', 'arch', 'zmq') |
||||
|
||||
gen_dir = Dir('gen') |
||||
messaging_dir = Dir('messaging') |
||||
|
||||
# TODO: remove src-prefix and cereal from command string. can we set working directory? |
||||
env.Command(["gen/c/include/c++.capnp.h", "gen/c/include/java.capnp.h"], [], "mkdir -p " + gen_dir.path + "/c/include && touch $TARGETS") |
||||
env.Command( |
||||
['gen/c/car.capnp.c', 'gen/c/log.capnp.c', 'gen/c/car.capnp.h', 'gen/c/log.capnp.h'], |
||||
['car.capnp', 'log.capnp'], |
||||
'capnpc $SOURCES --src-prefix=cereal -o c:' + gen_dir.path + '/c/') |
||||
env.Command( |
||||
['gen/cpp/car.capnp.c++', 'gen/cpp/log.capnp.c++', 'gen/cpp/car.capnp.h', 'gen/cpp/log.capnp.h'], |
||||
['car.capnp', 'log.capnp'], |
||||
'capnpc $SOURCES --src-prefix=cereal -o c++:' + gen_dir.path + '/cpp/') |
||||
import shutil |
||||
if shutil.which('capnpc-java'): |
||||
env.Command( |
||||
['gen/java/Car.java', 'gen/java/Log.java'], |
||||
['car.capnp', 'log.capnp'], |
||||
'capnpc $SOURCES --src-prefix=cereal -o java:' + gen_dir.path + '/java/') |
||||
|
||||
# TODO: remove non shared cereal and messaging |
||||
cereal_objects = env.SharedObject([ |
||||
'gen/c/car.capnp.c', |
||||
'gen/c/log.capnp.c', |
||||
'gen/cpp/car.capnp.c++', |
||||
'gen/cpp/log.capnp.c++', |
||||
]) |
||||
|
||||
env.Library('cereal', cereal_objects) |
||||
env.SharedLibrary('cereal_shared', cereal_objects) |
||||
|
||||
cereal_dir = Dir('.') |
||||
services_h = env.Command( |
||||
['services.h'], |
||||
['service_list.yaml', 'services.py'], |
||||
'python3 ' + cereal_dir.path + '/services.py > $TARGET') |
||||
|
||||
messaging_objects = env.SharedObject([ |
||||
'messaging/messaging.cc', |
||||
'messaging/impl_zmq.cc', |
||||
'messaging/impl_msgq.cc', |
||||
'messaging/msgq.cc', |
||||
]) |
||||
|
||||
messaging_lib = env.Library('messaging', messaging_objects) |
||||
Depends('messaging/impl_zmq.cc', services_h) |
||||
|
||||
# note, this rebuilds the deps shared, zmq is statically linked to make APK happy |
||||
# TODO: get APK to load system zmq to remove the static link |
||||
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [] |
||||
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) |
||||
|
||||
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) |
||||
Depends('messaging/bridge.cc', services_h) |
||||
|
||||
# different target? |
||||
#env.Program('messaging/demo', ['messaging/demo.cc'], LIBS=[messaging_lib, 'zmq']) |
||||
|
||||
|
||||
env.Command(['messaging/messaging_pyx.so'], |
||||
[messaging_lib, 'messaging/messaging_pyx_setup.py', 'messaging/messaging_pyx.pyx', 'messaging/messaging.pxd'], |
||||
"cd " + messaging_dir.path + " && python3 messaging_pyx_setup.py build_ext --inplace") |
||||
|
||||
|
||||
if GetOption('test'): |
||||
env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging_lib]) |
@ -0,0 +1,49 @@ |
||||
import os |
||||
import subprocess |
||||
|
||||
zmq = 'zmq' |
||||
arch = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() |
||||
|
||||
cereal_dir = Dir('.') |
||||
|
||||
cpppath = [ |
||||
cereal_dir, |
||||
'/usr/lib/include', |
||||
] |
||||
|
||||
AddOption('--test', |
||||
action='store_true', |
||||
help='build test files') |
||||
|
||||
AddOption('--asan', |
||||
action='store_true', |
||||
help='turn on ASAN') |
||||
|
||||
ccflags_asan = ["-fsanitize=address", "-fno-omit-frame-pointer"] if GetOption('asan') else [] |
||||
ldflags_asan = ["-fsanitize=address"] if GetOption('asan') else [] |
||||
|
||||
env = Environment( |
||||
ENV=os.environ, |
||||
CC='clang', |
||||
CXX='clang++', |
||||
CCFLAGS=[ |
||||
"-g", |
||||
"-fPIC", |
||||
"-O2", |
||||
"-Werror=implicit-function-declaration", |
||||
"-Werror=incompatible-pointer-types", |
||||
"-Werror=int-conversion", |
||||
"-Werror=return-type", |
||||
"-Werror=format-extra-args", |
||||
] + ccflags_asan, |
||||
LDFLAGS=ldflags_asan, |
||||
LINKFLAGS=ldflags_asan, |
||||
|
||||
CFLAGS="-std=gnu11", |
||||
CXXFLAGS="-std=c++14", |
||||
CPPPATH=cpppath, |
||||
) |
||||
|
||||
|
||||
Export('env', 'zmq', 'arch') |
||||
SConscript(['SConscript']) |
@ -0,0 +1,14 @@ |
||||
pr: none |
||||
|
||||
pool: |
||||
vmImage: 'ubuntu-16.04' |
||||
|
||||
steps: |
||||
- script: | |
||||
set -e |
||||
docker build -t cereal . |
||||
docker run cereal bash -c "scons --test --asan -j$(nproc) && messaging/test_runner" |
||||
docker run cereal bash -c "ZMQ=1 python -m unittest discover ." |
||||
docker run cereal bash -c "MSGQ=1 python -m unittest discover ." |
||||
|
||||
displayName: 'Run Tests' |
@ -0,0 +1,10 @@ |
||||
demo |
||||
bridge |
||||
test_runner |
||||
*.o |
||||
*.os |
||||
*.d |
||||
*.a |
||||
*.so |
||||
messaging_pyx.cpp |
||||
build/ |
@ -0,0 +1,219 @@ |
||||
# must be build with scons |
||||
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error |
||||
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error |
||||
|
||||
assert MultiplePublishersError |
||||
assert MessagingError |
||||
|
||||
from cereal import log |
||||
from cereal.services import service_list |
||||
|
||||
# sec_since_boot is faster, but allow to run standalone too |
||||
try: |
||||
from common.realtime import sec_since_boot |
||||
except ImportError: |
||||
import time |
||||
sec_since_boot = time.time |
||||
print("Warning, using python time.time() instead of faster sec_since_boot") |
||||
|
||||
context = Context() |
||||
|
||||
def new_message(): |
||||
dat = log.Event.new_message() |
||||
dat.logMonoTime = int(sec_since_boot() * 1e9) |
||||
dat.valid = True |
||||
return dat |
||||
|
||||
def pub_sock(endpoint): |
||||
sock = PubSocket() |
||||
sock.connect(context, endpoint) |
||||
return sock |
||||
|
||||
def sub_sock(endpoint, poller=None, addr="127.0.0.1", conflate=False, timeout=None): |
||||
sock = SubSocket() |
||||
addr = addr.encode('utf8') |
||||
sock.connect(context, endpoint, addr, conflate) |
||||
|
||||
if timeout is not None: |
||||
sock.setTimeout(timeout) |
||||
|
||||
if poller is not None: |
||||
poller.registerSocket(sock) |
||||
return sock |
||||
|
||||
|
||||
def drain_sock_raw(sock, wait_for_one=False): |
||||
"""Receive all message currently available on the queue""" |
||||
ret = [] |
||||
while 1: |
||||
if wait_for_one and len(ret) == 0: |
||||
dat = sock.receive() |
||||
else: |
||||
dat = sock.receive(non_blocking=True) |
||||
|
||||
if dat is None: |
||||
break |
||||
|
||||
ret.append(dat) |
||||
|
||||
return ret |
||||
|
||||
def drain_sock(sock, wait_for_one=False): |
||||
"""Receive all message currently available on the queue""" |
||||
ret = [] |
||||
while 1: |
||||
if wait_for_one and len(ret) == 0: |
||||
dat = sock.receive() |
||||
else: |
||||
dat = sock.receive(non_blocking=True) |
||||
|
||||
if dat is None: # Timeout hit |
||||
break |
||||
|
||||
dat = log.Event.from_bytes(dat) |
||||
ret.append(dat) |
||||
|
||||
return ret |
||||
|
||||
|
||||
# TODO: print when we drop packets? |
||||
def recv_sock(sock, wait=False): |
||||
"""Same as drain sock, but only returns latest message. Consider using conflate instead.""" |
||||
dat = None |
||||
|
||||
while 1: |
||||
if wait and dat is None: |
||||
rcv = sock.receive() |
||||
else: |
||||
rcv = sock.receive(non_blocking=True) |
||||
|
||||
if rcv is None: # Timeout hit |
||||
break |
||||
|
||||
dat = rcv |
||||
|
||||
if dat is not None: |
||||
dat = log.Event.from_bytes(dat) |
||||
|
||||
return dat |
||||
|
||||
def recv_one(sock): |
||||
dat = sock.receive() |
||||
if dat is not None: |
||||
dat = log.Event.from_bytes(dat) |
||||
return dat |
||||
|
||||
def recv_one_or_none(sock): |
||||
dat = sock.receive(non_blocking=True) |
||||
if dat is not None: |
||||
dat = log.Event.from_bytes(dat) |
||||
return dat |
||||
|
||||
def recv_one_retry(sock): |
||||
"""Keep receiving until we get a message""" |
||||
while True: |
||||
dat = sock.receive() |
||||
if dat is not None: |
||||
return log.Event.from_bytes(dat) |
||||
|
||||
def get_one_can(logcan): |
||||
while True: |
||||
can = recv_one_retry(logcan) |
||||
if len(can.can) > 0: |
||||
return can |
||||
|
||||
class SubMaster(): |
||||
def __init__(self, services, ignore_alive=None, addr="127.0.0.1"): |
||||
self.poller = Poller() |
||||
self.frame = -1 |
||||
self.updated = {s : False for s in services} |
||||
self.rcv_time = {s : 0. for s in services} |
||||
self.rcv_frame = {s : 0 for s in services} |
||||
self.alive = {s : False for s in services} |
||||
self.sock = {} |
||||
self.freq = {} |
||||
self.data = {} |
||||
self.logMonoTime = {} |
||||
self.valid = {} |
||||
|
||||
if ignore_alive is not None: |
||||
self.ignore_alive = ignore_alive |
||||
else: |
||||
self.ignore_alive = [] |
||||
|
||||
for s in services: |
||||
if addr is not None: |
||||
self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True) |
||||
self.freq[s] = service_list[s].frequency |
||||
|
||||
data = new_message() |
||||
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', |
||||
'ethernetData', 'cellInfo', 'wifiScan', |
||||
'trafficEvents', 'orbObservation', 'carEvents']: |
||||
data.init(s, 0) |
||||
else: |
||||
data.init(s) |
||||
self.data[s] = getattr(data, s) |
||||
self.logMonoTime[s] = 0 |
||||
self.valid[s] = data.valid |
||||
|
||||
def __getitem__(self, s): |
||||
return self.data[s] |
||||
|
||||
def update(self, timeout=1000): |
||||
msgs = [] |
||||
for sock in self.poller.poll(timeout): |
||||
msgs.append(recv_one_or_none(sock)) |
||||
self.update_msgs(sec_since_boot(), msgs) |
||||
|
||||
def update_msgs(self, cur_time, msgs): |
||||
# TODO: add optional input that specify the service to wait for |
||||
self.frame += 1 |
||||
self.updated = dict.fromkeys(self.updated, False) |
||||
for msg in msgs: |
||||
if msg is None: |
||||
continue |
||||
|
||||
s = msg.which() |
||||
self.updated[s] = True |
||||
self.rcv_time[s] = cur_time |
||||
self.rcv_frame[s] = self.frame |
||||
self.data[s] = getattr(msg, s) |
||||
self.logMonoTime[s] = msg.logMonoTime |
||||
self.valid[s] = msg.valid |
||||
|
||||
for s in self.data: |
||||
# arbitrary small number to avoid float comparison. If freq is 0, we can skip the check |
||||
if self.freq[s] > 1e-5: |
||||
# alive if delay is within 10x the expected frequency |
||||
self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s]) |
||||
else: |
||||
self.alive[s] = True |
||||
|
||||
def all_alive(self, service_list=None): |
||||
if service_list is None: # check all |
||||
service_list = self.alive.keys() |
||||
return all(self.alive[s] for s in service_list if s not in self.ignore_alive) |
||||
|
||||
def all_valid(self, service_list=None): |
||||
if service_list is None: # check all |
||||
service_list = self.valid.keys() |
||||
return all(self.valid[s] for s in service_list) |
||||
|
||||
def all_alive_and_valid(self, service_list=None): |
||||
if service_list is None: # check all |
||||
service_list = self.alive.keys() |
||||
return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list) |
||||
|
||||
|
||||
class PubMaster(): |
||||
def __init__(self, services): |
||||
self.sock = {} |
||||
for s in services: |
||||
self.sock[s] = pub_sock(s) |
||||
|
||||
def send(self, s, dat): |
||||
# accept either bytes or capnp builder |
||||
if not isinstance(dat, bytes): |
||||
dat = dat.to_bytes() |
||||
self.sock[s].send(dat) |
@ -0,0 +1,62 @@ |
||||
#include <iostream> |
||||
#include <string> |
||||
#include <cassert> |
||||
#include <csignal> |
||||
#include <map> |
||||
|
||||
#include "services.h" |
||||
|
||||
#include "impl_msgq.hpp" |
||||
#include "impl_zmq.hpp" |
||||
|
||||
void sigpipe_handler(int sig) { |
||||
assert(sig == SIGPIPE); |
||||
std::cout << "SIGPIPE received" << std::endl; |
||||
} |
||||
|
||||
static std::vector<std::string> get_services() { |
||||
std::vector<std::string> name_list; |
||||
|
||||
for (const auto& it : services) { |
||||
std::string name = it.name; |
||||
if (name == "plusFrame" || name == "uiLayoutState") continue; |
||||
name_list.push_back(name); |
||||
} |
||||
|
||||
return name_list; |
||||
} |
||||
|
||||
|
||||
int main(void){ |
||||
signal(SIGPIPE, (sighandler_t)sigpipe_handler); |
||||
|
||||
auto endpoints = get_services(); |
||||
|
||||
std::map<SubSocket*, PubSocket*> sub2pub; |
||||
|
||||
Context *zmq_context = new ZMQContext(); |
||||
Context *msgq_context = new MSGQContext(); |
||||
Poller *poller = new MSGQPoller(); |
||||
|
||||
for (auto endpoint: endpoints){ |
||||
SubSocket * msgq_sock = new MSGQSubSocket(); |
||||
msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false); |
||||
poller->registerSocket(msgq_sock); |
||||
|
||||
PubSocket * zmq_sock = new ZMQPubSocket(); |
||||
zmq_sock->connect(zmq_context, endpoint); |
||||
|
||||
sub2pub[msgq_sock] = zmq_sock; |
||||
} |
||||
|
||||
|
||||
while (true){ |
||||
for (auto sub_sock : poller->poll(100)){ |
||||
Message * msg = sub_sock->receive(); |
||||
if (msg == NULL) continue; |
||||
sub2pub[sub_sock]->sendMessage(msg); |
||||
delete msg; |
||||
} |
||||
} |
||||
return 0; |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,50 @@ |
||||
#include <iostream> |
||||
#include <cstddef> |
||||
#include <chrono> |
||||
#include <thread> |
||||
#include <cassert> |
||||
|
||||
#include "messaging.hpp" |
||||
#include "impl_zmq.hpp" |
||||
|
||||
#define MSGS 1e5 |
||||
|
||||
int main() { |
||||
Context * c = Context::create(); |
||||
SubSocket * sub_sock = SubSocket::create(c, "controlsState"); |
||||
PubSocket * pub_sock = PubSocket::create(c, "controlsState"); |
||||
|
||||
char data[8]; |
||||
|
||||
Poller * poller = Poller::create({sub_sock}); |
||||
|
||||
auto start = std::chrono::steady_clock::now(); |
||||
|
||||
for (uint64_t i = 0; i < MSGS; i++){ |
||||
*(uint64_t*)data = i; |
||||
pub_sock->send(data, 8); |
||||
|
||||
auto r = poller->poll(100); |
||||
|
||||
for (auto p : r){ |
||||
Message * m = p->receive(); |
||||
uint64_t ii = *(uint64_t*)m->getData(); |
||||
assert(i == ii); |
||||
delete m; |
||||
} |
||||
} |
||||
|
||||
|
||||
auto end = std::chrono::steady_clock::now(); |
||||
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9; |
||||
double throughput = ((double) MSGS / (double) elapsed); |
||||
std::cout << throughput << " msg/s" << std::endl; |
||||
|
||||
delete poller; |
||||
delete sub_sock; |
||||
delete pub_sock; |
||||
delete c; |
||||
|
||||
|
||||
return 0; |
||||
} |
@ -0,0 +1,30 @@ |
||||
import time |
||||
|
||||
from messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error |
||||
|
||||
MSGS = 1e5 |
||||
|
||||
if __name__ == "__main__": |
||||
c = Context() |
||||
sub_sock = SubSocket() |
||||
pub_sock = PubSocket() |
||||
|
||||
sub_sock.connect(c, "controlsState") |
||||
pub_sock.connect(c, "controlsState") |
||||
|
||||
|
||||
poller = Poller() |
||||
poller.registerSocket(sub_sock) |
||||
|
||||
t = time.time() |
||||
for i in range(int(MSGS)): |
||||
bts = i.to_bytes(4, 'little') |
||||
pub_sock.send(bts) |
||||
|
||||
for s in poller.poll(100): |
||||
dat = s.receive() |
||||
ii = int.from_bytes(dat, 'little') |
||||
assert(i == ii) |
||||
|
||||
dt = time.time() - t |
||||
print("%.1f msg/s" % (MSGS / dt)) |
@ -0,0 +1,190 @@ |
||||
#include <cassert> |
||||
#include <cstring> |
||||
#include <iostream> |
||||
#include <cstdlib> |
||||
#include <csignal> |
||||
#include <cerrno> |
||||
|
||||
|
||||
#include "impl_msgq.hpp" |
||||
|
||||
volatile sig_atomic_t msgq_do_exit = 0; |
||||
|
||||
void sig_handler(int signal) { |
||||
assert(signal == SIGINT || signal == SIGTERM); |
||||
msgq_do_exit = 1; |
||||
} |
||||
|
||||
|
||||
MSGQContext::MSGQContext() { |
||||
} |
||||
|
||||
MSGQContext::~MSGQContext() { |
||||
} |
||||
|
||||
void MSGQMessage::init(size_t sz) { |
||||
size = sz; |
||||
data = new char[size]; |
||||
} |
||||
|
||||
void MSGQMessage::init(char * d, size_t sz) { |
||||
size = sz; |
||||
data = new char[size]; |
||||
memcpy(data, d, size); |
||||
} |
||||
|
||||
void MSGQMessage::takeOwnership(char * d, size_t sz) { |
||||
size = sz; |
||||
data = d; |
||||
} |
||||
|
||||
void MSGQMessage::close() { |
||||
if (size > 0){ |
||||
delete[] data; |
||||
} |
||||
size = 0; |
||||
} |
||||
|
||||
MSGQMessage::~MSGQMessage() { |
||||
this->close(); |
||||
} |
||||
|
||||
|
||||
int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){ |
||||
assert(context); |
||||
assert(address == "127.0.0.1"); |
||||
|
||||
q = new msgq_queue_t; |
||||
int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); |
||||
if (r != 0){ |
||||
return r; |
||||
} |
||||
|
||||
msgq_init_subscriber(q); |
||||
|
||||
if (conflate){ |
||||
q->read_conflate = true; |
||||
} |
||||
|
||||
timeout = -1; |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
|
||||
Message * MSGQSubSocket::receive(bool non_blocking){ |
||||
msgq_do_exit = 0; |
||||
|
||||
void (*prev_handler_sigint)(int); |
||||
void (*prev_handler_sigterm)(int); |
||||
if (!non_blocking){ |
||||
prev_handler_sigint = std::signal(SIGINT, sig_handler); |
||||
prev_handler_sigterm = std::signal(SIGTERM, sig_handler); |
||||
} |
||||
|
||||
msgq_msg_t msg; |
||||
|
||||
MSGQMessage *r = NULL; |
||||
r = NULL; |
||||
|
||||
int rc = msgq_msg_recv(&msg, q); |
||||
|
||||
// Hack to implement blocking read with a poller. Don't use this
|
||||
while (!non_blocking && rc == 0 && msgq_do_exit == 0){ |
||||
msgq_pollitem_t items[1]; |
||||
items[0].q = q; |
||||
|
||||
int t = (timeout != -1) ? timeout : 100; |
||||
|
||||
int n = msgq_poll(items, 1, t); |
||||
rc = msgq_msg_recv(&msg, q); |
||||
|
||||
// The poll indicated a message was ready, but the receive failed. Try again
|
||||
if (n == 1 && rc == 0){ |
||||
continue; |
||||
} |
||||
|
||||
if (timeout != -1){ |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if (rc > 0){ |
||||
r = new MSGQMessage; |
||||
r->takeOwnership(msg.data, msg.size); |
||||
} |
||||
errno = msgq_do_exit ? EINTR : 0; |
||||
|
||||
if (!non_blocking){ |
||||
std::signal(SIGINT, prev_handler_sigint); |
||||
std::signal(SIGTERM, prev_handler_sigterm); |
||||
} |
||||
|
||||
return (Message*)r; |
||||
} |
||||
|
||||
void MSGQSubSocket::setTimeout(int t){ |
||||
timeout = t; |
||||
} |
||||
|
||||
MSGQSubSocket::~MSGQSubSocket(){ |
||||
if (q != NULL){ |
||||
msgq_close_queue(q); |
||||
delete q; |
||||
} |
||||
} |
||||
|
||||
int MSGQPubSocket::connect(Context *context, std::string endpoint){ |
||||
assert(context); |
||||
|
||||
q = new msgq_queue_t; |
||||
msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); |
||||
msgq_init_publisher(q); |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
int MSGQPubSocket::sendMessage(Message *message){ |
||||
msgq_msg_t msg; |
||||
msg.data = message->getData(); |
||||
msg.size = message->getSize(); |
||||
|
||||
return msgq_msg_send(&msg, q); |
||||
} |
||||
|
||||
int MSGQPubSocket::send(char *data, size_t size){ |
||||
msgq_msg_t msg; |
||||
msg.data = data; |
||||
msg.size = size; |
||||
|
||||
return msgq_msg_send(&msg, q); |
||||
} |
||||
|
||||
MSGQPubSocket::~MSGQPubSocket(){ |
||||
if (q != NULL){ |
||||
msgq_close_queue(q); |
||||
delete q; |
||||
} |
||||
} |
||||
|
||||
|
||||
void MSGQPoller::registerSocket(SubSocket * socket){ |
||||
assert(num_polls + 1 < MAX_POLLERS); |
||||
polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket(); |
||||
|
||||
sockets.push_back(socket); |
||||
num_polls++; |
||||
} |
||||
|
||||
std::vector<SubSocket*> MSGQPoller::poll(int timeout){ |
||||
std::vector<SubSocket*> r; |
||||
|
||||
msgq_poll(polls, num_polls, timeout); |
||||
for (size_t i = 0; i < num_polls; i++){ |
||||
if (polls[i].revents){ |
||||
r.push_back(sockets[i]); |
||||
} |
||||
} |
||||
|
||||
return r; |
||||
} |
@ -0,0 +1,64 @@ |
||||
#pragma once |
||||
#include "messaging.hpp" |
||||
#include "msgq.hpp" |
||||
#include <zmq.h> |
||||
#include <string> |
||||
|
||||
#define MAX_POLLERS 128 |
||||
|
||||
class MSGQContext : public Context { |
||||
private: |
||||
void * context = NULL; |
||||
public: |
||||
MSGQContext(); |
||||
void * getRawContext() {return context;} |
||||
~MSGQContext(); |
||||
}; |
||||
|
||||
class MSGQMessage : public Message { |
||||
private: |
||||
char * data; |
||||
size_t size; |
||||
public: |
||||
void init(size_t size); |
||||
void init(char *data, size_t size); |
||||
void takeOwnership(char *data, size_t size); |
||||
size_t getSize(){return size;} |
||||
char * getData(){return data;} |
||||
void close(); |
||||
~MSGQMessage(); |
||||
}; |
||||
|
||||
class MSGQSubSocket : public SubSocket { |
||||
private: |
||||
msgq_queue_t * q = NULL; |
||||
int timeout; |
||||
public: |
||||
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false); |
||||
void setTimeout(int timeout); |
||||
void * getRawSocket() {return (void*)q;} |
||||
Message *receive(bool non_blocking=false); |
||||
~MSGQSubSocket(); |
||||
}; |
||||
|
||||
class MSGQPubSocket : public PubSocket { |
||||
private: |
||||
msgq_queue_t * q = NULL; |
||||
public: |
||||
int connect(Context *context, std::string endpoint); |
||||
int sendMessage(Message *message); |
||||
int send(char *data, size_t size); |
||||
~MSGQPubSocket(); |
||||
}; |
||||
|
||||
class MSGQPoller : public Poller { |
||||
private: |
||||
std::vector<SubSocket*> sockets; |
||||
msgq_pollitem_t polls[MAX_POLLERS]; |
||||
size_t num_polls = 0; |
||||
|
||||
public: |
||||
void registerSocket(SubSocket *socket); |
||||
std::vector<SubSocket*> poll(int timeout); |
||||
~MSGQPoller(){}; |
||||
}; |
@ -0,0 +1,155 @@ |
||||
#include <cassert> |
||||
#include <cstring> |
||||
#include <iostream> |
||||
#include <cstdlib> |
||||
#include <cerrno> |
||||
|
||||
#include <zmq.h> |
||||
|
||||
#include "services.h" |
||||
#include "impl_zmq.hpp" |
||||
|
||||
static int get_port(std::string endpoint) { |
||||
int port = -1; |
||||
for (const auto& it : services) { |
||||
std::string name = it.name; |
||||
if (name == endpoint) { |
||||
port = it.port; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
assert(port >= 0); |
||||
return port; |
||||
} |
||||
|
||||
ZMQContext::ZMQContext() { |
||||
context = zmq_ctx_new(); |
||||
} |
||||
|
||||
ZMQContext::~ZMQContext() { |
||||
zmq_ctx_term(context); |
||||
} |
||||
|
||||
void ZMQMessage::init(size_t sz) { |
||||
size = sz; |
||||
data = new char[size]; |
||||
} |
||||
|
||||
void ZMQMessage::init(char * d, size_t sz) { |
||||
size = sz; |
||||
data = new char[size]; |
||||
memcpy(data, d, size); |
||||
} |
||||
|
||||
void ZMQMessage::close() { |
||||
if (size > 0){ |
||||
delete[] data; |
||||
} |
||||
size = 0; |
||||
} |
||||
|
||||
ZMQMessage::~ZMQMessage() { |
||||
this->close(); |
||||
} |
||||
|
||||
|
||||
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){ |
||||
sock = zmq_socket(context->getRawContext(), ZMQ_SUB); |
||||
if (sock == NULL){ |
||||
return -1; |
||||
} |
||||
|
||||
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0); |
||||
|
||||
if (conflate){ |
||||
int arg = 1; |
||||
zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int)); |
||||
} |
||||
|
||||
int reconnect_ivl = 500; |
||||
zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl)); |
||||
|
||||
full_endpoint = "tcp://" + address + ":"; |
||||
full_endpoint += std::to_string(get_port(endpoint)); |
||||
|
||||
return zmq_connect(sock, full_endpoint.c_str()); |
||||
} |
||||
|
||||
|
||||
Message * ZMQSubSocket::receive(bool non_blocking){ |
||||
zmq_msg_t msg; |
||||
assert(zmq_msg_init(&msg) == 0); |
||||
|
||||
int flags = non_blocking ? ZMQ_DONTWAIT : 0; |
||||
int rc = zmq_msg_recv(&msg, sock, flags); |
||||
Message *r = NULL; |
||||
|
||||
if (rc >= 0){ |
||||
// Make a copy to ensure the data is aligned
|
||||
r = new ZMQMessage; |
||||
r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); |
||||
} |
||||
|
||||
zmq_msg_close(&msg); |
||||
return r; |
||||
} |
||||
|
||||
void ZMQSubSocket::setTimeout(int timeout){ |
||||
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int)); |
||||
} |
||||
|
||||
ZMQSubSocket::~ZMQSubSocket(){ |
||||
zmq_close(sock); |
||||
} |
||||
|
||||
int ZMQPubSocket::connect(Context *context, std::string endpoint){ |
||||
sock = zmq_socket(context->getRawContext(), ZMQ_PUB); |
||||
if (sock == NULL){ |
||||
return -1; |
||||
} |
||||
|
||||
full_endpoint = "tcp://*:"; |
||||
full_endpoint += std::to_string(get_port(endpoint)); |
||||
|
||||
return zmq_bind(sock, full_endpoint.c_str()); |
||||
} |
||||
|
||||
int ZMQPubSocket::sendMessage(Message *message){ |
||||
return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT); |
||||
} |
||||
|
||||
int ZMQPubSocket::send(char *data, size_t size){ |
||||
return zmq_send(sock, data, size, ZMQ_DONTWAIT); |
||||
} |
||||
|
||||
ZMQPubSocket::~ZMQPubSocket(){ |
||||
zmq_close(sock); |
||||
} |
||||
|
||||
|
||||
void ZMQPoller::registerSocket(SubSocket * socket){ |
||||
assert(num_polls + 1 < MAX_POLLERS); |
||||
polls[num_polls].socket = socket->getRawSocket(); |
||||
polls[num_polls].events = ZMQ_POLLIN; |
||||
|
||||
sockets.push_back(socket); |
||||
num_polls++; |
||||
} |
||||
|
||||
std::vector<SubSocket*> ZMQPoller::poll(int timeout){ |
||||
std::vector<SubSocket*> r; |
||||
|
||||
int rc = zmq_poll(polls, num_polls, timeout); |
||||
if (rc < 0){ |
||||
return r; |
||||
} |
||||
|
||||
for (size_t i = 0; i < num_polls; i++){ |
||||
if (polls[i].revents){ |
||||
r.push_back(sockets[i]); |
||||
} |
||||
} |
||||
|
||||
return r; |
||||
} |
@ -0,0 +1,63 @@ |
||||
#pragma once |
||||
#include "messaging.hpp" |
||||
#include <zmq.h> |
||||
#include <string> |
||||
|
||||
#define MAX_POLLERS 128 |
||||
|
||||
class ZMQContext : public Context { |
||||
private: |
||||
void * context = NULL; |
||||
public: |
||||
ZMQContext(); |
||||
void * getRawContext() {return context;} |
||||
~ZMQContext(); |
||||
}; |
||||
|
||||
class ZMQMessage : public Message { |
||||
private: |
||||
char * data; |
||||
size_t size; |
||||
public: |
||||
void init(size_t size); |
||||
void init(char *data, size_t size); |
||||
size_t getSize(){return size;} |
||||
char * getData(){return data;} |
||||
void close(); |
||||
~ZMQMessage(); |
||||
}; |
||||
|
||||
class ZMQSubSocket : public SubSocket { |
||||
private: |
||||
void * sock; |
||||
std::string full_endpoint; |
||||
public: |
||||
int connect(Context *context, std::string endpoint, std::string address, bool conflate=false); |
||||
void setTimeout(int timeout); |
||||
void * getRawSocket() {return sock;} |
||||
Message *receive(bool non_blocking=false); |
||||
~ZMQSubSocket(); |
||||
}; |
||||
|
||||
class ZMQPubSocket : public PubSocket { |
||||
private: |
||||
void * sock; |
||||
std::string full_endpoint; |
||||
public: |
||||
int connect(Context *context, std::string endpoint); |
||||
int sendMessage(Message *message); |
||||
int send(char *data, size_t size); |
||||
~ZMQPubSocket(); |
||||
}; |
||||
|
||||
class ZMQPoller : public Poller { |
||||
private: |
||||
std::vector<SubSocket*> sockets; |
||||
zmq_pollitem_t polls[MAX_POLLERS]; |
||||
size_t num_polls = 0; |
||||
|
||||
public: |
||||
void registerSocket(SubSocket *socket); |
||||
std::vector<SubSocket*> poll(int timeout); |
||||
~ZMQPoller(){}; |
||||
}; |
@ -0,0 +1,117 @@ |
||||
#include "messaging.hpp" |
||||
#include "impl_zmq.hpp" |
||||
#include "impl_msgq.hpp" |
||||
|
||||
Context * Context::create(){ |
||||
Context * c; |
||||
if (std::getenv("MSGQ")){ |
||||
c = new MSGQContext(); |
||||
} else { |
||||
c = new ZMQContext(); |
||||
} |
||||
return c; |
||||
} |
||||
|
||||
SubSocket * SubSocket::create(){ |
||||
SubSocket * s; |
||||
if (std::getenv("MSGQ")){ |
||||
s = new MSGQSubSocket(); |
||||
} else { |
||||
s = new ZMQSubSocket(); |
||||
} |
||||
return s; |
||||
} |
||||
|
||||
SubSocket * SubSocket::create(Context * context, std::string endpoint){ |
||||
SubSocket *s = SubSocket::create(); |
||||
int r = s->connect(context, endpoint, "127.0.0.1"); |
||||
|
||||
if (r == 0) { |
||||
return s; |
||||
} else { |
||||
delete s; |
||||
return NULL; |
||||
} |
||||
} |
||||
|
||||
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address){ |
||||
SubSocket *s = SubSocket::create(); |
||||
int r = s->connect(context, endpoint, address); |
||||
|
||||
if (r == 0) { |
||||
return s; |
||||
} else { |
||||
delete s; |
||||
return NULL; |
||||
} |
||||
} |
||||
|
||||
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate){ |
||||
SubSocket *s = SubSocket::create(); |
||||
int r = s->connect(context, endpoint, address, conflate); |
||||
|
||||
if (r == 0) { |
||||
return s; |
||||
} else { |
||||
delete s; |
||||
return NULL; |
||||
} |
||||
} |
||||
|
||||
PubSocket * PubSocket::create(){ |
||||
PubSocket * s; |
||||
if (std::getenv("MSGQ")){ |
||||
s = new MSGQPubSocket(); |
||||
} else { |
||||
s = new ZMQPubSocket(); |
||||
} |
||||
return s; |
||||
} |
||||
|
||||
PubSocket * PubSocket::create(Context * context, std::string endpoint){ |
||||
PubSocket *s = PubSocket::create(); |
||||
int r = s->connect(context, endpoint); |
||||
|
||||
if (r == 0) { |
||||
return s; |
||||
} else { |
||||
delete s; |
||||
return NULL; |
||||
} |
||||
} |
||||
|
||||
Poller * Poller::create(){ |
||||
Poller * p; |
||||
if (std::getenv("MSGQ")){ |
||||
p = new MSGQPoller(); |
||||
} else { |
||||
p = new ZMQPoller(); |
||||
} |
||||
return p; |
||||
} |
||||
|
||||
Poller * Poller::create(std::vector<SubSocket*> sockets){ |
||||
Poller * p = Poller::create(); |
||||
|
||||
for (auto s : sockets){ |
||||
p->registerSocket(s); |
||||
} |
||||
return p; |
||||
} |
||||
|
||||
extern "C" Context * messaging_context_create() { |
||||
return Context::create(); |
||||
} |
||||
|
||||
extern "C" SubSocket * messaging_subsocket_create(Context* context, const char* endpoint) { |
||||
return SubSocket::create(context, std::string(endpoint)); |
||||
} |
||||
|
||||
extern "C" PubSocket * messaging_pubsocket_create(Context* context, const char* endpoint) { |
||||
return PubSocket::create(context, std::string(endpoint)); |
||||
} |
||||
|
||||
extern "C" Poller * messaging_poller_create(SubSocket** sockets, int size) { |
||||
std::vector<SubSocket*> socketsVec(sockets, sockets + size); |
||||
return Poller::create(socketsVec); |
||||
} |
@ -0,0 +1,56 @@ |
||||
#pragma once |
||||
#include <cstddef> |
||||
#include <vector> |
||||
#include <string> |
||||
|
||||
#define MSG_MULTIPLE_PUBLISHERS 100 |
||||
|
||||
class Context { |
||||
public: |
||||
virtual void * getRawContext() = 0; |
||||
static Context * create(); |
||||
virtual ~Context(){}; |
||||
}; |
||||
|
||||
class Message { |
||||
public: |
||||
virtual void init(size_t size) = 0; |
||||
virtual void init(char * data, size_t size) = 0; |
||||
virtual void close() = 0; |
||||
virtual size_t getSize() = 0; |
||||
virtual char * getData() = 0; |
||||
virtual ~Message(){}; |
||||
}; |
||||
|
||||
|
||||
class SubSocket { |
||||
public: |
||||
virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false) = 0; |
||||
virtual void setTimeout(int timeout) = 0; |
||||
virtual Message *receive(bool non_blocking=false) = 0; |
||||
virtual void * getRawSocket() = 0; |
||||
static SubSocket * create(); |
||||
static SubSocket * create(Context * context, std::string endpoint); |
||||
static SubSocket * create(Context * context, std::string endpoint, std::string address); |
||||
static SubSocket * create(Context * context, std::string endpoint, std::string address, bool conflate); |
||||
virtual ~SubSocket(){}; |
||||
}; |
||||
|
||||
class PubSocket { |
||||
public: |
||||
virtual int connect(Context *context, std::string endpoint) = 0; |
||||
virtual int sendMessage(Message *message) = 0; |
||||
virtual int send(char *data, size_t size) = 0; |
||||
static PubSocket * create(); |
||||
static PubSocket * create(Context * context, std::string endpoint); |
||||
virtual ~PubSocket(){}; |
||||
}; |
||||
|
||||
class Poller { |
||||
public: |
||||
virtual void registerSocket(SubSocket *socket) = 0; |
||||
virtual std::vector<SubSocket*> poll(int timeout) = 0; |
||||
static Poller * create(); |
||||
static Poller * create(std::vector<SubSocket*> sockets); |
||||
virtual ~Poller(){}; |
||||
}; |
@ -0,0 +1,39 @@ |
||||
# distutils: language = c++ |
||||
#cython: language_level=3 |
||||
|
||||
from libcpp.string cimport string |
||||
from libcpp.vector cimport vector |
||||
from libcpp cimport bool |
||||
|
||||
|
||||
cdef extern from "messaging.hpp": |
||||
cdef cppclass Context: |
||||
@staticmethod |
||||
Context * create() |
||||
|
||||
cdef cppclass Message: |
||||
void init(size_t) |
||||
void init(char *, size_t) |
||||
void close() |
||||
size_t getSize() |
||||
char *getData() |
||||
|
||||
cdef cppclass SubSocket: |
||||
@staticmethod |
||||
SubSocket * create() |
||||
int connect(Context *, string, string, bool) |
||||
Message * receive(bool) |
||||
void setTimeout(int) |
||||
|
||||
cdef cppclass PubSocket: |
||||
@staticmethod |
||||
PubSocket * create() |
||||
int connect(Context *, string) |
||||
int sendMessage(Message *) |
||||
int send(char *, size_t) |
||||
|
||||
cdef cppclass Poller: |
||||
@staticmethod |
||||
Poller * create() |
||||
void registerSocket(SubSocket *) |
||||
vector[SubSocket*] poll(int) nogil |
@ -0,0 +1,151 @@ |
||||
# distutils: language = c++ |
||||
# cython: c_string_encoding=ascii, language_level=3 |
||||
|
||||
import sys |
||||
from libcpp.string cimport string |
||||
from libcpp cimport bool |
||||
from libc cimport errno |
||||
|
||||
|
||||
from messaging cimport Context as cppContext |
||||
from messaging cimport SubSocket as cppSubSocket |
||||
from messaging cimport PubSocket as cppPubSocket |
||||
from messaging cimport Poller as cppPoller |
||||
from messaging cimport Message as cppMessage |
||||
|
||||
|
||||
class MessagingError(Exception): |
||||
pass |
||||
|
||||
|
||||
class MultiplePublishersError(MessagingError): |
||||
pass |
||||
|
||||
|
||||
cdef class Context: |
||||
cdef cppContext * context |
||||
|
||||
def __cinit__(self): |
||||
self.context = cppContext.create() |
||||
|
||||
def term(self): |
||||
del self.context |
||||
self.context = NULL |
||||
|
||||
def __dealloc__(self): |
||||
pass |
||||
# Deleting the context will hang if sockets are still active |
||||
# TODO: Figure out a way to make sure the context is closed last |
||||
# del self.context |
||||
|
||||
|
||||
cdef class Poller: |
||||
cdef cppPoller * poller |
||||
cdef list sub_sockets |
||||
|
||||
def __cinit__(self): |
||||
self.sub_sockets = [] |
||||
self.poller = cppPoller.create() |
||||
|
||||
def __dealloc__(self): |
||||
del self.poller |
||||
|
||||
def registerSocket(self, SubSocket socket): |
||||
self.sub_sockets.append(socket) |
||||
self.poller.registerSocket(socket.socket) |
||||
|
||||
def poll(self, timeout): |
||||
sockets = [] |
||||
cdef int t = timeout |
||||
|
||||
with nogil: |
||||
result = self.poller.poll(t) |
||||
|
||||
for s in result: |
||||
socket = SubSocket() |
||||
socket.setPtr(s) |
||||
sockets.append(socket) |
||||
|
||||
return sockets |
||||
|
||||
cdef class SubSocket: |
||||
cdef cppSubSocket * socket |
||||
cdef bool is_owner |
||||
|
||||
def __cinit__(self): |
||||
self.socket = cppSubSocket.create() |
||||
self.is_owner = True |
||||
|
||||
if self.socket == NULL: |
||||
raise MessagingError |
||||
|
||||
def __dealloc__(self): |
||||
if self.is_owner: |
||||
del self.socket |
||||
|
||||
cdef setPtr(self, cppSubSocket * ptr): |
||||
if self.is_owner: |
||||
del self.socket |
||||
|
||||
self.is_owner = False |
||||
self.socket = ptr |
||||
|
||||
def connect(self, Context context, string endpoint, string address=b"127.0.0.1", bool conflate=False): |
||||
r = self.socket.connect(context.context, endpoint, address, conflate) |
||||
|
||||
if r != 0: |
||||
if errno.errno == errno.EADDRINUSE: |
||||
raise MultiplePublishersError |
||||
else: |
||||
raise MessagingError |
||||
|
||||
def setTimeout(self, int timeout): |
||||
self.socket.setTimeout(timeout) |
||||
|
||||
def receive(self, bool non_blocking=False): |
||||
msg = self.socket.receive(non_blocking) |
||||
|
||||
if msg == NULL: |
||||
# If a blocking read returns no message check errno if SIGINT was caught in the C++ code |
||||
if errno.errno == errno.EINTR: |
||||
print("SIGINT received, exiting") |
||||
sys.exit(1) |
||||
|
||||
return None |
||||
else: |
||||
sz = msg.getSize() |
||||
m = msg.getData()[:sz] |
||||
del msg |
||||
|
||||
return m |
||||
|
||||
|
||||
cdef class PubSocket: |
||||
cdef cppPubSocket * socket |
||||
|
||||
def __cinit__(self): |
||||
self.socket = cppPubSocket.create() |
||||
if self.socket == NULL: |
||||
raise MessagingError |
||||
|
||||
def __dealloc__(self): |
||||
del self.socket |
||||
|
||||
def connect(self, Context context, string endpoint): |
||||
r = self.socket.connect(context.context, endpoint) |
||||
|
||||
if r != 0: |
||||
if errno.errno == errno.EADDRINUSE: |
||||
raise MultiplePublishersError |
||||
else: |
||||
raise MessagingError |
||||
|
||||
def send(self, string data): |
||||
length = len(data) |
||||
r = self.socket.send(<char*>data.c_str(), length) |
||||
|
||||
if r != length: |
||||
if errno.errno == errno.EADDRINUSE: |
||||
raise MultiplePublishersError |
||||
else: |
||||
raise MessagingError |
@ -0,0 +1,56 @@ |
||||
import os |
||||
import subprocess |
||||
import sysconfig |
||||
from distutils.core import Extension, setup # pylint: disable=import-error,no-name-in-module |
||||
|
||||
from Cython.Build import cythonize |
||||
from Cython.Distutils import build_ext |
||||
|
||||
|
||||
def get_ext_filename_without_platform_suffix(filename): |
||||
name, ext = os.path.splitext(filename) |
||||
ext_suffix = sysconfig.get_config_var('EXT_SUFFIX') |
||||
|
||||
if ext_suffix == ext: |
||||
return filename |
||||
|
||||
ext_suffix = ext_suffix.replace(ext, '') |
||||
idx = name.find(ext_suffix) |
||||
|
||||
if idx == -1: |
||||
return filename |
||||
else: |
||||
return name[:idx] + ext |
||||
|
||||
|
||||
class BuildExtWithoutPlatformSuffix(build_ext): |
||||
def get_ext_filename(self, ext_name): |
||||
filename = super().get_ext_filename(ext_name) |
||||
return get_ext_filename_without_platform_suffix(filename) |
||||
|
||||
|
||||
sourcefiles = ['messaging_pyx.pyx'] |
||||
extra_compile_args = ["-std=c++11"] |
||||
libraries = ['zmq'] |
||||
ARCH = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() # pylint: disable=unexpected-keyword-arg |
||||
|
||||
if ARCH == "aarch64": |
||||
extra_compile_args += ["-Wno-deprecated-register"] |
||||
libraries += ['gnustl_shared'] |
||||
|
||||
setup(name='CAN parser', |
||||
cmdclass={'build_ext': BuildExtWithoutPlatformSuffix}, |
||||
ext_modules=cythonize( |
||||
Extension( |
||||
"messaging_pyx", |
||||
language="c++", |
||||
sources=sourcefiles, |
||||
extra_compile_args=extra_compile_args, |
||||
libraries=libraries, |
||||
extra_objects=[ |
||||
os.path.join(os.path.dirname(os.path.realpath(__file__)), '../', 'libmessaging.a'), |
||||
] |
||||
) |
||||
), |
||||
nthreads=4, |
||||
) |
@ -0,0 +1,441 @@ |
||||
#include <iostream> |
||||
#include <cassert> |
||||
#include <cerrno> |
||||
#include <cmath> |
||||
#include <cstring> |
||||
#include <cstdint> |
||||
#include <chrono> |
||||
#include <algorithm> |
||||
#include <cstdlib> |
||||
#include <csignal> |
||||
#include <random> |
||||
|
||||
#include <poll.h> |
||||
#include <sys/ioctl.h> |
||||
#include <sys/mman.h> |
||||
#include <sys/stat.h> |
||||
#include <sys/types.h> |
||||
#include <sys/syscall.h> |
||||
#include <fcntl.h> |
||||
#include <unistd.h> |
||||
|
||||
#include <stdio.h> |
||||
|
||||
#include "msgq.hpp" |
||||
|
||||
void sigusr1_handler(int signal) { |
||||
assert(signal == SIGUSR1); |
||||
} |
||||
|
||||
uint64_t msgq_get_uid(void){ |
||||
std::random_device rd("/dev/urandom"); |
||||
std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint32_t>::max()); |
||||
|
||||
uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid); |
||||
return uid; |
||||
} |
||||
|
||||
int msgq_msg_init_size(msgq_msg_t * msg, size_t size){ |
||||
msg->size = size; |
||||
msg->data = new(std::nothrow) char[size]; |
||||
|
||||
return (msg->data == NULL) ? -1 : 0; |
||||
} |
||||
|
||||
|
||||
int msgq_msg_init_data(msgq_msg_t * msg, char * data, size_t size) { |
||||
int r = msgq_msg_init_size(msg, size); |
||||
|
||||
if (r == 0) |
||||
memcpy(msg->data, data, size); |
||||
|
||||
return r; |
||||
} |
||||
|
||||
int msgq_msg_close(msgq_msg_t * msg){ |
||||
if (msg->size > 0) |
||||
delete[] msg->data; |
||||
|
||||
msg->size = 0; |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
void msgq_reset_reader(msgq_queue_t * q){ |
||||
int id = q->reader_id; |
||||
q->read_valids[id]->store(true); |
||||
q->read_pointers[id]->store(*q->write_pointer); |
||||
} |
||||
|
||||
void msgq_wait_for_subscriber(msgq_queue_t *q){ |
||||
while (*q->num_readers == 0){ |
||||
; |
||||
} |
||||
|
||||
return; |
||||
} |
||||
|
||||
|
||||
|
||||
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ |
||||
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
|
||||
|
||||
std::signal(SIGUSR1, sigusr1_handler); |
||||
|
||||
const char * prefix = "/dev/shm/"; |
||||
char * full_path = new char[strlen(path) + strlen(prefix) + 1]; |
||||
strcpy(full_path, prefix); |
||||
strcat(full_path, path); |
||||
|
||||
auto fd = open(full_path, O_RDWR | O_CREAT, 0777); |
||||
delete[] full_path; |
||||
|
||||
if (fd < 0) |
||||
return -1; |
||||
|
||||
int rc = ftruncate(fd, size + sizeof(msgq_header_t)); |
||||
if (rc < 0) |
||||
return -1; |
||||
|
||||
char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); |
||||
close(fd); |
||||
|
||||
if (mem == NULL) |
||||
return -1; |
||||
|
||||
q->mmap_p = mem; |
||||
|
||||
msgq_header_t *header = (msgq_header_t *)mem; |
||||
|
||||
// Setup pointers to header segment
|
||||
q->num_readers = reinterpret_cast<std::atomic<uint64_t>*>(&header->num_readers); |
||||
q->write_pointer = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_pointer); |
||||
q->write_uid = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_uid); |
||||
|
||||
for (size_t i = 0; i < NUM_READERS; i++){ |
||||
q->read_pointers[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_pointers[i]); |
||||
q->read_valids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_valids[i]); |
||||
q->read_uids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_uids[i]); |
||||
} |
||||
|
||||
q->data = mem + sizeof(msgq_header_t); |
||||
q->size = size; |
||||
q->reader_id = -1; |
||||
|
||||
q->endpoint = path; |
||||
q->read_conflate = false; |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
void msgq_close_queue(msgq_queue_t *q){ |
||||
if (q->mmap_p != NULL){ |
||||
munmap(q->mmap_p, q->size + sizeof(msgq_header_t)); |
||||
} |
||||
} |
||||
|
||||
|
||||
void msgq_init_publisher(msgq_queue_t * q) { |
||||
std::cout << "Starting publisher" << std::endl; |
||||
uint64_t uid = msgq_get_uid(); |
||||
|
||||
*q->write_uid = uid; |
||||
*q->num_readers = 0; |
||||
|
||||
for (size_t i = 0; i < NUM_READERS; i++){ |
||||
*q->read_valids[i] = false; |
||||
*q->read_uids[i] = 0; |
||||
} |
||||
|
||||
q->write_uid_local = uid; |
||||
} |
||||
|
||||
void msgq_init_subscriber(msgq_queue_t * q) { |
||||
assert(q != NULL); |
||||
assert(q->num_readers != NULL); |
||||
|
||||
uint64_t uid = msgq_get_uid(); |
||||
|
||||
// Get reader id
|
||||
while (true){ |
||||
uint64_t cur_num_readers = *q->num_readers; |
||||
uint64_t new_num_readers = cur_num_readers + 1; |
||||
|
||||
// No more slots available. Reset all subscribers to kick out inactive ones
|
||||
if (new_num_readers > NUM_READERS){ |
||||
std::cout << "Warning, evicting all subscribers!" << std::endl; |
||||
*q->num_readers = 0; |
||||
|
||||
for (size_t i = 0; i < NUM_READERS; i++){ |
||||
*q->read_valids[i] = false; |
||||
|
||||
uint64_t old_uid = *q->read_uids[i]; |
||||
*q->read_uids[i] = 0; |
||||
|
||||
// Wake up reader in case they are in a poll
|
||||
syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); |
||||
} |
||||
|
||||
continue; |
||||
} |
||||
|
||||
// Use atomic compare and swap to handle race condition
|
||||
// where two subscribers start at the same time
|
||||
if (std::atomic_compare_exchange_strong(q->num_readers, |
||||
&cur_num_readers, |
||||
new_num_readers)){ |
||||
q->reader_id = cur_num_readers; |
||||
q->read_uid_local = uid; |
||||
|
||||
// We start with read_valid = false,
|
||||
// on the first read the read pointer will be synchronized with the write pointer
|
||||
*q->read_valids[cur_num_readers] = false; |
||||
*q->read_pointers[cur_num_readers] = 0; |
||||
*q->read_uids[cur_num_readers] = uid; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; |
||||
msgq_reset_reader(q); |
||||
} |
||||
|
||||
int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ |
||||
// Die if we are no longer the active publisher
|
||||
if (q->write_uid_local != *q->write_uid){ |
||||
std::cout << "Killing old publisher: " << q->endpoint << std::endl; |
||||
errno = EADDRINUSE; |
||||
return -1; |
||||
} |
||||
|
||||
uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t)); |
||||
|
||||
// We need to fit at least three messages in the queue,
|
||||
// then we can always safely access the last message
|
||||
assert(3 * total_msg_size <= q->size); |
||||
|
||||
uint64_t num_readers = *q->num_readers; |
||||
|
||||
uint32_t write_cycles, write_pointer; |
||||
UNPACK64(write_cycles, write_pointer, *q->write_pointer); |
||||
|
||||
char *p = q->data + write_pointer; // add base offset
|
||||
|
||||
// Check remaining space
|
||||
// Always leave space for a wraparound tag for the next message, including alignment
|
||||
int64_t remaining_space = q->size - write_pointer - total_msg_size - sizeof(int64_t); |
||||
if (remaining_space <= 0){ |
||||
// Write -1 size tag indicating wraparound
|
||||
*(int64_t*)p = -1; |
||||
|
||||
// Invalidate all readers that are beyond the write pointer
|
||||
// TODO: should we handle the case where a new reader shows up while this is running?
|
||||
for (uint64_t i = 0; i < num_readers; i++){ |
||||
uint64_t read_pointer = *q->read_pointers[i]; |
||||
uint64_t read_cycles = read_pointer >> 32; |
||||
read_pointer &= 0xFFFFFFFF; |
||||
|
||||
if ((read_pointer > write_pointer) && (read_cycles != write_cycles)) { |
||||
*q->read_valids[i] = false; |
||||
} |
||||
} |
||||
|
||||
// Update global and local copies of write pointer and write_cycles
|
||||
write_pointer = 0; |
||||
write_cycles = write_cycles + 1; |
||||
PACK64(*q->write_pointer, write_cycles, write_pointer); |
||||
|
||||
// Set actual pointer to the beginning of the data segment
|
||||
p = q->data; |
||||
} |
||||
|
||||
// Invalidate readers that are in the area that will be written
|
||||
uint64_t start = write_pointer; |
||||
uint64_t end = ALIGN(start + sizeof(int64_t) + msg->size); |
||||
|
||||
for (uint64_t i = 0; i < num_readers; i++){ |
||||
uint32_t read_cycles, read_pointer; |
||||
UNPACK64(read_cycles, read_pointer, *q->read_pointers[i]); |
||||
|
||||
if ((read_pointer >= start) && (read_pointer < end) && (read_cycles != write_cycles)) { |
||||
*q->read_valids[i] = false; |
||||
} |
||||
} |
||||
|
||||
|
||||
// Write size tag
|
||||
std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p); |
||||
*size_p = msg->size; |
||||
|
||||
// Copy data
|
||||
memcpy(p + sizeof(int64_t), msg->data, msg->size); |
||||
__sync_synchronize(); |
||||
|
||||
// Update write pointer
|
||||
uint32_t new_ptr = ALIGN(write_pointer + msg->size + sizeof(int64_t)); |
||||
PACK64(*q->write_pointer, write_cycles, new_ptr); |
||||
|
||||
// Notify readers
|
||||
for (uint64_t i = 0; i < num_readers; i++){ |
||||
uint64_t reader_uid = *q->read_uids[i]; |
||||
|
||||
syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); |
||||
} |
||||
|
||||
return msg->size; |
||||
} |
||||
|
||||
|
||||
int msgq_msg_ready(msgq_queue_t * q){ |
||||
start: |
||||
int id = q->reader_id; |
||||
assert(id >= 0); // Make sure subscriber is initialized
|
||||
|
||||
if (q->read_uid_local != *q->read_uids[id]){ |
||||
std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; |
||||
msgq_init_subscriber(q); |
||||
goto start; |
||||
} |
||||
|
||||
// Check valid
|
||||
if (!*q->read_valids[id]){ |
||||
msgq_reset_reader(q); |
||||
goto start; |
||||
} |
||||
|
||||
uint32_t read_cycles, read_pointer; |
||||
UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]); |
||||
|
||||
uint32_t write_cycles, write_pointer; |
||||
UNPACK64(write_cycles, write_pointer, *q->write_pointer); |
||||
|
||||
// Check if new message is available
|
||||
return (read_pointer != write_pointer); |
||||
} |
||||
|
||||
int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ |
||||
start: |
||||
int id = q->reader_id; |
||||
assert(id >= 0); // Make sure subscriber is initialized
|
||||
|
||||
if (q->read_uid_local != *q->read_uids[id]){ |
||||
std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; |
||||
msgq_init_subscriber(q); |
||||
goto start; |
||||
} |
||||
|
||||
// Check valid
|
||||
if (!*q->read_valids[id]){ |
||||
msgq_reset_reader(q); |
||||
goto start; |
||||
} |
||||
|
||||
uint32_t read_cycles, read_pointer; |
||||
UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]); |
||||
|
||||
uint32_t write_cycles, write_pointer; |
||||
UNPACK64(write_cycles, write_pointer, *q->write_pointer); |
||||
|
||||
char * p = q->data + read_pointer; |
||||
|
||||
// Check if new message is available
|
||||
if (read_pointer == write_pointer) { |
||||
msg->size = 0; |
||||
return 0; |
||||
} |
||||
|
||||
// Read potential message size
|
||||
std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p); |
||||
std::int64_t size = *size_p; |
||||
|
||||
// Check if the size that was read is valid
|
||||
if (!*q->read_valids[id]){ |
||||
msgq_reset_reader(q); |
||||
goto start; |
||||
} |
||||
|
||||
// If size is -1 the buffer was full, and we need to wrap around
|
||||
if (size == -1){ |
||||
read_cycles++; |
||||
PACK64(*q->read_pointers[id], read_cycles, 0); |
||||
goto start; |
||||
} |
||||
|
||||
// crashing is better than passing garbage data to the consumer
|
||||
// the size will have weird value if it was overwritten by data accidentally
|
||||
assert((uint64_t)size < q->size); |
||||
assert(size > 0); |
||||
|
||||
uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size); |
||||
|
||||
// If conflate is true, check if this is the latest message, else start over
|
||||
if (q->read_conflate){ |
||||
if (new_read_pointer != write_pointer){ |
||||
// Update read pointer
|
||||
PACK64(*q->read_pointers[id], read_cycles, new_read_pointer); |
||||
goto start; |
||||
} |
||||
} |
||||
|
||||
// Copy message
|
||||
if (msgq_msg_init_size(msg, size) < 0) |
||||
return -1; |
||||
|
||||
__sync_synchronize(); |
||||
memcpy(msg->data, p + sizeof(int64_t), size); |
||||
__sync_synchronize(); |
||||
|
||||
// Update read pointer
|
||||
PACK64(*q->read_pointers[id], read_cycles, new_read_pointer); |
||||
|
||||
// Check if the actual data that was copied is valid
|
||||
if (!*q->read_valids[id]){ |
||||
msgq_msg_close(msg); |
||||
msgq_reset_reader(q); |
||||
goto start; |
||||
} |
||||
|
||||
|
||||
return msg->size; |
||||
} |
||||
|
||||
|
||||
|
||||
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ |
||||
assert(timeout >= 0); |
||||
|
||||
int num = 0; |
||||
|
||||
// Check if messages ready
|
||||
for (size_t i = 0; i < nitems; i++) { |
||||
items[i].revents = msgq_msg_ready(items[i].q); |
||||
if (items[i].revents) num++; |
||||
} |
||||
|
||||
int ms = (timeout == -1) ? 100 : timeout; |
||||
struct timespec ts; |
||||
ts.tv_sec = ms / 1000; |
||||
ts.tv_nsec = (ms % 1000) * 1000 * 1000; |
||||
|
||||
|
||||
while (num == 0) { |
||||
int ret; |
||||
|
||||
ret = nanosleep(&ts, &ts); |
||||
|
||||
// Check if messages ready
|
||||
for (size_t i = 0; i < nitems; i++) { |
||||
if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ |
||||
num += 1; |
||||
items[i].revents = 1; |
||||
} |
||||
} |
||||
|
||||
// exit if we had a timeout and the sleep finished
|
||||
if (timeout != -1 && ret == 0){ |
||||
break; |
||||
} |
||||
} |
||||
|
||||
return num; |
||||
} |
@ -0,0 +1,66 @@ |
||||
#pragma once |
||||
#include <cstdint> |
||||
#include <cstring> |
||||
#include <string> |
||||
#include <atomic> |
||||
|
||||
#define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024) |
||||
#define NUM_READERS 8 |
||||
#define ALIGN(n) ((n + (8 - 1)) & -8) |
||||
|
||||
#define UNPACK64(higher, lower, input) do {uint64_t tmp = input; higher = tmp >> 32; lower = tmp & 0xFFFFFFFF;} while (0) |
||||
#define PACK64(output, higher, lower) output = ((uint64_t)higher << 32 ) | ((uint64_t)lower & 0xFFFFFFFF) |
||||
|
||||
struct msgq_header_t { |
||||
uint64_t num_readers; |
||||
uint64_t write_pointer; |
||||
uint64_t write_uid; |
||||
uint64_t read_pointers[NUM_READERS]; |
||||
uint64_t read_valids[NUM_READERS]; |
||||
uint64_t read_uids[NUM_READERS]; |
||||
}; |
||||
|
||||
struct msgq_queue_t { |
||||
std::atomic<uint64_t> *num_readers; |
||||
std::atomic<uint64_t> *write_pointer; |
||||
std::atomic<uint64_t> *write_uid; |
||||
std::atomic<uint64_t> *read_pointers[NUM_READERS]; |
||||
std::atomic<uint64_t> *read_valids[NUM_READERS]; |
||||
std::atomic<uint64_t> *read_uids[NUM_READERS]; |
||||
char * mmap_p; |
||||
char * data; |
||||
size_t size; |
||||
int reader_id; |
||||
uint64_t read_uid_local; |
||||
uint64_t write_uid_local; |
||||
|
||||
bool read_conflate; |
||||
std::string endpoint; |
||||
}; |
||||
|
||||
struct msgq_msg_t { |
||||
size_t size; |
||||
char * data; |
||||
}; |
||||
|
||||
struct msgq_pollitem_t { |
||||
msgq_queue_t *q; |
||||
int revents; |
||||
}; |
||||
|
||||
void msgq_wait_for_subscriber(msgq_queue_t *q); |
||||
void msgq_reset_reader(msgq_queue_t *q); |
||||
|
||||
int msgq_msg_init_size(msgq_msg_t *msg, size_t size); |
||||
int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size); |
||||
int msgq_msg_close(msgq_msg_t *msg); |
||||
|
||||
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size); |
||||
void msgq_close_queue(msgq_queue_t *q); |
||||
void msgq_init_publisher(msgq_queue_t * q); |
||||
void msgq_init_subscriber(msgq_queue_t * q); |
||||
|
||||
int msgq_msg_send(msgq_msg_t *msg, msgq_queue_t *q); |
||||
int msgq_msg_recv(msgq_msg_t *msg, msgq_queue_t *q); |
||||
int msgq_msg_ready(msgq_queue_t * q); |
||||
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout); |
@ -0,0 +1,56 @@ |
||||
# MSGQ: A lock free single producer multi consumer message queue |
||||
|
||||
[](https://dev.azure.com/commaai/default/_build/latest?definitionId=21&branchName=master) |
||||
|
||||
## What is MSGQ? |
||||
MSGQ is a system to pass messages from a single producer to multiple consumers. All the consumers need to be able to receive all the messages. It is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. |
||||
|
||||
## Storage |
||||
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains: |
||||
|
||||
1. A counter to the number of readers that are active |
||||
2. A pointer to the head of the queue for writing. From now on referred to as *write pointer* |
||||
3. A cycle counter for the writer. This counter is incremented when the writer wraps around |
||||
4. N pointers, pointing to the current read position for all the readers. From now on referred to as *read pointer* |
||||
5. N counters, counting the number of cycles for all the readers |
||||
6. N booleans, indicating validity for all the readers. From now on referred to as *validity flag* |
||||
|
||||
The counter and the pointer are both 32 bit values, packed into 64 bit so they can be read and written atomically. |
||||
|
||||
The data buffer is a ring buffer. All messages are prefixed by an 8 byte size field, followed by the data. A size of -1 indicates a wrap-around, and means the next message is stored at the beginning of the buffer. |
||||
|
||||
|
||||
## Writing |
||||
Writing involves the following steps: |
||||
|
||||
1. Check if the area that is to be written overlaps with any of the read pointers, mark those readers as invalid by clearing the validity flag. |
||||
2. Write the message |
||||
3. Increase the write pointer by the size of the message |
||||
|
||||
In case there is not enough space at the end of the buffer, a special empty message with a prefix of -1 is written. The cycle counter is incremented by one. In this case step 1 will check there are no read pointers pointing to the remainder of the buffer. Then another write cycle will start with the actual message. |
||||
|
||||
There always needs to be 8 bytes of empty space at the end of the buffer. By doing this there is always space to write the -1. |
||||
|
||||
## Reset reader |
||||
When the reader is lagging too much behind the read pointer becomes invalid and no longer points to the beginning of a valid message. To reset a reader to the current write pointer, the following steps are performed: |
||||
|
||||
1. Set valid flag |
||||
2. Set read cycle counter to that of the writer |
||||
3. Set read pointer to write pointer |
||||
|
||||
## Reading |
||||
Reading involves the following steps: |
||||
|
||||
1. Read the size field at the current read pointer |
||||
2. Read the validity flag |
||||
3. Copy the data out of the buffer |
||||
4. Increase the read pointer by the size of the message |
||||
5. Check the validity flag again |
||||
|
||||
Before starting the copy, the valid flag is checked. This is to prevent a race condition where the size prefix was invalid, and the read could read outside of the buffer. Make sure that step 1 and 2 are not reordered by your compiler or CPU. |
||||
|
||||
If a writer overwrites the data while it's being copied out, the data will be invalid. Therefore the validity flag is also checked after reading it. The order of step 4 and 5 does not matter. |
||||
|
||||
If at steps 2 or 5 the validity flag is not set, the reader is reset. Any data that was already read is discarded. After the reader is reset, the reading starts from the beginning. |
||||
|
||||
If a message with size -1 is encountered, step 3 and 4 are replaced by increasing the cycle counter and setting the read pointer to the beginning of the buffer. After that another read is performed. |
@ -0,0 +1,395 @@ |
||||
#include "catch2/catch.hpp" |
||||
#include "msgq.hpp" |
||||
|
||||
TEST_CASE("ALIGN"){ |
||||
REQUIRE(ALIGN(0) == 0); |
||||
REQUIRE(ALIGN(1) == 8); |
||||
REQUIRE(ALIGN(7) == 8); |
||||
REQUIRE(ALIGN(8) == 8); |
||||
REQUIRE(ALIGN(99999) == 100000); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_init_size"){ |
||||
const size_t msg_size = 30; |
||||
msgq_msg_t msg; |
||||
|
||||
msgq_msg_init_size(&msg, msg_size); |
||||
REQUIRE(msg.size == msg_size); |
||||
|
||||
msgq_msg_close(&msg); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_init_data"){ |
||||
const size_t msg_size = 30; |
||||
char * data = new char[msg_size]; |
||||
|
||||
for (size_t i = 0; i < msg_size; i++){ |
||||
data[i] = i; |
||||
} |
||||
|
||||
msgq_msg_t msg; |
||||
msgq_msg_init_data(&msg, data, msg_size); |
||||
|
||||
REQUIRE(msg.size == msg_size); |
||||
REQUIRE(memcmp(msg.data, data, msg_size) == 0); |
||||
|
||||
delete[] data; |
||||
msgq_msg_close(&msg); |
||||
} |
||||
|
||||
|
||||
TEST_CASE("msgq_init_subscriber"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q; |
||||
msgq_new_queue(&q, "test_queue", 1024); |
||||
REQUIRE(*q.num_readers == 0); |
||||
|
||||
q.reader_id = 1; |
||||
*q.read_valids[0] = false; |
||||
*q.read_pointers[0] = ((uint64_t)1 << 32); |
||||
|
||||
*q.write_pointer = 255; |
||||
|
||||
msgq_init_subscriber(&q); |
||||
REQUIRE(q.read_conflate == false); |
||||
REQUIRE(*q.read_valids[0] == true); |
||||
REQUIRE((*q.read_pointers[0] >> 32) == 0); |
||||
REQUIRE((*q.read_pointers[0] & 0xFFFFFFFF) == 255); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_send first message"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q; |
||||
msgq_new_queue(&q, "test_queue", 1024); |
||||
msgq_init_publisher(&q); |
||||
|
||||
REQUIRE(*q.write_pointer == 0); |
||||
|
||||
size_t msg_size = 128; |
||||
|
||||
SECTION("Aligned message size"){ |
||||
} |
||||
SECTION("Unaligned message size"){ |
||||
msg_size--; |
||||
} |
||||
|
||||
char * data = new char[msg_size]; |
||||
|
||||
for (size_t i = 0; i < msg_size; i++){ |
||||
data[i] = i; |
||||
} |
||||
|
||||
msgq_msg_t msg; |
||||
msgq_msg_init_data(&msg, data, msg_size); |
||||
|
||||
|
||||
msgq_msg_send(&msg, &q); |
||||
REQUIRE(*(int64_t*)q.data == msg_size); // Check size tag
|
||||
REQUIRE(*q.write_pointer == 128 + sizeof(int64_t)); |
||||
REQUIRE(memcmp(q.data + sizeof(int64_t), data, msg_size) == 0); |
||||
|
||||
delete[] data; |
||||
msgq_msg_close(&msg); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_send test wraparound"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q; |
||||
msgq_new_queue(&q, "test_queue", 1024); |
||||
msgq_init_publisher(&q); |
||||
|
||||
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == 0); |
||||
REQUIRE((*q.write_pointer >> 32) == 0); |
||||
|
||||
const size_t msg_size = 120; |
||||
msgq_msg_t msg; |
||||
msgq_msg_init_size(&msg, msg_size); |
||||
|
||||
for (int i = 0; i < 8; i++) { |
||||
msgq_msg_send(&msg, &q); |
||||
} |
||||
// Check 8th message was written at the beginning
|
||||
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == msg_size + sizeof(int64_t)); |
||||
|
||||
// Check cycle count
|
||||
REQUIRE((*q.write_pointer >> 32) == 1); |
||||
|
||||
// Check wraparound tag
|
||||
char * tag_location = q.data; |
||||
tag_location += 7 * (msg_size + sizeof(int64_t)); |
||||
REQUIRE(*(int64_t*)tag_location == -1); |
||||
|
||||
msgq_msg_close(&msg); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_recv test wraparound"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q_pub, q_sub; |
||||
msgq_new_queue(&q_pub, "test_queue", 1024); |
||||
msgq_new_queue(&q_sub, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&q_pub); |
||||
msgq_init_subscriber(&q_sub); |
||||
|
||||
REQUIRE((*q_pub.write_pointer >> 32) == 0); |
||||
REQUIRE((*q_sub.read_pointers[0] >> 32) == 0); |
||||
|
||||
const size_t msg_size = 120; |
||||
msgq_msg_t msg1; |
||||
msgq_msg_init_size(&msg1, msg_size); |
||||
|
||||
|
||||
SECTION("Check cycle counter after reset") { |
||||
for (int i = 0; i < 8; i++) { |
||||
msgq_msg_send(&msg1, &q_pub); |
||||
} |
||||
|
||||
msgq_msg_t msg2; |
||||
msgq_msg_recv(&msg2, &q_sub); |
||||
REQUIRE(msg2.size == 0); // Reader had to reset
|
||||
msgq_msg_close(&msg2); |
||||
|
||||
} |
||||
SECTION("Check cycle counter while keeping up with writer") { |
||||
for (int i = 0; i < 8; i++) { |
||||
msgq_msg_send(&msg1, &q_pub); |
||||
|
||||
msgq_msg_t msg2; |
||||
msgq_msg_recv(&msg2, &q_sub); |
||||
REQUIRE(msg2.size > 0); |
||||
msgq_msg_close(&msg2); |
||||
} |
||||
} |
||||
|
||||
REQUIRE((*q_sub.read_pointers[0] >> 32) == 1); |
||||
msgq_msg_close(&msg1); |
||||
} |
||||
|
||||
TEST_CASE("msgq_msg_send test invalidation"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q_pub, q_sub; |
||||
msgq_new_queue(&q_pub, "test_queue", 1024); |
||||
msgq_new_queue(&q_sub, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&q_pub); |
||||
msgq_init_subscriber(&q_sub); |
||||
*q_sub.write_pointer = (uint64_t)1 << 32; |
||||
|
||||
REQUIRE(*q_sub.read_valids[0] == true); |
||||
|
||||
SECTION("read pointer in tag"){ |
||||
*q_sub.read_pointers[0] = 0; |
||||
} |
||||
SECTION("read pointer in data section"){ |
||||
*q_sub.read_pointers[0] = 64; |
||||
} |
||||
SECTION("read pointer in wraparound section"){ |
||||
*q_pub.write_pointer = ((uint64_t)1 << 32) | 1000; // Writer is one cycle ahead
|
||||
*q_sub.read_pointers[0] = 1020; |
||||
} |
||||
|
||||
msgq_msg_t msg; |
||||
msgq_msg_init_size(&msg, 128); |
||||
msgq_msg_send(&msg, &q_pub); |
||||
|
||||
REQUIRE(*q_sub.read_valids[0] == false); |
||||
|
||||
msgq_msg_close(&msg); |
||||
} |
||||
|
||||
TEST_CASE("msgq_init_subscriber init 2 subscribers"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t q1, q2; |
||||
msgq_new_queue(&q1, "test_queue", 1024); |
||||
msgq_new_queue(&q2, "test_queue", 1024); |
||||
|
||||
*q1.num_readers = 0; |
||||
|
||||
REQUIRE(*q1.num_readers == 0); |
||||
REQUIRE(*q2.num_readers == 0); |
||||
|
||||
msgq_init_subscriber(&q1); |
||||
REQUIRE(*q1.num_readers == 1); |
||||
REQUIRE(*q2.num_readers == 1); |
||||
REQUIRE(q1.reader_id == 0); |
||||
|
||||
msgq_init_subscriber(&q2); |
||||
REQUIRE(*q1.num_readers == 2); |
||||
REQUIRE(*q2.num_readers == 2); |
||||
REQUIRE(q2.reader_id == 1); |
||||
} |
||||
|
||||
|
||||
TEST_CASE("Write 1 msg, read 1 msg", "[integration]"){ |
||||
remove("/dev/shm/test_queue"); |
||||
const size_t msg_size = 128; |
||||
msgq_queue_t writer, reader; |
||||
|
||||
msgq_new_queue(&writer, "test_queue", 1024); |
||||
msgq_new_queue(&reader, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&writer); |
||||
msgq_init_subscriber(&reader); |
||||
|
||||
// Build 128 byte message
|
||||
msgq_msg_t outgoing_msg; |
||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
||||
|
||||
for (size_t i = 0; i < msg_size; i++){ |
||||
outgoing_msg.data[i] = i; |
||||
} |
||||
|
||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
||||
|
||||
msgq_msg_t incoming_msg1; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
||||
|
||||
// Verify that there are no more messages
|
||||
msgq_msg_t incoming_msg2; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); |
||||
|
||||
msgq_msg_close(&outgoing_msg); |
||||
msgq_msg_close(&incoming_msg1); |
||||
msgq_msg_close(&incoming_msg2); |
||||
} |
||||
|
||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]"){ |
||||
remove("/dev/shm/test_queue"); |
||||
const size_t msg_size = 128; |
||||
msgq_queue_t writer, reader; |
||||
|
||||
msgq_new_queue(&writer, "test_queue", 1024); |
||||
msgq_new_queue(&reader, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&writer); |
||||
msgq_init_subscriber(&reader); |
||||
|
||||
// Build 128 byte message
|
||||
msgq_msg_t outgoing_msg; |
||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
||||
|
||||
for (size_t i = 0; i < msg_size; i++){ |
||||
outgoing_msg.data[i] = i; |
||||
} |
||||
|
||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
||||
|
||||
msgq_msg_t incoming_msg1; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
||||
|
||||
msgq_msg_t incoming_msg2; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == msg_size); |
||||
REQUIRE(memcmp(incoming_msg2.data, outgoing_msg.data, msg_size) == 0); |
||||
|
||||
msgq_msg_close(&outgoing_msg); |
||||
msgq_msg_close(&incoming_msg1); |
||||
msgq_msg_close(&incoming_msg2); |
||||
} |
||||
|
||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]"){ |
||||
remove("/dev/shm/test_queue"); |
||||
const size_t msg_size = 128; |
||||
msgq_queue_t writer, reader; |
||||
|
||||
msgq_new_queue(&writer, "test_queue", 1024); |
||||
msgq_new_queue(&reader, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&writer); |
||||
msgq_init_subscriber(&reader); |
||||
reader.read_conflate = true; |
||||
|
||||
// Build 128 byte message
|
||||
msgq_msg_t outgoing_msg; |
||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
||||
|
||||
for (size_t i = 0; i < msg_size; i++){ |
||||
outgoing_msg.data[i] = i; |
||||
} |
||||
|
||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
||||
|
||||
msgq_msg_t incoming_msg1; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
||||
|
||||
// Verify that there are no more messages
|
||||
msgq_msg_t incoming_msg2; |
||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); |
||||
|
||||
msgq_msg_close(&outgoing_msg); |
||||
msgq_msg_close(&incoming_msg1); |
||||
msgq_msg_close(&incoming_msg2); |
||||
} |
||||
|
||||
TEST_CASE("1 publisher, 1 slow subscriber", "[integration]"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t writer, reader; |
||||
|
||||
msgq_new_queue(&writer, "test_queue", 1024); |
||||
msgq_new_queue(&reader, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&writer); |
||||
msgq_init_subscriber(&reader); |
||||
|
||||
int n_received = 0; |
||||
int n_skipped = 0; |
||||
|
||||
for (uint64_t i = 0; i < 1e5; i++) { |
||||
msgq_msg_t outgoing_msg; |
||||
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); |
||||
msgq_msg_send(&outgoing_msg, &writer); |
||||
msgq_msg_close(&outgoing_msg); |
||||
|
||||
if (i % 10 == 0){ |
||||
msgq_msg_t msg1; |
||||
msgq_msg_recv(&msg1, &reader); |
||||
|
||||
if (msg1.size == 0){ |
||||
n_skipped++; |
||||
} else { |
||||
n_received++; |
||||
} |
||||
msgq_msg_close(&msg1); |
||||
} |
||||
} |
||||
|
||||
// TODO: verify these numbers by hand
|
||||
REQUIRE(n_received == 8572); |
||||
REQUIRE(n_skipped == 1428); |
||||
} |
||||
|
||||
TEST_CASE("1 publisher, 2 subscribers", "[integration]"){ |
||||
remove("/dev/shm/test_queue"); |
||||
msgq_queue_t writer, reader1, reader2; |
||||
|
||||
msgq_new_queue(&writer, "test_queue", 1024); |
||||
msgq_new_queue(&reader1, "test_queue", 1024); |
||||
msgq_new_queue(&reader2, "test_queue", 1024); |
||||
|
||||
msgq_init_publisher(&writer); |
||||
msgq_init_subscriber(&reader1); |
||||
msgq_init_subscriber(&reader2); |
||||
|
||||
for (uint64_t i = 0; i < 1024 * 3; i++) { |
||||
msgq_msg_t outgoing_msg; |
||||
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); |
||||
msgq_msg_send(&outgoing_msg, &writer); |
||||
|
||||
msgq_msg_t msg1, msg2; |
||||
msgq_msg_recv(&msg1, &reader1); |
||||
msgq_msg_recv(&msg2, &reader2); |
||||
|
||||
REQUIRE(msg1.size == sizeof(uint64_t)); |
||||
REQUIRE(msg2.size == sizeof(uint64_t)); |
||||
REQUIRE(*(uint64_t*)msg1.data == i); |
||||
REQUIRE(*(uint64_t*)msg2.data == i); |
||||
|
||||
msgq_msg_close(&outgoing_msg); |
||||
msgq_msg_close(&msg1); |
||||
msgq_msg_close(&msg2); |
||||
} |
||||
} |
@ -0,0 +1,14 @@ |
||||
from messaging_pyx import Context, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error |
||||
|
||||
if __name__ == "__main__": |
||||
c = Context() |
||||
pub_sock = PubSocket() |
||||
pub_sock.connect(c, "controlsState") |
||||
|
||||
for i in range(int(1e10)): |
||||
print(i) |
||||
sub_sock = SubSocket() |
||||
sub_sock.connect(c, "controlsState") |
||||
|
||||
pub_sock.send(b'a') |
||||
print(sub_sock.receive()) |
@ -0,0 +1,2 @@ |
||||
#define CATCH_CONFIG_MAIN |
||||
#include "catch2/catch.hpp" |
@ -0,0 +1,142 @@ |
||||
import unittest |
||||
import time |
||||
import cereal.messaging as messaging |
||||
|
||||
import concurrent.futures |
||||
|
||||
|
||||
def poller(): |
||||
context = messaging.Context() |
||||
|
||||
p = messaging.Poller() |
||||
|
||||
sub = messaging.SubSocket() |
||||
sub.connect(context, 'controlsState') |
||||
p.registerSocket(sub) |
||||
|
||||
socks = p.poll(10000) |
||||
r = [s.receive(non_blocking=True) for s in socks] |
||||
|
||||
return r |
||||
|
||||
|
||||
class TestPoller(unittest.TestCase): |
||||
def test_poll_once(self): |
||||
context = messaging.Context() |
||||
|
||||
pub = messaging.PubSocket() |
||||
pub.connect(context, 'controlsState') |
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as e: |
||||
poll = e.submit(poller) |
||||
|
||||
time.sleep(0.1) # Slow joiner syndrome |
||||
|
||||
# Send message |
||||
pub.send("a") |
||||
|
||||
# Wait for poll result |
||||
result = poll.result() |
||||
|
||||
del pub |
||||
context.term() |
||||
|
||||
self.assertEqual(result, [b"a"]) |
||||
|
||||
def test_poll_and_create_many_subscribers(self): |
||||
context = messaging.Context() |
||||
|
||||
pub = messaging.PubSocket() |
||||
pub.connect(context, 'controlsState') |
||||
|
||||
with concurrent.futures.ThreadPoolExecutor() as e: |
||||
poll = e.submit(poller) |
||||
|
||||
time.sleep(0.1) # Slow joiner syndrome |
||||
c = messaging.Context() |
||||
for _ in range(10): |
||||
messaging.SubSocket().connect(c, 'controlsState') |
||||
|
||||
time.sleep(0.1) |
||||
|
||||
# Send message |
||||
pub.send("a") |
||||
|
||||
# Wait for poll result |
||||
result = poll.result() |
||||
|
||||
del pub |
||||
context.term() |
||||
|
||||
self.assertEqual(result, [b"a"]) |
||||
|
||||
def test_multiple_publishers_exception(self): |
||||
context = messaging.Context() |
||||
|
||||
with self.assertRaises(messaging.MultiplePublishersError): |
||||
pub1 = messaging.PubSocket() |
||||
pub1.connect(context, 'controlsState') |
||||
|
||||
pub2 = messaging.PubSocket() |
||||
pub2.connect(context, 'controlsState') |
||||
|
||||
pub1.send("a") |
||||
|
||||
del pub1 |
||||
del pub2 |
||||
context.term() |
||||
|
||||
def test_multiple_messages(self): |
||||
context = messaging.Context() |
||||
|
||||
pub = messaging.PubSocket() |
||||
pub.connect(context, 'controlsState') |
||||
|
||||
sub = messaging.SubSocket() |
||||
sub.connect(context, 'controlsState') |
||||
|
||||
time.sleep(0.1) # Slow joiner |
||||
|
||||
for i in range(100): |
||||
pub.send(str(i)) |
||||
|
||||
msg_seen = False |
||||
i = 0 |
||||
while True: |
||||
r = sub.receive(non_blocking=True) |
||||
|
||||
if r is not None: |
||||
self.assertEqual(str(i), r.decode('utf8')) |
||||
|
||||
msg_seen = True |
||||
i += 1 |
||||
|
||||
if r is None and msg_seen: # ZMQ sometimes receives nothing on the first receive |
||||
break |
||||
|
||||
del pub |
||||
del sub |
||||
context.term() |
||||
|
||||
def test_conflate(self): |
||||
context = messaging.Context() |
||||
|
||||
pub = messaging.PubSocket() |
||||
pub.connect(context, 'controlsState') |
||||
|
||||
sub = messaging.SubSocket() |
||||
sub.connect(context, 'controlsState', conflate=True) |
||||
|
||||
time.sleep(0.1) # Slow joiner |
||||
pub.send('a') |
||||
pub.send('b') |
||||
|
||||
self.assertEqual(b'b', sub.receive()) |
||||
|
||||
del pub |
||||
del sub |
||||
context.term() |
||||
|
||||
|
||||
if __name__ == "__main__": |
||||
unittest.main() |
@ -0,0 +1,163 @@ |
||||
# TODO: these port numbers are hardcoded in c, fix this |
||||
|
||||
# LogRotate: 8001 is a PUSH PULL socket between loggerd and visiond |
||||
|
||||
# all ZMQ pub sub: port, should_log, frequency, (qlog_decimation) |
||||
|
||||
# frame syncing packet |
||||
frame: [8002, true, 20., 1] |
||||
# accel, gyro, and compass |
||||
sensorEvents: [8003, true, 100., 100] |
||||
# GPS data, also global timestamp |
||||
gpsNMEA: [8004, true, 9.] # 9 msgs each sec |
||||
# CPU+MEM+GPU+BAT temps |
||||
thermal: [8005, true, 2., 1] |
||||
# List(CanData), list of can messages |
||||
can: [8006, true, 100.] |
||||
controlsState: [8007, true, 100., 100] |
||||
#liveEvent: [8008, true, 0.] |
||||
model: [8009, true, 20., 5] |
||||
features: [8010, true, 0.] |
||||
health: [8011, true, 2., 1] |
||||
radarState: [8012, true, 20.] |
||||
#liveUI: [8014, true, 0.] |
||||
encodeIdx: [8015, true, 20.] |
||||
liveTracks: [8016, true, 20.] |
||||
sendcan: [8017, true, 100.] |
||||
logMessage: [8018, true, 0.] |
||||
liveCalibration: [8019, true, 5.] |
||||
androidLog: [8020, true, 0.] |
||||
carState: [8021, true, 100., 10] |
||||
# 8022 is reserved for sshd |
||||
carControl: [8023, true, 100., 10] |
||||
plan: [8024, true, 20.] |
||||
liveLocation: [8025, true, 0.] |
||||
gpsLocation: [8026, true, 1., 1] |
||||
ethernetData: [8027, true, 0.] |
||||
navUpdate: [8028, true, 0.] |
||||
qcomGnss: [8029, true, 0.] |
||||
lidarPts: [8030, true, 0.] |
||||
procLog: [8031, true, 0.5] |
||||
gpsLocationExternal: [8032, true, 10., 1] |
||||
ubloxGnss: [8033, true, 10.] |
||||
clocks: [8034, true, 1., 1] |
||||
liveMpc: [8035, false, 20.] |
||||
liveLongitudinalMpc: [8036, false, 20.] |
||||
navStatus: [8038, true, 0.] |
||||
gpsLocationTrimble: [8039, true, 0.] |
||||
trimbleGnss: [8041, true, 0.] |
||||
ubloxRaw: [8042, true, 20.] |
||||
gpsPlannerPoints: [8043, true, 0.] |
||||
gpsPlannerPlan: [8044, true, 0.] |
||||
applanixRaw: [8046, true, 0.] |
||||
orbLocation: [8047, true, 0.] |
||||
trafficEvents: [8048, true, 0.] |
||||
liveLocationTiming: [8049, true, 0.] |
||||
orbslamCorrection: [8050, true, 0.] |
||||
liveLocationCorrected: [8051, true, 0.] |
||||
orbObservation: [8052, true, 0.] |
||||
applanixLocation: [8053, true, 0.] |
||||
liveLocationKalman: [8054, true, 0.] |
||||
uiNavigationEvent: [8055, true, 0.] |
||||
orbOdometry: [8057, true, 0.] |
||||
orbFeatures: [8058, false, 0.] |
||||
orbKeyFrame: [8059, true, 0.] |
||||
uiLayoutState: [8060, true, 0.] |
||||
frontEncodeIdx: [8061, true, 5.] |
||||
orbFeaturesSummary: [8062, true, 0.] |
||||
driverMonitoring: [8063, true, 5., 1] |
||||
liveParameters: [8064, true, 10.] |
||||
liveMapData: [8065, true, 0.] |
||||
cameraOdometry: [8066, true, 5.] |
||||
pathPlan: [8067, true, 20.] |
||||
kalmanOdometry: [8068, true, 0.] |
||||
thumbnail: [8069, true, 0.2, 1] |
||||
carEvents: [8070, true, 1., 1] |
||||
carParams: [8071, true, 0.02, 1] |
||||
frontFrame: [8072, true, 10.] |
||||
|
||||
testModel: [8040, false, 0.] |
||||
testLiveLocation: [8045, false, 0.] |
||||
testJoystick: [8056, false, 0.] |
||||
|
||||
# 8080 is reserved for slave testing daemon |
||||
# 8762 is reserved for logserver |
||||
|
||||
# manager -- base process to manage starting and stopping of all others |
||||
# subscribes: thermal |
||||
|
||||
# **** processes that communicate with the outside world **** |
||||
|
||||
# thermald -- decides when to start and stop onroad |
||||
# subscribes: health, location |
||||
# publishes: thermal |
||||
|
||||
# boardd -- communicates with the car |
||||
# subscribes: sendcan |
||||
# publishes: can, health, ubloxRaw |
||||
|
||||
# sensord -- publishes IMU and Magnetometer |
||||
# publishes: sensorEvents |
||||
|
||||
# gpsd -- publishes EON's gps |
||||
# publishes: gpsNMEA |
||||
|
||||
# visiond -- talks to the cameras, runs the model, saves the videos |
||||
# publishes: frame, model, driverMonitoring, cameraOdometry, thumbnail |
||||
|
||||
# **** stateful data transformers **** |
||||
|
||||
# plannerd -- decides where to drive the car |
||||
# subscribes: carState, model, radarState, controlsState, liveParameters |
||||
# publishes: plan, pathPlan, liveMpc, liveLongitudinalMpc |
||||
|
||||
# controlsd -- drives the car by sending CAN messages to panda |
||||
# subscribes: can, thermal, health, plan, pathPlan, driverMonitoring, liveCalibration |
||||
# publishes: carState, carControl, sendcan, controlsState, carEvents, carParams |
||||
|
||||
# radard -- processes the radar and vision data |
||||
# subscribes: can, controlsState, model, liveParameters |
||||
# publishes: radarState, liveTracks |
||||
|
||||
# params_learner -- learns vehicle params by observing the vehicle dynamics |
||||
# subscribes: controlsState, sensorEvents, cameraOdometry |
||||
# publishes: liveParameters |
||||
|
||||
# calibrationd -- reads posenet and applies a temporal filter on the frame region to look at |
||||
# subscribes: cameraOdometry |
||||
# publishes: liveCalibration |
||||
|
||||
# ubloxd -- read raw ublox data and converts them in readable format |
||||
# subscribes: ubloxRaw |
||||
# publishes: ubloxGnss |
||||
|
||||
# **** LOGGING SERVICE **** |
||||
|
||||
# loggerd |
||||
# subscribes: EVERYTHING |
||||
|
||||
# **** NON VITAL SERVICES **** |
||||
|
||||
# ui |
||||
# subscribes: thermal, model, controlsState, uiLayout, liveCalibration, radarState, liveMpc, plusFrame, liveMapData |
||||
|
||||
# uploader |
||||
# communicates through file system with loggerd |
||||
|
||||
# deleter |
||||
# communicates through file system with loggerd and uploader |
||||
|
||||
# logmessaged -- central logging service, can log to cloud |
||||
# publishes: logMessage |
||||
|
||||
# logcatd -- fetches logcat info from android |
||||
# publishes: androidLog |
||||
|
||||
# proclogd -- fetches process information |
||||
# publishes: procLog |
||||
|
||||
# tombstoned -- reports native crashes |
||||
|
||||
# athenad -- on request, open a sub socket and return the value |
||||
|
||||
# updated -- waits for network access and tries to update every hour |
@ -0,0 +1,33 @@ |
||||
#!/usr/bin/env python3 |
||||
import os |
||||
import yaml |
||||
|
||||
class Service(): |
||||
def __init__(self, port, should_log, frequency, decimation=None): |
||||
self.port = port |
||||
self.should_log = should_log |
||||
self.frequency = frequency |
||||
self.decimation = decimation |
||||
|
||||
service_list_path = os.path.join(os.path.dirname(__file__), "service_list.yaml") |
||||
|
||||
service_list = {} |
||||
with open(service_list_path, "r") as f: |
||||
for k, v in yaml.safe_load(f).items(): |
||||
decimation = None |
||||
if len(v) == 4: |
||||
decimation = v[3] |
||||
|
||||
service_list[k] = Service(v[0], v[1], v[2], decimation) |
||||
|
||||
if __name__ == "__main__": |
||||
print("/* THIS IS AN AUTOGENERATED FILE, PLEASE EDIT service_list.yaml */") |
||||
print("#ifndef __SERVICES_H") |
||||
print("#define __SERVICES_H") |
||||
print("struct service { int port; bool should_log; int frequency; int decimation; char name[0x100]; };") |
||||
print("static struct service services[] = {") |
||||
for k, v in service_list.items(): |
||||
print(' { .name = "%s", .port = %d, .should_log = %s, .frequency = %d, .decimation = %d },' % (k, v.port, "true" if v.should_log else "false", v.frequency, -1 if v.decimation is None else v.decimation)) |
||||
print("};") |
||||
print("#endif") |
||||
|
Loading…
Reference in new issue