commit
						391eb0b74b
					
				
				 35 changed files with 19871 additions and 71 deletions
			
			
		@ -0,0 +1 @@ | 
				
			||||
.sconsign.dblite | 
				
			||||
@ -0,0 +1,19 @@ | 
				
			||||
from ubuntu:16.04 | 
				
			||||
 | 
				
			||||
RUN apt-get update && apt-get install -y libzmq3-dev clang wget git autoconf libtool curl make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl | 
				
			||||
 | 
				
			||||
RUN curl -L https://github.com/pyenv/pyenv-installer/raw/master/bin/pyenv-installer | bash | 
				
			||||
ENV PATH="/root/.pyenv/bin:/root/.pyenv/shims:${PATH}" | 
				
			||||
RUN pyenv install 3.7.3 | 
				
			||||
RUN pyenv global 3.7.3 | 
				
			||||
RUN pyenv rehash | 
				
			||||
RUN pip3 install pyyaml==5.1.2 Cython==0.29.14 scons==3.1.1 pycapnp==0.6.4 | 
				
			||||
 | 
				
			||||
WORKDIR /project/cereal | 
				
			||||
COPY install_capnp.sh . | 
				
			||||
RUN ./install_capnp.sh | 
				
			||||
 | 
				
			||||
ENV PYTHONPATH=/project | 
				
			||||
 | 
				
			||||
COPY . . | 
				
			||||
RUN scons -c && scons -j$(nproc) | 
				
			||||
@ -1,62 +0,0 @@ | 
				
			||||
PWD := $(shell pwd)
 | 
				
			||||
 | 
				
			||||
SRCS := log.capnp car.capnp
 | 
				
			||||
 | 
				
			||||
GENS := gen/cpp/car.capnp.c++ gen/cpp/log.capnp.c++
 | 
				
			||||
JS := gen/js/car.capnp.js gen/js/log.capnp.js
 | 
				
			||||
 | 
				
			||||
UNAME_M ?= $(shell uname -m)
 | 
				
			||||
 | 
				
			||||
GENS += gen/c/car.capnp.c gen/c/log.capnp.c gen/c/include/c++.capnp.h gen/c/include/java.capnp.h
 | 
				
			||||
 | 
				
			||||
ifeq ($(UNAME_M),x86_64) | 
				
			||||
 | 
				
			||||
ifneq (, $(shell which capnpc-java)) | 
				
			||||
GENS += gen/java/Car.java gen/java/Log.java
 | 
				
			||||
else | 
				
			||||
$(warning capnpc-java not found, skipping java build) | 
				
			||||
endif | 
				
			||||
 | 
				
			||||
endif | 
				
			||||
 | 
				
			||||
 | 
				
			||||
ifeq ($(UNAME_M),aarch64) | 
				
			||||
CAPNPC=PATH=$(PWD)/../phonelibs/capnp-cpp/aarch64/bin/:$$PATH capnpc
 | 
				
			||||
else | 
				
			||||
CAPNPC=capnpc
 | 
				
			||||
endif | 
				
			||||
 | 
				
			||||
.PHONY: all | 
				
			||||
all: $(GENS) | 
				
			||||
js: $(JS) | 
				
			||||
 | 
				
			||||
.PHONY: clean | 
				
			||||
clean: | 
				
			||||
	rm -rf gen
 | 
				
			||||
	rm -rf node_modules
 | 
				
			||||
	rm -rf package-lock.json
 | 
				
			||||
 | 
				
			||||
gen/c/%.capnp.c: %.capnp | 
				
			||||
	@echo "[ CAPNPC C ] $@"
 | 
				
			||||
	mkdir -p gen/c/
 | 
				
			||||
	$(CAPNPC) '$<' -o c:gen/c/
 | 
				
			||||
 | 
				
			||||
gen/js/%.capnp.js: %.capnp | 
				
			||||
	@echo "[ CAPNPC JavaScript ] $@"
 | 
				
			||||
	mkdir -p gen/js/
 | 
				
			||||
	sh ./generate_javascript.sh
 | 
				
			||||
 | 
				
			||||
gen/cpp/%.capnp.c++: %.capnp | 
				
			||||
	@echo "[ CAPNPC C++ ] $@"
 | 
				
			||||
	mkdir -p gen/cpp/
 | 
				
			||||
	$(CAPNPC) '$<' -o c++:gen/cpp/
 | 
				
			||||
 | 
				
			||||
gen/java/Car.java gen/java/Log.java: $(SRCS) | 
				
			||||
	@echo "[ CAPNPC java ] $@"
 | 
				
			||||
	mkdir -p gen/java/
 | 
				
			||||
	$(CAPNPC) $^ -o java:gen/java
 | 
				
			||||
 | 
				
			||||
# c-capnproto needs some empty headers
 | 
				
			||||
gen/c/include/c++.capnp.h gen/c/include/java.capnp.h: | 
				
			||||
	mkdir -p gen/c/include
 | 
				
			||||
	touch '$@'
 | 
				
			||||
@ -0,0 +1,68 @@ | 
				
			||||
Import('env', 'arch', 'zmq') | 
				
			||||
 | 
				
			||||
gen_dir = Dir('gen') | 
				
			||||
messaging_dir = Dir('messaging') | 
				
			||||
 | 
				
			||||
# TODO: remove src-prefix and cereal from command string. can we set working directory? | 
				
			||||
env.Command(["gen/c/include/c++.capnp.h", "gen/c/include/java.capnp.h"], [], "mkdir -p " + gen_dir.path + "/c/include && touch $TARGETS") | 
				
			||||
env.Command( | 
				
			||||
  ['gen/c/car.capnp.c', 'gen/c/log.capnp.c', 'gen/c/car.capnp.h', 'gen/c/log.capnp.h'], | 
				
			||||
  ['car.capnp', 'log.capnp'], | 
				
			||||
  'capnpc $SOURCES --src-prefix=cereal -o c:' + gen_dir.path + '/c/') | 
				
			||||
env.Command( | 
				
			||||
  ['gen/cpp/car.capnp.c++', 'gen/cpp/log.capnp.c++', 'gen/cpp/car.capnp.h', 'gen/cpp/log.capnp.h'], | 
				
			||||
  ['car.capnp', 'log.capnp'], | 
				
			||||
  'capnpc $SOURCES --src-prefix=cereal -o c++:' + gen_dir.path + '/cpp/') | 
				
			||||
import shutil | 
				
			||||
if shutil.which('capnpc-java'): | 
				
			||||
  env.Command( | 
				
			||||
    ['gen/java/Car.java', 'gen/java/Log.java'], | 
				
			||||
    ['car.capnp', 'log.capnp'], | 
				
			||||
    'capnpc $SOURCES --src-prefix=cereal -o java:' + gen_dir.path + '/java/') | 
				
			||||
 | 
				
			||||
# TODO: remove non shared cereal and messaging | 
				
			||||
cereal_objects = env.SharedObject([ | 
				
			||||
    'gen/c/car.capnp.c', | 
				
			||||
    'gen/c/log.capnp.c', | 
				
			||||
    'gen/cpp/car.capnp.c++', | 
				
			||||
    'gen/cpp/log.capnp.c++', | 
				
			||||
  ]) | 
				
			||||
 | 
				
			||||
env.Library('cereal', cereal_objects) | 
				
			||||
env.SharedLibrary('cereal_shared', cereal_objects) | 
				
			||||
 | 
				
			||||
cereal_dir = Dir('.') | 
				
			||||
services_h = env.Command( | 
				
			||||
  ['services.h'], | 
				
			||||
  ['service_list.yaml', 'services.py'], | 
				
			||||
  'python3 ' + cereal_dir.path + '/services.py > $TARGET') | 
				
			||||
 | 
				
			||||
messaging_objects = env.SharedObject([ | 
				
			||||
  'messaging/messaging.cc', | 
				
			||||
  'messaging/impl_zmq.cc', | 
				
			||||
  'messaging/impl_msgq.cc', | 
				
			||||
  'messaging/msgq.cc', | 
				
			||||
]) | 
				
			||||
 | 
				
			||||
messaging_lib = env.Library('messaging', messaging_objects) | 
				
			||||
Depends('messaging/impl_zmq.cc', services_h) | 
				
			||||
 | 
				
			||||
# note, this rebuilds the deps shared, zmq is statically linked to make APK happy | 
				
			||||
# TODO: get APK to load system zmq to remove the static link | 
				
			||||
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [] | 
				
			||||
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) | 
				
			||||
 | 
				
			||||
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) | 
				
			||||
Depends('messaging/bridge.cc', services_h) | 
				
			||||
 | 
				
			||||
# different target? | 
				
			||||
#env.Program('messaging/demo', ['messaging/demo.cc'], LIBS=[messaging_lib, 'zmq']) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
env.Command(['messaging/messaging_pyx.so'], | 
				
			||||
  [messaging_lib, 'messaging/messaging_pyx_setup.py', 'messaging/messaging_pyx.pyx', 'messaging/messaging.pxd'], | 
				
			||||
  "cd " + messaging_dir.path + " && python3 messaging_pyx_setup.py build_ext --inplace") | 
				
			||||
 | 
				
			||||
 | 
				
			||||
if GetOption('test'): | 
				
			||||
  env.Program('messaging/test_runner', ['messaging/test_runner.cc', 'messaging/msgq_tests.cc'], LIBS=[messaging_lib]) | 
				
			||||
@ -0,0 +1,49 @@ | 
				
			||||
import os | 
				
			||||
import subprocess | 
				
			||||
 | 
				
			||||
zmq = 'zmq' | 
				
			||||
arch = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip() | 
				
			||||
 | 
				
			||||
cereal_dir = Dir('.') | 
				
			||||
 | 
				
			||||
cpppath = [ | 
				
			||||
    cereal_dir, | 
				
			||||
    '/usr/lib/include', | 
				
			||||
] | 
				
			||||
 | 
				
			||||
AddOption('--test', | 
				
			||||
          action='store_true', | 
				
			||||
          help='build test files') | 
				
			||||
 | 
				
			||||
AddOption('--asan', | 
				
			||||
          action='store_true', | 
				
			||||
          help='turn on ASAN') | 
				
			||||
 | 
				
			||||
ccflags_asan = ["-fsanitize=address", "-fno-omit-frame-pointer"] if GetOption('asan') else [] | 
				
			||||
ldflags_asan = ["-fsanitize=address"] if GetOption('asan') else [] | 
				
			||||
 | 
				
			||||
env = Environment( | 
				
			||||
  ENV=os.environ, | 
				
			||||
  CC='clang', | 
				
			||||
  CXX='clang++', | 
				
			||||
  CCFLAGS=[ | 
				
			||||
    "-g", | 
				
			||||
    "-fPIC", | 
				
			||||
    "-O2", | 
				
			||||
    "-Werror=implicit-function-declaration", | 
				
			||||
    "-Werror=incompatible-pointer-types", | 
				
			||||
    "-Werror=int-conversion", | 
				
			||||
    "-Werror=return-type", | 
				
			||||
    "-Werror=format-extra-args", | 
				
			||||
  ] + ccflags_asan, | 
				
			||||
  LDFLAGS=ldflags_asan, | 
				
			||||
  LINKFLAGS=ldflags_asan, | 
				
			||||
 | 
				
			||||
  CFLAGS="-std=gnu11", | 
				
			||||
  CXXFLAGS="-std=c++14", | 
				
			||||
  CPPPATH=cpppath, | 
				
			||||
) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
Export('env', 'zmq', 'arch') | 
				
			||||
SConscript(['SConscript']) | 
				
			||||
@ -0,0 +1,14 @@ | 
				
			||||
pr: none | 
				
			||||
 | 
				
			||||
pool: | 
				
			||||
  vmImage: 'ubuntu-16.04' | 
				
			||||
 | 
				
			||||
steps: | 
				
			||||
- script: | | 
				
			||||
    set -e | 
				
			||||
    docker build -t cereal . | 
				
			||||
    docker run cereal bash -c "scons --test --asan -j$(nproc) && messaging/test_runner" | 
				
			||||
    docker run cereal bash -c "ZMQ=1 python -m unittest discover ." | 
				
			||||
    docker run cereal bash -c "MSGQ=1 python -m unittest discover ." | 
				
			||||
 | 
				
			||||
  displayName: 'Run Tests' | 
				
			||||
@ -0,0 +1,10 @@ | 
				
			||||
demo | 
				
			||||
bridge | 
				
			||||
test_runner | 
				
			||||
*.o | 
				
			||||
*.os | 
				
			||||
*.d | 
				
			||||
*.a | 
				
			||||
*.so | 
				
			||||
messaging_pyx.cpp | 
				
			||||
build/ | 
				
			||||
@ -0,0 +1,219 @@ | 
				
			||||
# must be build with scons | 
				
			||||
from .messaging_pyx import Context, Poller, SubSocket, PubSocket  # pylint: disable=no-name-in-module, import-error | 
				
			||||
from .messaging_pyx import MultiplePublishersError, MessagingError  # pylint: disable=no-name-in-module, import-error | 
				
			||||
 | 
				
			||||
assert MultiplePublishersError | 
				
			||||
assert MessagingError | 
				
			||||
 | 
				
			||||
from cereal import log | 
				
			||||
from cereal.services import service_list | 
				
			||||
 | 
				
			||||
# sec_since_boot is faster, but allow to run standalone too | 
				
			||||
try: | 
				
			||||
  from common.realtime import sec_since_boot | 
				
			||||
except ImportError: | 
				
			||||
  import time | 
				
			||||
  sec_since_boot = time.time | 
				
			||||
  print("Warning, using python time.time() instead of faster sec_since_boot") | 
				
			||||
 | 
				
			||||
context = Context() | 
				
			||||
 | 
				
			||||
def new_message(): | 
				
			||||
  dat = log.Event.new_message() | 
				
			||||
  dat.logMonoTime = int(sec_since_boot() * 1e9) | 
				
			||||
  dat.valid = True | 
				
			||||
  return dat | 
				
			||||
 | 
				
			||||
def pub_sock(endpoint): | 
				
			||||
  sock = PubSocket() | 
				
			||||
  sock.connect(context, endpoint) | 
				
			||||
  return sock | 
				
			||||
 | 
				
			||||
def sub_sock(endpoint, poller=None, addr="127.0.0.1", conflate=False, timeout=None): | 
				
			||||
  sock = SubSocket() | 
				
			||||
  addr = addr.encode('utf8') | 
				
			||||
  sock.connect(context, endpoint, addr, conflate) | 
				
			||||
 | 
				
			||||
  if timeout is not None: | 
				
			||||
    sock.setTimeout(timeout) | 
				
			||||
 | 
				
			||||
  if poller is not None: | 
				
			||||
    poller.registerSocket(sock) | 
				
			||||
  return sock | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def drain_sock_raw(sock, wait_for_one=False): | 
				
			||||
  """Receive all message currently available on the queue""" | 
				
			||||
  ret = [] | 
				
			||||
  while 1: | 
				
			||||
    if wait_for_one and len(ret) == 0: | 
				
			||||
      dat = sock.receive() | 
				
			||||
    else: | 
				
			||||
      dat = sock.receive(non_blocking=True) | 
				
			||||
 | 
				
			||||
    if dat is None: | 
				
			||||
      break | 
				
			||||
 | 
				
			||||
    ret.append(dat) | 
				
			||||
 | 
				
			||||
  return ret | 
				
			||||
 | 
				
			||||
def drain_sock(sock, wait_for_one=False): | 
				
			||||
  """Receive all message currently available on the queue""" | 
				
			||||
  ret = [] | 
				
			||||
  while 1: | 
				
			||||
    if wait_for_one and len(ret) == 0: | 
				
			||||
      dat = sock.receive() | 
				
			||||
    else: | 
				
			||||
      dat = sock.receive(non_blocking=True) | 
				
			||||
 | 
				
			||||
    if dat is None: # Timeout hit | 
				
			||||
      break | 
				
			||||
 | 
				
			||||
    dat = log.Event.from_bytes(dat) | 
				
			||||
    ret.append(dat) | 
				
			||||
 | 
				
			||||
  return ret | 
				
			||||
 | 
				
			||||
 | 
				
			||||
# TODO: print when we drop packets? | 
				
			||||
def recv_sock(sock, wait=False): | 
				
			||||
  """Same as drain sock, but only returns latest message. Consider using conflate instead.""" | 
				
			||||
  dat = None | 
				
			||||
 | 
				
			||||
  while 1: | 
				
			||||
    if wait and dat is None: | 
				
			||||
      rcv = sock.receive() | 
				
			||||
    else: | 
				
			||||
      rcv = sock.receive(non_blocking=True) | 
				
			||||
 | 
				
			||||
    if rcv is None: # Timeout hit | 
				
			||||
      break | 
				
			||||
 | 
				
			||||
    dat = rcv | 
				
			||||
 | 
				
			||||
  if dat is not None: | 
				
			||||
    dat = log.Event.from_bytes(dat) | 
				
			||||
 | 
				
			||||
  return dat | 
				
			||||
 | 
				
			||||
def recv_one(sock): | 
				
			||||
  dat = sock.receive() | 
				
			||||
  if dat is not None: | 
				
			||||
    dat = log.Event.from_bytes(dat) | 
				
			||||
  return dat | 
				
			||||
 | 
				
			||||
def recv_one_or_none(sock): | 
				
			||||
  dat = sock.receive(non_blocking=True) | 
				
			||||
  if dat is not None: | 
				
			||||
    dat = log.Event.from_bytes(dat) | 
				
			||||
  return dat | 
				
			||||
 | 
				
			||||
def recv_one_retry(sock): | 
				
			||||
  """Keep receiving until we get a message""" | 
				
			||||
  while True: | 
				
			||||
    dat = sock.receive() | 
				
			||||
    if dat is not None: | 
				
			||||
      return log.Event.from_bytes(dat) | 
				
			||||
 | 
				
			||||
def get_one_can(logcan): | 
				
			||||
  while True: | 
				
			||||
    can = recv_one_retry(logcan) | 
				
			||||
    if len(can.can) > 0: | 
				
			||||
      return can | 
				
			||||
 | 
				
			||||
class SubMaster(): | 
				
			||||
  def __init__(self, services, ignore_alive=None, addr="127.0.0.1"): | 
				
			||||
    self.poller = Poller() | 
				
			||||
    self.frame = -1 | 
				
			||||
    self.updated = {s : False for s in services} | 
				
			||||
    self.rcv_time = {s : 0. for s in services} | 
				
			||||
    self.rcv_frame = {s : 0 for s in services} | 
				
			||||
    self.alive = {s : False for s in services} | 
				
			||||
    self.sock = {} | 
				
			||||
    self.freq = {} | 
				
			||||
    self.data = {} | 
				
			||||
    self.logMonoTime = {} | 
				
			||||
    self.valid = {} | 
				
			||||
 | 
				
			||||
    if ignore_alive is not None: | 
				
			||||
      self.ignore_alive = ignore_alive | 
				
			||||
    else: | 
				
			||||
      self.ignore_alive = [] | 
				
			||||
 | 
				
			||||
    for s in services: | 
				
			||||
      if addr is not None: | 
				
			||||
        self.sock[s] = sub_sock(s, poller=self.poller, addr=addr, conflate=True) | 
				
			||||
      self.freq[s] = service_list[s].frequency | 
				
			||||
 | 
				
			||||
      data = new_message() | 
				
			||||
      if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', | 
				
			||||
               'ethernetData', 'cellInfo', 'wifiScan', | 
				
			||||
               'trafficEvents', 'orbObservation', 'carEvents']: | 
				
			||||
        data.init(s, 0) | 
				
			||||
      else: | 
				
			||||
        data.init(s) | 
				
			||||
      self.data[s] = getattr(data, s) | 
				
			||||
      self.logMonoTime[s] = 0 | 
				
			||||
      self.valid[s] = data.valid | 
				
			||||
 | 
				
			||||
  def __getitem__(self, s): | 
				
			||||
    return self.data[s] | 
				
			||||
 | 
				
			||||
  def update(self, timeout=1000): | 
				
			||||
    msgs = [] | 
				
			||||
    for sock in self.poller.poll(timeout): | 
				
			||||
      msgs.append(recv_one_or_none(sock)) | 
				
			||||
    self.update_msgs(sec_since_boot(), msgs) | 
				
			||||
 | 
				
			||||
  def update_msgs(self, cur_time, msgs): | 
				
			||||
    # TODO: add optional input that specify the service to wait for | 
				
			||||
    self.frame += 1 | 
				
			||||
    self.updated = dict.fromkeys(self.updated, False) | 
				
			||||
    for msg in msgs: | 
				
			||||
      if msg is None: | 
				
			||||
        continue | 
				
			||||
 | 
				
			||||
      s = msg.which() | 
				
			||||
      self.updated[s] = True | 
				
			||||
      self.rcv_time[s] = cur_time | 
				
			||||
      self.rcv_frame[s] = self.frame | 
				
			||||
      self.data[s] = getattr(msg, s) | 
				
			||||
      self.logMonoTime[s] = msg.logMonoTime | 
				
			||||
      self.valid[s] = msg.valid | 
				
			||||
 | 
				
			||||
    for s in self.data: | 
				
			||||
      # arbitrary small number to avoid float comparison. If freq is 0, we can skip the check | 
				
			||||
      if self.freq[s] > 1e-5: | 
				
			||||
        # alive if delay is within 10x the expected frequency | 
				
			||||
        self.alive[s] = (cur_time - self.rcv_time[s]) < (10. / self.freq[s]) | 
				
			||||
      else: | 
				
			||||
        self.alive[s] = True | 
				
			||||
 | 
				
			||||
  def all_alive(self, service_list=None): | 
				
			||||
    if service_list is None:  # check all | 
				
			||||
      service_list = self.alive.keys() | 
				
			||||
    return all(self.alive[s] for s in service_list if s not in self.ignore_alive) | 
				
			||||
 | 
				
			||||
  def all_valid(self, service_list=None): | 
				
			||||
    if service_list is None:  # check all | 
				
			||||
      service_list = self.valid.keys() | 
				
			||||
    return all(self.valid[s] for s in service_list) | 
				
			||||
 | 
				
			||||
  def all_alive_and_valid(self, service_list=None): | 
				
			||||
    if service_list is None:  # check all | 
				
			||||
      service_list = self.alive.keys() | 
				
			||||
    return self.all_alive(service_list=service_list) and self.all_valid(service_list=service_list) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class PubMaster(): | 
				
			||||
  def __init__(self, services): | 
				
			||||
    self.sock = {} | 
				
			||||
    for s in services: | 
				
			||||
      self.sock[s] = pub_sock(s) | 
				
			||||
 | 
				
			||||
  def send(self, s, dat): | 
				
			||||
    # accept either bytes or capnp builder | 
				
			||||
    if not isinstance(dat, bytes): | 
				
			||||
      dat = dat.to_bytes() | 
				
			||||
    self.sock[s].send(dat) | 
				
			||||
@ -0,0 +1,62 @@ | 
				
			||||
#include <iostream> | 
				
			||||
#include <string> | 
				
			||||
#include <cassert> | 
				
			||||
#include <csignal> | 
				
			||||
#include <map> | 
				
			||||
 | 
				
			||||
#include "services.h" | 
				
			||||
 | 
				
			||||
#include "impl_msgq.hpp" | 
				
			||||
#include "impl_zmq.hpp" | 
				
			||||
 | 
				
			||||
void sigpipe_handler(int sig) { | 
				
			||||
  assert(sig == SIGPIPE); | 
				
			||||
  std::cout << "SIGPIPE received" << std::endl; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
static std::vector<std::string> get_services() { | 
				
			||||
  std::vector<std::string> name_list; | 
				
			||||
 | 
				
			||||
  for (const auto& it : services) { | 
				
			||||
    std::string name = it.name; | 
				
			||||
    if (name == "plusFrame" || name == "uiLayoutState") continue; | 
				
			||||
    name_list.push_back(name); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return name_list; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int main(void){ | 
				
			||||
  signal(SIGPIPE, (sighandler_t)sigpipe_handler); | 
				
			||||
 | 
				
			||||
  auto endpoints = get_services(); | 
				
			||||
 | 
				
			||||
  std::map<SubSocket*, PubSocket*> sub2pub; | 
				
			||||
 | 
				
			||||
  Context *zmq_context = new ZMQContext(); | 
				
			||||
  Context *msgq_context = new MSGQContext(); | 
				
			||||
  Poller *poller = new MSGQPoller(); | 
				
			||||
 | 
				
			||||
  for (auto endpoint: endpoints){ | 
				
			||||
    SubSocket * msgq_sock = new MSGQSubSocket(); | 
				
			||||
    msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false); | 
				
			||||
    poller->registerSocket(msgq_sock); | 
				
			||||
 | 
				
			||||
    PubSocket * zmq_sock = new ZMQPubSocket(); | 
				
			||||
    zmq_sock->connect(zmq_context, endpoint); | 
				
			||||
 | 
				
			||||
    sub2pub[msgq_sock] = zmq_sock; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  while (true){ | 
				
			||||
    for (auto sub_sock : poller->poll(100)){ | 
				
			||||
      Message * msg = sub_sock->receive(); | 
				
			||||
      if (msg == NULL) continue; | 
				
			||||
      sub2pub[sub_sock]->sendMessage(msg); | 
				
			||||
      delete msg; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
									
										
											File diff suppressed because it is too large
											Load Diff
										
									
								
							
						@ -0,0 +1,50 @@ | 
				
			||||
#include <iostream> | 
				
			||||
#include <cstddef> | 
				
			||||
#include <chrono> | 
				
			||||
#include <thread> | 
				
			||||
#include <cassert> | 
				
			||||
 | 
				
			||||
#include "messaging.hpp" | 
				
			||||
#include "impl_zmq.hpp" | 
				
			||||
 | 
				
			||||
#define MSGS 1e5 | 
				
			||||
 | 
				
			||||
int main() { | 
				
			||||
  Context * c = Context::create(); | 
				
			||||
  SubSocket * sub_sock = SubSocket::create(c, "controlsState"); | 
				
			||||
  PubSocket * pub_sock = PubSocket::create(c, "controlsState"); | 
				
			||||
 | 
				
			||||
  char data[8]; | 
				
			||||
 | 
				
			||||
  Poller * poller = Poller::create({sub_sock}); | 
				
			||||
 | 
				
			||||
  auto start = std::chrono::steady_clock::now(); | 
				
			||||
 | 
				
			||||
  for (uint64_t i = 0; i < MSGS; i++){ | 
				
			||||
    *(uint64_t*)data = i; | 
				
			||||
    pub_sock->send(data, 8); | 
				
			||||
 | 
				
			||||
    auto r = poller->poll(100); | 
				
			||||
 | 
				
			||||
    for (auto p : r){ | 
				
			||||
      Message * m = p->receive(); | 
				
			||||
      uint64_t ii = *(uint64_t*)m->getData(); | 
				
			||||
      assert(i == ii); | 
				
			||||
      delete m; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  auto end = std::chrono::steady_clock::now(); | 
				
			||||
  double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9; | 
				
			||||
  double throughput = ((double) MSGS / (double) elapsed); | 
				
			||||
  std::cout << throughput << " msg/s" << std::endl; | 
				
			||||
 | 
				
			||||
  delete poller; | 
				
			||||
  delete sub_sock; | 
				
			||||
  delete pub_sock; | 
				
			||||
  delete c; | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
@ -0,0 +1,30 @@ | 
				
			||||
import time | 
				
			||||
 | 
				
			||||
from messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error | 
				
			||||
 | 
				
			||||
MSGS = 1e5 | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  c = Context() | 
				
			||||
  sub_sock = SubSocket() | 
				
			||||
  pub_sock = PubSocket() | 
				
			||||
 | 
				
			||||
  sub_sock.connect(c, "controlsState") | 
				
			||||
  pub_sock.connect(c, "controlsState") | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  poller = Poller() | 
				
			||||
  poller.registerSocket(sub_sock) | 
				
			||||
 | 
				
			||||
  t = time.time() | 
				
			||||
  for i in range(int(MSGS)): | 
				
			||||
    bts = i.to_bytes(4, 'little') | 
				
			||||
    pub_sock.send(bts) | 
				
			||||
 | 
				
			||||
    for s in poller.poll(100): | 
				
			||||
      dat = s.receive() | 
				
			||||
      ii = int.from_bytes(dat, 'little') | 
				
			||||
      assert(i == ii) | 
				
			||||
 | 
				
			||||
  dt = time.time() - t | 
				
			||||
  print("%.1f msg/s" % (MSGS / dt)) | 
				
			||||
@ -0,0 +1,190 @@ | 
				
			||||
#include <cassert> | 
				
			||||
#include <cstring> | 
				
			||||
#include <iostream> | 
				
			||||
#include <cstdlib> | 
				
			||||
#include <csignal> | 
				
			||||
#include <cerrno> | 
				
			||||
 | 
				
			||||
 | 
				
			||||
#include "impl_msgq.hpp" | 
				
			||||
 | 
				
			||||
volatile sig_atomic_t msgq_do_exit = 0; | 
				
			||||
 | 
				
			||||
void sig_handler(int signal) { | 
				
			||||
  assert(signal == SIGINT || signal == SIGTERM); | 
				
			||||
  msgq_do_exit = 1; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
MSGQContext::MSGQContext() { | 
				
			||||
} | 
				
			||||
 | 
				
			||||
MSGQContext::~MSGQContext() { | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void MSGQMessage::init(size_t sz) { | 
				
			||||
  size = sz; | 
				
			||||
  data = new char[size]; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void MSGQMessage::init(char * d, size_t sz) { | 
				
			||||
  size = sz; | 
				
			||||
  data = new char[size]; | 
				
			||||
  memcpy(data, d, size); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void MSGQMessage::takeOwnership(char * d, size_t sz) { | 
				
			||||
  size = sz; | 
				
			||||
  data = d; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void MSGQMessage::close() { | 
				
			||||
  if (size > 0){ | 
				
			||||
    delete[] data; | 
				
			||||
  } | 
				
			||||
  size = 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
MSGQMessage::~MSGQMessage() { | 
				
			||||
  this->close(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){ | 
				
			||||
  assert(context); | 
				
			||||
  assert(address == "127.0.0.1"); | 
				
			||||
 | 
				
			||||
  q = new msgq_queue_t; | 
				
			||||
  int r = msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); | 
				
			||||
  if (r != 0){ | 
				
			||||
    return r; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  msgq_init_subscriber(q); | 
				
			||||
 | 
				
			||||
  if (conflate){ | 
				
			||||
    q->read_conflate = true; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  timeout = -1; | 
				
			||||
 | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
Message * MSGQSubSocket::receive(bool non_blocking){ | 
				
			||||
  msgq_do_exit = 0; | 
				
			||||
 | 
				
			||||
  void (*prev_handler_sigint)(int); | 
				
			||||
  void (*prev_handler_sigterm)(int); | 
				
			||||
  if (!non_blocking){ | 
				
			||||
    prev_handler_sigint = std::signal(SIGINT, sig_handler); | 
				
			||||
    prev_handler_sigterm = std::signal(SIGTERM, sig_handler); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
 | 
				
			||||
  MSGQMessage *r = NULL; | 
				
			||||
  r = NULL; | 
				
			||||
 | 
				
			||||
  int rc = msgq_msg_recv(&msg, q); | 
				
			||||
 | 
				
			||||
  // Hack to implement blocking read with a poller. Don't use this
 | 
				
			||||
  while (!non_blocking && rc == 0 && msgq_do_exit == 0){ | 
				
			||||
    msgq_pollitem_t items[1]; | 
				
			||||
    items[0].q = q; | 
				
			||||
 | 
				
			||||
    int t = (timeout != -1) ? timeout : 100; | 
				
			||||
 | 
				
			||||
    int n = msgq_poll(items, 1, t); | 
				
			||||
    rc = msgq_msg_recv(&msg, q); | 
				
			||||
 | 
				
			||||
    // The poll indicated a message was ready, but the receive failed. Try again
 | 
				
			||||
    if (n == 1 && rc == 0){ | 
				
			||||
      continue; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    if (timeout != -1){ | 
				
			||||
      break; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  if (rc > 0){ | 
				
			||||
    r = new MSGQMessage; | 
				
			||||
    r->takeOwnership(msg.data, msg.size); | 
				
			||||
  } | 
				
			||||
  errno = msgq_do_exit ? EINTR : 0; | 
				
			||||
 | 
				
			||||
  if (!non_blocking){ | 
				
			||||
    std::signal(SIGINT, prev_handler_sigint); | 
				
			||||
    std::signal(SIGTERM, prev_handler_sigterm); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return (Message*)r; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void MSGQSubSocket::setTimeout(int t){ | 
				
			||||
  timeout = t; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
MSGQSubSocket::~MSGQSubSocket(){ | 
				
			||||
  if (q != NULL){ | 
				
			||||
    msgq_close_queue(q); | 
				
			||||
    delete q; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int MSGQPubSocket::connect(Context *context, std::string endpoint){ | 
				
			||||
  assert(context); | 
				
			||||
 | 
				
			||||
  q = new msgq_queue_t; | 
				
			||||
  msgq_new_queue(q, endpoint.c_str(), DEFAULT_SEGMENT_SIZE); | 
				
			||||
  msgq_init_publisher(q); | 
				
			||||
 | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int MSGQPubSocket::sendMessage(Message *message){ | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msg.data = message->getData(); | 
				
			||||
  msg.size = message->getSize(); | 
				
			||||
 | 
				
			||||
  return msgq_msg_send(&msg, q); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int MSGQPubSocket::send(char *data, size_t size){ | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msg.data = data; | 
				
			||||
  msg.size = size; | 
				
			||||
 | 
				
			||||
  return msgq_msg_send(&msg, q); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
MSGQPubSocket::~MSGQPubSocket(){ | 
				
			||||
  if (q != NULL){ | 
				
			||||
    msgq_close_queue(q); | 
				
			||||
    delete q; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
void MSGQPoller::registerSocket(SubSocket * socket){ | 
				
			||||
  assert(num_polls + 1 < MAX_POLLERS); | 
				
			||||
  polls[num_polls].q = (msgq_queue_t*)socket->getRawSocket(); | 
				
			||||
 | 
				
			||||
  sockets.push_back(socket); | 
				
			||||
  num_polls++; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
std::vector<SubSocket*> MSGQPoller::poll(int timeout){ | 
				
			||||
  std::vector<SubSocket*> r; | 
				
			||||
 | 
				
			||||
  msgq_poll(polls, num_polls, timeout); | 
				
			||||
  for (size_t i = 0; i < num_polls; i++){ | 
				
			||||
    if (polls[i].revents){ | 
				
			||||
      r.push_back(sockets[i]); | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return r; | 
				
			||||
} | 
				
			||||
@ -0,0 +1,64 @@ | 
				
			||||
#pragma once | 
				
			||||
#include "messaging.hpp" | 
				
			||||
#include "msgq.hpp" | 
				
			||||
#include <zmq.h> | 
				
			||||
#include <string> | 
				
			||||
 | 
				
			||||
#define MAX_POLLERS 128 | 
				
			||||
 | 
				
			||||
class MSGQContext : public Context { | 
				
			||||
private: | 
				
			||||
  void * context = NULL; | 
				
			||||
public: | 
				
			||||
  MSGQContext(); | 
				
			||||
  void * getRawContext() {return context;} | 
				
			||||
  ~MSGQContext(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class MSGQMessage : public Message { | 
				
			||||
private: | 
				
			||||
  char * data; | 
				
			||||
  size_t size; | 
				
			||||
public: | 
				
			||||
  void init(size_t size); | 
				
			||||
  void init(char *data, size_t size); | 
				
			||||
  void takeOwnership(char *data, size_t size); | 
				
			||||
  size_t getSize(){return size;} | 
				
			||||
  char * getData(){return data;} | 
				
			||||
  void close(); | 
				
			||||
  ~MSGQMessage(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class MSGQSubSocket : public SubSocket { | 
				
			||||
private: | 
				
			||||
  msgq_queue_t * q = NULL; | 
				
			||||
  int timeout; | 
				
			||||
public: | 
				
			||||
  int connect(Context *context, std::string endpoint, std::string address, bool conflate=false); | 
				
			||||
  void setTimeout(int timeout); | 
				
			||||
  void * getRawSocket() {return (void*)q;} | 
				
			||||
  Message *receive(bool non_blocking=false); | 
				
			||||
  ~MSGQSubSocket(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class MSGQPubSocket : public PubSocket { | 
				
			||||
private: | 
				
			||||
  msgq_queue_t * q = NULL; | 
				
			||||
public: | 
				
			||||
  int connect(Context *context, std::string endpoint); | 
				
			||||
  int sendMessage(Message *message); | 
				
			||||
  int send(char *data, size_t size); | 
				
			||||
  ~MSGQPubSocket(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class MSGQPoller : public Poller { | 
				
			||||
private: | 
				
			||||
  std::vector<SubSocket*> sockets; | 
				
			||||
  msgq_pollitem_t polls[MAX_POLLERS]; | 
				
			||||
  size_t num_polls = 0; | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  void registerSocket(SubSocket *socket); | 
				
			||||
  std::vector<SubSocket*> poll(int timeout); | 
				
			||||
  ~MSGQPoller(){}; | 
				
			||||
}; | 
				
			||||
@ -0,0 +1,155 @@ | 
				
			||||
#include <cassert> | 
				
			||||
#include <cstring> | 
				
			||||
#include <iostream> | 
				
			||||
#include <cstdlib> | 
				
			||||
#include <cerrno> | 
				
			||||
 | 
				
			||||
#include <zmq.h> | 
				
			||||
 | 
				
			||||
#include "services.h" | 
				
			||||
#include "impl_zmq.hpp" | 
				
			||||
 | 
				
			||||
static int get_port(std::string endpoint) { | 
				
			||||
  int port = -1; | 
				
			||||
  for (const auto& it : services) { | 
				
			||||
    std::string name = it.name; | 
				
			||||
    if (name == endpoint) { | 
				
			||||
      port = it.port; | 
				
			||||
      break; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  assert(port >= 0); | 
				
			||||
  return port; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ZMQContext::ZMQContext() { | 
				
			||||
  context = zmq_ctx_new(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ZMQContext::~ZMQContext() { | 
				
			||||
  zmq_ctx_term(context); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ZMQMessage::init(size_t sz) { | 
				
			||||
  size = sz; | 
				
			||||
  data = new char[size]; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ZMQMessage::init(char * d, size_t sz) { | 
				
			||||
  size = sz; | 
				
			||||
  data = new char[size]; | 
				
			||||
  memcpy(data, d, size); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ZMQMessage::close() { | 
				
			||||
  if (size > 0){ | 
				
			||||
    delete[] data; | 
				
			||||
  } | 
				
			||||
  size = 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ZMQMessage::~ZMQMessage() { | 
				
			||||
  this->close(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate){ | 
				
			||||
  sock = zmq_socket(context->getRawContext(), ZMQ_SUB); | 
				
			||||
  if (sock == NULL){ | 
				
			||||
    return -1; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0); | 
				
			||||
 | 
				
			||||
  if (conflate){ | 
				
			||||
    int arg = 1; | 
				
			||||
    zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int)); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  int reconnect_ivl = 500; | 
				
			||||
  zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl)); | 
				
			||||
 | 
				
			||||
  full_endpoint = "tcp://" + address + ":"; | 
				
			||||
  full_endpoint += std::to_string(get_port(endpoint)); | 
				
			||||
 | 
				
			||||
  return zmq_connect(sock, full_endpoint.c_str()); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
Message * ZMQSubSocket::receive(bool non_blocking){ | 
				
			||||
  zmq_msg_t msg; | 
				
			||||
  assert(zmq_msg_init(&msg) == 0); | 
				
			||||
 | 
				
			||||
  int flags = non_blocking ? ZMQ_DONTWAIT : 0; | 
				
			||||
  int rc = zmq_msg_recv(&msg, sock, flags); | 
				
			||||
  Message *r = NULL; | 
				
			||||
 | 
				
			||||
  if (rc >= 0){ | 
				
			||||
    // Make a copy to ensure the data is aligned
 | 
				
			||||
    r = new ZMQMessage; | 
				
			||||
    r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg)); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  zmq_msg_close(&msg); | 
				
			||||
  return r; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void ZMQSubSocket::setTimeout(int timeout){ | 
				
			||||
  zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int)); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ZMQSubSocket::~ZMQSubSocket(){ | 
				
			||||
  zmq_close(sock); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int ZMQPubSocket::connect(Context *context, std::string endpoint){ | 
				
			||||
  sock = zmq_socket(context->getRawContext(), ZMQ_PUB); | 
				
			||||
  if (sock == NULL){ | 
				
			||||
    return -1; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  full_endpoint = "tcp://*:"; | 
				
			||||
  full_endpoint += std::to_string(get_port(endpoint)); | 
				
			||||
 | 
				
			||||
  return zmq_bind(sock, full_endpoint.c_str()); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int ZMQPubSocket::sendMessage(Message *message){ | 
				
			||||
  return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int ZMQPubSocket::send(char *data, size_t size){ | 
				
			||||
  return zmq_send(sock, data, size, ZMQ_DONTWAIT); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
ZMQPubSocket::~ZMQPubSocket(){ | 
				
			||||
  zmq_close(sock); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
void ZMQPoller::registerSocket(SubSocket * socket){ | 
				
			||||
  assert(num_polls + 1 < MAX_POLLERS); | 
				
			||||
  polls[num_polls].socket = socket->getRawSocket(); | 
				
			||||
  polls[num_polls].events = ZMQ_POLLIN; | 
				
			||||
 | 
				
			||||
  sockets.push_back(socket); | 
				
			||||
  num_polls++; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
std::vector<SubSocket*> ZMQPoller::poll(int timeout){ | 
				
			||||
  std::vector<SubSocket*> r; | 
				
			||||
 | 
				
			||||
  int rc = zmq_poll(polls, num_polls, timeout); | 
				
			||||
  if (rc < 0){ | 
				
			||||
    return r; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < num_polls; i++){ | 
				
			||||
    if (polls[i].revents){ | 
				
			||||
      r.push_back(sockets[i]); | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return r; | 
				
			||||
} | 
				
			||||
@ -0,0 +1,63 @@ | 
				
			||||
#pragma once | 
				
			||||
#include "messaging.hpp" | 
				
			||||
#include <zmq.h> | 
				
			||||
#include <string> | 
				
			||||
 | 
				
			||||
#define MAX_POLLERS 128 | 
				
			||||
 | 
				
			||||
class ZMQContext : public Context { | 
				
			||||
private: | 
				
			||||
  void * context = NULL; | 
				
			||||
public: | 
				
			||||
  ZMQContext(); | 
				
			||||
  void * getRawContext() {return context;} | 
				
			||||
  ~ZMQContext(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class ZMQMessage : public Message { | 
				
			||||
private: | 
				
			||||
  char * data; | 
				
			||||
  size_t size; | 
				
			||||
public: | 
				
			||||
  void init(size_t size); | 
				
			||||
  void init(char *data, size_t size); | 
				
			||||
  size_t getSize(){return size;} | 
				
			||||
  char * getData(){return data;} | 
				
			||||
  void close(); | 
				
			||||
  ~ZMQMessage(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class ZMQSubSocket : public SubSocket { | 
				
			||||
private: | 
				
			||||
  void * sock; | 
				
			||||
  std::string full_endpoint; | 
				
			||||
public: | 
				
			||||
  int connect(Context *context, std::string endpoint, std::string address, bool conflate=false); | 
				
			||||
  void setTimeout(int timeout); | 
				
			||||
  void * getRawSocket() {return sock;} | 
				
			||||
  Message *receive(bool non_blocking=false); | 
				
			||||
  ~ZMQSubSocket(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class ZMQPubSocket : public PubSocket { | 
				
			||||
private: | 
				
			||||
  void * sock; | 
				
			||||
  std::string full_endpoint; | 
				
			||||
public: | 
				
			||||
  int connect(Context *context, std::string endpoint); | 
				
			||||
  int sendMessage(Message *message); | 
				
			||||
  int send(char *data, size_t size); | 
				
			||||
  ~ZMQPubSocket(); | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class ZMQPoller : public Poller { | 
				
			||||
private: | 
				
			||||
  std::vector<SubSocket*> sockets; | 
				
			||||
  zmq_pollitem_t polls[MAX_POLLERS]; | 
				
			||||
  size_t num_polls = 0; | 
				
			||||
 | 
				
			||||
public: | 
				
			||||
  void registerSocket(SubSocket *socket); | 
				
			||||
  std::vector<SubSocket*> poll(int timeout); | 
				
			||||
  ~ZMQPoller(){}; | 
				
			||||
}; | 
				
			||||
@ -0,0 +1,117 @@ | 
				
			||||
#include "messaging.hpp" | 
				
			||||
#include "impl_zmq.hpp" | 
				
			||||
#include "impl_msgq.hpp" | 
				
			||||
 | 
				
			||||
Context * Context::create(){ | 
				
			||||
  Context * c; | 
				
			||||
  if (std::getenv("MSGQ")){ | 
				
			||||
    c = new MSGQContext(); | 
				
			||||
  } else { | 
				
			||||
    c = new ZMQContext(); | 
				
			||||
  } | 
				
			||||
  return c; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
SubSocket * SubSocket::create(){ | 
				
			||||
  SubSocket * s; | 
				
			||||
  if (std::getenv("MSGQ")){ | 
				
			||||
    s = new MSGQSubSocket(); | 
				
			||||
  } else { | 
				
			||||
    s = new ZMQSubSocket(); | 
				
			||||
  } | 
				
			||||
  return s; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
SubSocket * SubSocket::create(Context * context, std::string endpoint){ | 
				
			||||
  SubSocket *s = SubSocket::create(); | 
				
			||||
  int r = s->connect(context, endpoint, "127.0.0.1"); | 
				
			||||
 | 
				
			||||
  if (r == 0) { | 
				
			||||
    return s; | 
				
			||||
  } else { | 
				
			||||
    delete s; | 
				
			||||
    return NULL; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address){ | 
				
			||||
  SubSocket *s = SubSocket::create(); | 
				
			||||
  int r = s->connect(context, endpoint, address); | 
				
			||||
 | 
				
			||||
  if (r == 0) { | 
				
			||||
    return s; | 
				
			||||
  } else { | 
				
			||||
    delete s; | 
				
			||||
    return NULL; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
SubSocket * SubSocket::create(Context * context, std::string endpoint, std::string address, bool conflate){ | 
				
			||||
  SubSocket *s = SubSocket::create(); | 
				
			||||
  int r = s->connect(context, endpoint, address, conflate); | 
				
			||||
 | 
				
			||||
  if (r == 0) { | 
				
			||||
    return s; | 
				
			||||
  } else { | 
				
			||||
    delete s; | 
				
			||||
    return NULL; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
PubSocket * PubSocket::create(){ | 
				
			||||
  PubSocket * s; | 
				
			||||
  if (std::getenv("MSGQ")){ | 
				
			||||
    s = new MSGQPubSocket(); | 
				
			||||
  } else { | 
				
			||||
    s = new ZMQPubSocket(); | 
				
			||||
  } | 
				
			||||
  return s; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
PubSocket * PubSocket::create(Context * context, std::string endpoint){ | 
				
			||||
  PubSocket *s = PubSocket::create(); | 
				
			||||
  int r = s->connect(context, endpoint); | 
				
			||||
 | 
				
			||||
  if (r == 0) { | 
				
			||||
    return s; | 
				
			||||
  } else { | 
				
			||||
    delete s; | 
				
			||||
    return NULL; | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
Poller * Poller::create(){ | 
				
			||||
  Poller * p; | 
				
			||||
  if (std::getenv("MSGQ")){ | 
				
			||||
    p = new MSGQPoller(); | 
				
			||||
  } else { | 
				
			||||
    p = new ZMQPoller(); | 
				
			||||
  } | 
				
			||||
  return p; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
Poller * Poller::create(std::vector<SubSocket*> sockets){ | 
				
			||||
  Poller * p = Poller::create(); | 
				
			||||
 | 
				
			||||
  for (auto s : sockets){ | 
				
			||||
    p->registerSocket(s); | 
				
			||||
  } | 
				
			||||
  return p; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
extern "C" Context * messaging_context_create() { | 
				
			||||
  return Context::create(); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
extern "C" SubSocket * messaging_subsocket_create(Context* context, const char* endpoint) { | 
				
			||||
  return SubSocket::create(context, std::string(endpoint)); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
extern "C" PubSocket * messaging_pubsocket_create(Context* context, const char* endpoint) { | 
				
			||||
  return PubSocket::create(context, std::string(endpoint)); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
extern "C" Poller * messaging_poller_create(SubSocket** sockets, int size) { | 
				
			||||
  std::vector<SubSocket*> socketsVec(sockets, sockets + size); | 
				
			||||
  return Poller::create(socketsVec); | 
				
			||||
} | 
				
			||||
@ -0,0 +1,56 @@ | 
				
			||||
#pragma once | 
				
			||||
#include <cstddef> | 
				
			||||
#include <vector> | 
				
			||||
#include <string> | 
				
			||||
 | 
				
			||||
#define MSG_MULTIPLE_PUBLISHERS 100 | 
				
			||||
 | 
				
			||||
class Context { | 
				
			||||
public: | 
				
			||||
  virtual void * getRawContext() = 0; | 
				
			||||
  static Context * create(); | 
				
			||||
  virtual ~Context(){}; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class Message { | 
				
			||||
public: | 
				
			||||
  virtual void init(size_t size) = 0; | 
				
			||||
  virtual void init(char * data, size_t size) = 0; | 
				
			||||
  virtual void close() = 0; | 
				
			||||
  virtual size_t getSize() = 0; | 
				
			||||
  virtual char * getData() = 0; | 
				
			||||
  virtual ~Message(){}; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class SubSocket { | 
				
			||||
public: | 
				
			||||
  virtual int connect(Context *context, std::string endpoint, std::string address, bool conflate=false) = 0; | 
				
			||||
  virtual void setTimeout(int timeout) = 0; | 
				
			||||
  virtual Message *receive(bool non_blocking=false) = 0; | 
				
			||||
  virtual void * getRawSocket() = 0; | 
				
			||||
  static SubSocket * create(); | 
				
			||||
  static SubSocket * create(Context * context, std::string endpoint); | 
				
			||||
  static SubSocket * create(Context * context, std::string endpoint, std::string address); | 
				
			||||
  static SubSocket * create(Context * context, std::string endpoint, std::string address, bool conflate); | 
				
			||||
  virtual ~SubSocket(){}; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class PubSocket { | 
				
			||||
public: | 
				
			||||
  virtual int connect(Context *context, std::string endpoint) = 0; | 
				
			||||
  virtual int sendMessage(Message *message) = 0; | 
				
			||||
  virtual int send(char *data, size_t size) = 0; | 
				
			||||
  static PubSocket * create(); | 
				
			||||
  static PubSocket * create(Context * context, std::string endpoint); | 
				
			||||
  virtual ~PubSocket(){}; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
class Poller { | 
				
			||||
public: | 
				
			||||
  virtual void registerSocket(SubSocket *socket) = 0; | 
				
			||||
  virtual std::vector<SubSocket*> poll(int timeout) = 0; | 
				
			||||
  static Poller * create(); | 
				
			||||
  static Poller * create(std::vector<SubSocket*> sockets); | 
				
			||||
  virtual ~Poller(){}; | 
				
			||||
}; | 
				
			||||
@ -0,0 +1,39 @@ | 
				
			||||
# distutils: language = c++ | 
				
			||||
#cython: language_level=3 | 
				
			||||
 | 
				
			||||
from libcpp.string cimport string | 
				
			||||
from libcpp.vector cimport vector | 
				
			||||
from libcpp cimport bool | 
				
			||||
 | 
				
			||||
 | 
				
			||||
cdef extern from "messaging.hpp": | 
				
			||||
  cdef cppclass Context: | 
				
			||||
    @staticmethod | 
				
			||||
    Context * create() | 
				
			||||
 | 
				
			||||
  cdef cppclass Message: | 
				
			||||
    void init(size_t) | 
				
			||||
    void init(char *, size_t) | 
				
			||||
    void close() | 
				
			||||
    size_t getSize() | 
				
			||||
    char *getData() | 
				
			||||
 | 
				
			||||
  cdef cppclass SubSocket: | 
				
			||||
    @staticmethod | 
				
			||||
    SubSocket * create() | 
				
			||||
    int connect(Context *, string, string, bool) | 
				
			||||
    Message * receive(bool) | 
				
			||||
    void setTimeout(int) | 
				
			||||
 | 
				
			||||
  cdef cppclass PubSocket: | 
				
			||||
    @staticmethod | 
				
			||||
    PubSocket * create() | 
				
			||||
    int connect(Context *, string) | 
				
			||||
    int sendMessage(Message *) | 
				
			||||
    int send(char *, size_t) | 
				
			||||
 | 
				
			||||
  cdef cppclass Poller: | 
				
			||||
    @staticmethod | 
				
			||||
    Poller * create() | 
				
			||||
    void registerSocket(SubSocket *) | 
				
			||||
    vector[SubSocket*] poll(int) nogil | 
				
			||||
@ -0,0 +1,151 @@ | 
				
			||||
# distutils: language = c++ | 
				
			||||
# cython: c_string_encoding=ascii, language_level=3 | 
				
			||||
 | 
				
			||||
import sys | 
				
			||||
from libcpp.string cimport string | 
				
			||||
from libcpp cimport bool | 
				
			||||
from libc cimport errno | 
				
			||||
 | 
				
			||||
 | 
				
			||||
from messaging cimport Context as cppContext | 
				
			||||
from messaging cimport SubSocket as cppSubSocket | 
				
			||||
from messaging cimport PubSocket as cppPubSocket | 
				
			||||
from messaging cimport Poller as cppPoller | 
				
			||||
from messaging cimport Message as cppMessage | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class MessagingError(Exception): | 
				
			||||
  pass | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class MultiplePublishersError(MessagingError): | 
				
			||||
  pass | 
				
			||||
 | 
				
			||||
 | 
				
			||||
cdef class Context: | 
				
			||||
  cdef cppContext * context | 
				
			||||
 | 
				
			||||
  def __cinit__(self): | 
				
			||||
    self.context = cppContext.create() | 
				
			||||
 | 
				
			||||
  def term(self): | 
				
			||||
    del self.context | 
				
			||||
    self.context = NULL | 
				
			||||
 | 
				
			||||
  def __dealloc__(self): | 
				
			||||
    pass | 
				
			||||
    # Deleting the context will hang if sockets are still active | 
				
			||||
    # TODO: Figure out a way to make sure the context is closed last | 
				
			||||
    # del self.context | 
				
			||||
 | 
				
			||||
 | 
				
			||||
cdef class Poller: | 
				
			||||
  cdef cppPoller * poller | 
				
			||||
  cdef list sub_sockets | 
				
			||||
 | 
				
			||||
  def __cinit__(self): | 
				
			||||
    self.sub_sockets = [] | 
				
			||||
    self.poller = cppPoller.create() | 
				
			||||
 | 
				
			||||
  def __dealloc__(self): | 
				
			||||
    del self.poller | 
				
			||||
 | 
				
			||||
  def registerSocket(self, SubSocket socket): | 
				
			||||
    self.sub_sockets.append(socket) | 
				
			||||
    self.poller.registerSocket(socket.socket) | 
				
			||||
 | 
				
			||||
  def poll(self, timeout): | 
				
			||||
    sockets = [] | 
				
			||||
    cdef int t = timeout | 
				
			||||
 | 
				
			||||
    with nogil: | 
				
			||||
        result = self.poller.poll(t) | 
				
			||||
 | 
				
			||||
    for s in result: | 
				
			||||
        socket = SubSocket() | 
				
			||||
        socket.setPtr(s) | 
				
			||||
        sockets.append(socket) | 
				
			||||
 | 
				
			||||
    return sockets | 
				
			||||
 | 
				
			||||
cdef class SubSocket: | 
				
			||||
  cdef cppSubSocket * socket | 
				
			||||
  cdef bool is_owner | 
				
			||||
 | 
				
			||||
  def __cinit__(self): | 
				
			||||
    self.socket = cppSubSocket.create() | 
				
			||||
    self.is_owner = True | 
				
			||||
 | 
				
			||||
    if self.socket == NULL: | 
				
			||||
      raise MessagingError | 
				
			||||
 | 
				
			||||
  def __dealloc__(self): | 
				
			||||
    if self.is_owner: | 
				
			||||
      del self.socket | 
				
			||||
 | 
				
			||||
  cdef setPtr(self, cppSubSocket * ptr): | 
				
			||||
    if self.is_owner: | 
				
			||||
      del self.socket | 
				
			||||
 | 
				
			||||
    self.is_owner = False | 
				
			||||
    self.socket = ptr | 
				
			||||
 | 
				
			||||
  def connect(self, Context context, string endpoint, string address=b"127.0.0.1", bool conflate=False): | 
				
			||||
    r = self.socket.connect(context.context, endpoint, address, conflate) | 
				
			||||
 | 
				
			||||
    if r != 0: | 
				
			||||
      if errno.errno == errno.EADDRINUSE: | 
				
			||||
        raise MultiplePublishersError | 
				
			||||
      else: | 
				
			||||
        raise MessagingError | 
				
			||||
 | 
				
			||||
  def setTimeout(self, int timeout): | 
				
			||||
    self.socket.setTimeout(timeout) | 
				
			||||
 | 
				
			||||
  def receive(self, bool non_blocking=False): | 
				
			||||
    msg = self.socket.receive(non_blocking) | 
				
			||||
 | 
				
			||||
    if msg == NULL: | 
				
			||||
      # If a blocking read returns no message check errno if SIGINT was caught in the C++ code | 
				
			||||
      if errno.errno == errno.EINTR: | 
				
			||||
        print("SIGINT received, exiting") | 
				
			||||
        sys.exit(1) | 
				
			||||
 | 
				
			||||
      return None | 
				
			||||
    else: | 
				
			||||
      sz = msg.getSize() | 
				
			||||
      m = msg.getData()[:sz] | 
				
			||||
      del msg | 
				
			||||
 | 
				
			||||
      return m | 
				
			||||
 | 
				
			||||
 | 
				
			||||
cdef class PubSocket: | 
				
			||||
  cdef cppPubSocket * socket | 
				
			||||
 | 
				
			||||
  def __cinit__(self): | 
				
			||||
    self.socket = cppPubSocket.create() | 
				
			||||
    if self.socket == NULL: | 
				
			||||
      raise MessagingError | 
				
			||||
 | 
				
			||||
  def __dealloc__(self): | 
				
			||||
    del self.socket | 
				
			||||
 | 
				
			||||
  def connect(self, Context context, string endpoint): | 
				
			||||
    r = self.socket.connect(context.context, endpoint) | 
				
			||||
 | 
				
			||||
    if r != 0: | 
				
			||||
      if errno.errno == errno.EADDRINUSE: | 
				
			||||
        raise MultiplePublishersError | 
				
			||||
      else: | 
				
			||||
        raise MessagingError | 
				
			||||
 | 
				
			||||
  def send(self, string data): | 
				
			||||
    length = len(data) | 
				
			||||
    r = self.socket.send(<char*>data.c_str(), length) | 
				
			||||
 | 
				
			||||
    if r != length: | 
				
			||||
      if errno.errno == errno.EADDRINUSE: | 
				
			||||
        raise MultiplePublishersError | 
				
			||||
      else: | 
				
			||||
        raise MessagingError | 
				
			||||
@ -0,0 +1,56 @@ | 
				
			||||
import os | 
				
			||||
import subprocess | 
				
			||||
import sysconfig | 
				
			||||
from distutils.core import Extension, setup  # pylint: disable=import-error,no-name-in-module | 
				
			||||
 | 
				
			||||
from Cython.Build import cythonize | 
				
			||||
from Cython.Distutils import build_ext | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def get_ext_filename_without_platform_suffix(filename): | 
				
			||||
  name, ext = os.path.splitext(filename) | 
				
			||||
  ext_suffix = sysconfig.get_config_var('EXT_SUFFIX') | 
				
			||||
 | 
				
			||||
  if ext_suffix == ext: | 
				
			||||
    return filename | 
				
			||||
 | 
				
			||||
  ext_suffix = ext_suffix.replace(ext, '') | 
				
			||||
  idx = name.find(ext_suffix) | 
				
			||||
 | 
				
			||||
  if idx == -1: | 
				
			||||
    return filename | 
				
			||||
  else: | 
				
			||||
    return name[:idx] + ext | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class BuildExtWithoutPlatformSuffix(build_ext): | 
				
			||||
  def get_ext_filename(self, ext_name): | 
				
			||||
    filename = super().get_ext_filename(ext_name) | 
				
			||||
    return get_ext_filename_without_platform_suffix(filename) | 
				
			||||
 | 
				
			||||
 | 
				
			||||
sourcefiles = ['messaging_pyx.pyx'] | 
				
			||||
extra_compile_args = ["-std=c++11"] | 
				
			||||
libraries = ['zmq'] | 
				
			||||
ARCH = subprocess.check_output(["uname", "-m"], encoding='utf8').rstrip()  # pylint: disable=unexpected-keyword-arg | 
				
			||||
 | 
				
			||||
if ARCH == "aarch64": | 
				
			||||
  extra_compile_args += ["-Wno-deprecated-register"] | 
				
			||||
  libraries += ['gnustl_shared'] | 
				
			||||
 | 
				
			||||
setup(name='CAN parser', | 
				
			||||
      cmdclass={'build_ext': BuildExtWithoutPlatformSuffix}, | 
				
			||||
      ext_modules=cythonize( | 
				
			||||
        Extension( | 
				
			||||
          "messaging_pyx", | 
				
			||||
          language="c++", | 
				
			||||
          sources=sourcefiles, | 
				
			||||
          extra_compile_args=extra_compile_args, | 
				
			||||
          libraries=libraries, | 
				
			||||
          extra_objects=[ | 
				
			||||
            os.path.join(os.path.dirname(os.path.realpath(__file__)), '../', 'libmessaging.a'), | 
				
			||||
          ] | 
				
			||||
        ) | 
				
			||||
      ), | 
				
			||||
      nthreads=4, | 
				
			||||
) | 
				
			||||
@ -0,0 +1,441 @@ | 
				
			||||
#include <iostream> | 
				
			||||
#include <cassert> | 
				
			||||
#include <cerrno> | 
				
			||||
#include <cmath> | 
				
			||||
#include <cstring> | 
				
			||||
#include <cstdint> | 
				
			||||
#include <chrono> | 
				
			||||
#include <algorithm> | 
				
			||||
#include <cstdlib> | 
				
			||||
#include <csignal> | 
				
			||||
#include <random> | 
				
			||||
 | 
				
			||||
#include <poll.h> | 
				
			||||
#include <sys/ioctl.h> | 
				
			||||
#include <sys/mman.h> | 
				
			||||
#include <sys/stat.h> | 
				
			||||
#include <sys/types.h> | 
				
			||||
#include <sys/syscall.h> | 
				
			||||
#include <fcntl.h> | 
				
			||||
#include <unistd.h> | 
				
			||||
 | 
				
			||||
#include <stdio.h> | 
				
			||||
 | 
				
			||||
#include "msgq.hpp" | 
				
			||||
 | 
				
			||||
void sigusr1_handler(int signal) { | 
				
			||||
  assert(signal == SIGUSR1); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
uint64_t msgq_get_uid(void){ | 
				
			||||
  std::random_device rd("/dev/urandom"); | 
				
			||||
  std::uniform_int_distribution<uint64_t> distribution(0,std::numeric_limits<uint32_t>::max()); | 
				
			||||
 | 
				
			||||
  uint64_t uid = distribution(rd) << 32 | syscall(SYS_gettid); | 
				
			||||
  return uid; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int msgq_msg_init_size(msgq_msg_t * msg, size_t size){ | 
				
			||||
  msg->size = size; | 
				
			||||
  msg->data = new(std::nothrow) char[size]; | 
				
			||||
 | 
				
			||||
  return (msg->data == NULL) ? -1 : 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int msgq_msg_init_data(msgq_msg_t * msg, char * data, size_t size) { | 
				
			||||
  int r = msgq_msg_init_size(msg, size); | 
				
			||||
 | 
				
			||||
  if (r == 0) | 
				
			||||
    memcpy(msg->data, data, size); | 
				
			||||
 | 
				
			||||
  return r; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int msgq_msg_close(msgq_msg_t * msg){ | 
				
			||||
  if (msg->size > 0) | 
				
			||||
    delete[] msg->data; | 
				
			||||
 | 
				
			||||
  msg->size = 0; | 
				
			||||
 | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void msgq_reset_reader(msgq_queue_t * q){ | 
				
			||||
  int id = q->reader_id; | 
				
			||||
  q->read_valids[id]->store(true); | 
				
			||||
  q->read_pointers[id]->store(*q->write_pointer); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void msgq_wait_for_subscriber(msgq_queue_t *q){ | 
				
			||||
  while (*q->num_readers == 0){ | 
				
			||||
    ; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ | 
				
			||||
  assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
 | 
				
			||||
 | 
				
			||||
  std::signal(SIGUSR1, sigusr1_handler); | 
				
			||||
 | 
				
			||||
  const char * prefix = "/dev/shm/"; | 
				
			||||
  char * full_path = new char[strlen(path) + strlen(prefix) + 1]; | 
				
			||||
  strcpy(full_path, prefix); | 
				
			||||
  strcat(full_path, path); | 
				
			||||
 | 
				
			||||
  auto fd = open(full_path, O_RDWR | O_CREAT, 0777); | 
				
			||||
  delete[] full_path; | 
				
			||||
 | 
				
			||||
  if (fd < 0) | 
				
			||||
    return -1; | 
				
			||||
 | 
				
			||||
  int rc = ftruncate(fd, size + sizeof(msgq_header_t)); | 
				
			||||
  if (rc < 0) | 
				
			||||
    return -1; | 
				
			||||
 | 
				
			||||
  char * mem = (char*)mmap(NULL, size + sizeof(msgq_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); | 
				
			||||
  close(fd); | 
				
			||||
 | 
				
			||||
  if (mem == NULL) | 
				
			||||
    return -1; | 
				
			||||
 | 
				
			||||
  q->mmap_p = mem; | 
				
			||||
 | 
				
			||||
  msgq_header_t *header = (msgq_header_t *)mem; | 
				
			||||
 | 
				
			||||
  // Setup pointers to header segment
 | 
				
			||||
  q->num_readers = reinterpret_cast<std::atomic<uint64_t>*>(&header->num_readers); | 
				
			||||
  q->write_pointer = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_pointer); | 
				
			||||
  q->write_uid = reinterpret_cast<std::atomic<uint64_t>*>(&header->write_uid); | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < NUM_READERS; i++){ | 
				
			||||
    q->read_pointers[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_pointers[i]); | 
				
			||||
    q->read_valids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_valids[i]); | 
				
			||||
    q->read_uids[i] = reinterpret_cast<std::atomic<uint64_t>*>(&header->read_uids[i]); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  q->data = mem + sizeof(msgq_header_t); | 
				
			||||
  q->size = size; | 
				
			||||
  q->reader_id = -1; | 
				
			||||
 | 
				
			||||
  q->endpoint = path; | 
				
			||||
  q->read_conflate = false; | 
				
			||||
 | 
				
			||||
  return 0; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void msgq_close_queue(msgq_queue_t *q){ | 
				
			||||
  if (q->mmap_p != NULL){ | 
				
			||||
    munmap(q->mmap_p, q->size + sizeof(msgq_header_t)); | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
void msgq_init_publisher(msgq_queue_t * q) { | 
				
			||||
  std::cout << "Starting publisher" << std::endl; | 
				
			||||
  uint64_t uid = msgq_get_uid(); | 
				
			||||
 | 
				
			||||
  *q->write_uid = uid; | 
				
			||||
  *q->num_readers = 0; | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < NUM_READERS; i++){ | 
				
			||||
    *q->read_valids[i] = false; | 
				
			||||
    *q->read_uids[i] = 0; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  q->write_uid_local = uid; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
void msgq_init_subscriber(msgq_queue_t * q) { | 
				
			||||
  assert(q != NULL); | 
				
			||||
  assert(q->num_readers != NULL); | 
				
			||||
 | 
				
			||||
  uint64_t uid = msgq_get_uid(); | 
				
			||||
 | 
				
			||||
  // Get reader id
 | 
				
			||||
  while (true){ | 
				
			||||
    uint64_t cur_num_readers = *q->num_readers; | 
				
			||||
    uint64_t new_num_readers = cur_num_readers + 1; | 
				
			||||
 | 
				
			||||
    // No more slots available. Reset all subscribers to kick out inactive ones
 | 
				
			||||
    if (new_num_readers > NUM_READERS){ | 
				
			||||
      std::cout << "Warning, evicting all subscribers!" << std::endl; | 
				
			||||
      *q->num_readers = 0; | 
				
			||||
 | 
				
			||||
      for (size_t i = 0; i < NUM_READERS; i++){ | 
				
			||||
        *q->read_valids[i] = false; | 
				
			||||
 | 
				
			||||
        uint64_t old_uid = *q->read_uids[i]; | 
				
			||||
        *q->read_uids[i] = 0; | 
				
			||||
 | 
				
			||||
        // Wake up reader in case they are in a poll
 | 
				
			||||
        syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); | 
				
			||||
      } | 
				
			||||
 | 
				
			||||
      continue; | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // Use atomic compare and swap to handle race condition
 | 
				
			||||
    // where two subscribers start at the same time
 | 
				
			||||
    if (std::atomic_compare_exchange_strong(q->num_readers, | 
				
			||||
                                            &cur_num_readers, | 
				
			||||
                                            new_num_readers)){ | 
				
			||||
      q->reader_id = cur_num_readers; | 
				
			||||
      q->read_uid_local = uid; | 
				
			||||
 | 
				
			||||
      // We start with read_valid = false,
 | 
				
			||||
      // on the first read the read pointer will be synchronized with the write pointer
 | 
				
			||||
      *q->read_valids[cur_num_readers] = false; | 
				
			||||
      *q->read_pointers[cur_num_readers] = 0; | 
				
			||||
      *q->read_uids[cur_num_readers] = uid; | 
				
			||||
      break; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; | 
				
			||||
  msgq_reset_reader(q); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){ | 
				
			||||
  // Die if we are no longer the active publisher
 | 
				
			||||
  if (q->write_uid_local != *q->write_uid){ | 
				
			||||
    std::cout << "Killing old publisher: " << q->endpoint << std::endl; | 
				
			||||
    errno = EADDRINUSE; | 
				
			||||
    return -1; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  uint64_t total_msg_size = ALIGN(msg->size + sizeof(int64_t)); | 
				
			||||
 | 
				
			||||
  // We need to fit at least three messages in the queue,
 | 
				
			||||
  // then we can always safely access the last message
 | 
				
			||||
  assert(3 * total_msg_size <= q->size); | 
				
			||||
 | 
				
			||||
  uint64_t num_readers = *q->num_readers; | 
				
			||||
 | 
				
			||||
  uint32_t write_cycles, write_pointer; | 
				
			||||
  UNPACK64(write_cycles, write_pointer, *q->write_pointer); | 
				
			||||
 | 
				
			||||
  char *p = q->data + write_pointer; // add base offset
 | 
				
			||||
 | 
				
			||||
  // Check remaining space
 | 
				
			||||
  // Always leave space for a wraparound tag for the next message, including alignment
 | 
				
			||||
  int64_t remaining_space = q->size - write_pointer - total_msg_size - sizeof(int64_t); | 
				
			||||
  if (remaining_space <= 0){ | 
				
			||||
    // Write -1 size tag indicating wraparound
 | 
				
			||||
    *(int64_t*)p = -1; | 
				
			||||
 | 
				
			||||
    // Invalidate all readers that are beyond the write pointer
 | 
				
			||||
    // TODO: should we handle the case where a new reader shows up while this is running?
 | 
				
			||||
    for (uint64_t i = 0; i < num_readers; i++){ | 
				
			||||
      uint64_t read_pointer = *q->read_pointers[i]; | 
				
			||||
      uint64_t read_cycles = read_pointer >> 32; | 
				
			||||
      read_pointer &= 0xFFFFFFFF; | 
				
			||||
 | 
				
			||||
      if ((read_pointer > write_pointer) && (read_cycles != write_cycles)) { | 
				
			||||
        *q->read_valids[i] = false; | 
				
			||||
      } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // Update global and local copies of write pointer and write_cycles
 | 
				
			||||
    write_pointer = 0; | 
				
			||||
    write_cycles = write_cycles + 1; | 
				
			||||
    PACK64(*q->write_pointer, write_cycles, write_pointer); | 
				
			||||
 | 
				
			||||
    // Set actual pointer to the beginning of the data segment
 | 
				
			||||
    p = q->data; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // Invalidate readers that are in the area that will be written
 | 
				
			||||
  uint64_t start = write_pointer; | 
				
			||||
  uint64_t end = ALIGN(start + sizeof(int64_t) + msg->size); | 
				
			||||
 | 
				
			||||
  for (uint64_t i = 0; i < num_readers; i++){ | 
				
			||||
    uint32_t read_cycles, read_pointer; | 
				
			||||
    UNPACK64(read_cycles, read_pointer, *q->read_pointers[i]); | 
				
			||||
 | 
				
			||||
    if ((read_pointer >= start) && (read_pointer < end) && (read_cycles != write_cycles)) { | 
				
			||||
      *q->read_valids[i] = false; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  // Write size tag
 | 
				
			||||
  std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p); | 
				
			||||
  *size_p = msg->size; | 
				
			||||
 | 
				
			||||
  // Copy data
 | 
				
			||||
  memcpy(p + sizeof(int64_t), msg->data, msg->size); | 
				
			||||
  __sync_synchronize(); | 
				
			||||
 | 
				
			||||
  // Update write pointer
 | 
				
			||||
  uint32_t new_ptr = ALIGN(write_pointer + msg->size + sizeof(int64_t)); | 
				
			||||
  PACK64(*q->write_pointer, write_cycles, new_ptr); | 
				
			||||
 | 
				
			||||
  // Notify readers
 | 
				
			||||
  for (uint64_t i = 0; i < num_readers; i++){ | 
				
			||||
    uint64_t reader_uid = *q->read_uids[i]; | 
				
			||||
 | 
				
			||||
    syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1); | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return msg->size; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int msgq_msg_ready(msgq_queue_t * q){ | 
				
			||||
 start: | 
				
			||||
  int id = q->reader_id; | 
				
			||||
  assert(id >= 0); // Make sure subscriber is initialized
 | 
				
			||||
 | 
				
			||||
  if (q->read_uid_local != *q->read_uids[id]){ | 
				
			||||
    std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; | 
				
			||||
    msgq_init_subscriber(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // Check valid
 | 
				
			||||
  if (!*q->read_valids[id]){ | 
				
			||||
    msgq_reset_reader(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  uint32_t read_cycles, read_pointer; | 
				
			||||
  UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]); | 
				
			||||
 | 
				
			||||
  uint32_t write_cycles, write_pointer; | 
				
			||||
  UNPACK64(write_cycles, write_pointer, *q->write_pointer); | 
				
			||||
 | 
				
			||||
  // Check if new message is available
 | 
				
			||||
  return (read_pointer != write_pointer); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
int msgq_msg_recv(msgq_msg_t * msg, msgq_queue_t * q){ | 
				
			||||
 start: | 
				
			||||
  int id = q->reader_id; | 
				
			||||
  assert(id >= 0); // Make sure subscriber is initialized
 | 
				
			||||
 | 
				
			||||
  if (q->read_uid_local != *q->read_uids[id]){ | 
				
			||||
    std::cout << q->endpoint << ": Reader was evicted, reconnecting" << std::endl; | 
				
			||||
    msgq_init_subscriber(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // Check valid
 | 
				
			||||
  if (!*q->read_valids[id]){ | 
				
			||||
    msgq_reset_reader(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  uint32_t read_cycles, read_pointer; | 
				
			||||
  UNPACK64(read_cycles, read_pointer, *q->read_pointers[id]); | 
				
			||||
 | 
				
			||||
  uint32_t write_cycles, write_pointer; | 
				
			||||
  UNPACK64(write_cycles, write_pointer, *q->write_pointer); | 
				
			||||
 | 
				
			||||
  char * p = q->data + read_pointer; | 
				
			||||
 | 
				
			||||
  // Check if new message is available
 | 
				
			||||
  if (read_pointer == write_pointer) { | 
				
			||||
    msg->size = 0; | 
				
			||||
    return 0; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // Read potential message size
 | 
				
			||||
  std::atomic<int64_t> *size_p = reinterpret_cast<std::atomic<int64_t>*>(p); | 
				
			||||
  std::int64_t size = *size_p; | 
				
			||||
 | 
				
			||||
  // Check if the size that was read is valid
 | 
				
			||||
  if (!*q->read_valids[id]){ | 
				
			||||
    msgq_reset_reader(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // If size is -1 the buffer was full, and we need to wrap around
 | 
				
			||||
  if (size == -1){ | 
				
			||||
    read_cycles++; | 
				
			||||
    PACK64(*q->read_pointers[id], read_cycles, 0); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // crashing is better than passing garbage data to the consumer
 | 
				
			||||
  // the size will have weird value if it was overwritten by data accidentally
 | 
				
			||||
  assert((uint64_t)size < q->size); | 
				
			||||
  assert(size > 0); | 
				
			||||
 | 
				
			||||
  uint32_t new_read_pointer = ALIGN(read_pointer + sizeof(std::int64_t) + size); | 
				
			||||
 | 
				
			||||
  // If conflate is true, check if this is the latest message, else start over
 | 
				
			||||
  if (q->read_conflate){ | 
				
			||||
    if (new_read_pointer != write_pointer){ | 
				
			||||
      // Update read pointer
 | 
				
			||||
      PACK64(*q->read_pointers[id], read_cycles, new_read_pointer); | 
				
			||||
      goto start; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // Copy message
 | 
				
			||||
  if (msgq_msg_init_size(msg, size) < 0) | 
				
			||||
    return -1; | 
				
			||||
 | 
				
			||||
  __sync_synchronize(); | 
				
			||||
  memcpy(msg->data, p + sizeof(int64_t), size); | 
				
			||||
  __sync_synchronize(); | 
				
			||||
 | 
				
			||||
  // Update read pointer
 | 
				
			||||
  PACK64(*q->read_pointers[id], read_cycles, new_read_pointer); | 
				
			||||
 | 
				
			||||
  // Check if the actual data that was copied is valid
 | 
				
			||||
  if (!*q->read_valids[id]){ | 
				
			||||
    msgq_msg_close(msg); | 
				
			||||
    msgq_reset_reader(q); | 
				
			||||
    goto start; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  return msg->size; | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
 | 
				
			||||
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout){ | 
				
			||||
  assert(timeout >= 0); | 
				
			||||
 | 
				
			||||
  int num = 0; | 
				
			||||
 | 
				
			||||
  // Check if messages ready
 | 
				
			||||
  for (size_t i = 0; i < nitems; i++) { | 
				
			||||
    items[i].revents = msgq_msg_ready(items[i].q); | 
				
			||||
    if (items[i].revents) num++; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  int ms = (timeout == -1) ? 100 : timeout; | 
				
			||||
  struct timespec ts; | 
				
			||||
  ts.tv_sec = ms / 1000; | 
				
			||||
  ts.tv_nsec = (ms % 1000) * 1000 * 1000; | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  while (num == 0) { | 
				
			||||
    int ret; | 
				
			||||
 | 
				
			||||
    ret = nanosleep(&ts, &ts); | 
				
			||||
 | 
				
			||||
    // Check if messages ready
 | 
				
			||||
    for (size_t i = 0; i < nitems; i++) { | 
				
			||||
      if (items[i].revents == 0 && msgq_msg_ready(items[i].q)){ | 
				
			||||
        num += 1; | 
				
			||||
        items[i].revents = 1; | 
				
			||||
      } | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    // exit if we had a timeout and the sleep finished
 | 
				
			||||
    if (timeout != -1 && ret == 0){ | 
				
			||||
      break; | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  return num; | 
				
			||||
} | 
				
			||||
@ -0,0 +1,66 @@ | 
				
			||||
#pragma once | 
				
			||||
#include <cstdint> | 
				
			||||
#include <cstring> | 
				
			||||
#include <string> | 
				
			||||
#include <atomic> | 
				
			||||
 | 
				
			||||
#define DEFAULT_SEGMENT_SIZE (10 * 1024 * 1024) | 
				
			||||
#define NUM_READERS 8 | 
				
			||||
#define ALIGN(n) ((n + (8 - 1)) & -8) | 
				
			||||
 | 
				
			||||
#define UNPACK64(higher, lower, input) do {uint64_t tmp = input; higher = tmp >> 32; lower = tmp & 0xFFFFFFFF;} while (0) | 
				
			||||
#define PACK64(output, higher, lower) output = ((uint64_t)higher << 32 ) | ((uint64_t)lower & 0xFFFFFFFF) | 
				
			||||
 | 
				
			||||
struct  msgq_header_t { | 
				
			||||
  uint64_t num_readers; | 
				
			||||
  uint64_t write_pointer; | 
				
			||||
  uint64_t write_uid; | 
				
			||||
  uint64_t read_pointers[NUM_READERS]; | 
				
			||||
  uint64_t read_valids[NUM_READERS]; | 
				
			||||
  uint64_t read_uids[NUM_READERS]; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
struct msgq_queue_t { | 
				
			||||
  std::atomic<uint64_t> *num_readers; | 
				
			||||
  std::atomic<uint64_t> *write_pointer; | 
				
			||||
  std::atomic<uint64_t> *write_uid; | 
				
			||||
  std::atomic<uint64_t> *read_pointers[NUM_READERS]; | 
				
			||||
  std::atomic<uint64_t> *read_valids[NUM_READERS]; | 
				
			||||
  std::atomic<uint64_t> *read_uids[NUM_READERS]; | 
				
			||||
  char * mmap_p; | 
				
			||||
  char * data; | 
				
			||||
  size_t size; | 
				
			||||
  int reader_id; | 
				
			||||
  uint64_t read_uid_local; | 
				
			||||
  uint64_t write_uid_local; | 
				
			||||
 | 
				
			||||
  bool read_conflate; | 
				
			||||
  std::string endpoint; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
struct msgq_msg_t { | 
				
			||||
  size_t size; | 
				
			||||
  char * data; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
struct msgq_pollitem_t { | 
				
			||||
  msgq_queue_t *q; | 
				
			||||
  int revents; | 
				
			||||
}; | 
				
			||||
 | 
				
			||||
void msgq_wait_for_subscriber(msgq_queue_t *q); | 
				
			||||
void msgq_reset_reader(msgq_queue_t *q); | 
				
			||||
 | 
				
			||||
int msgq_msg_init_size(msgq_msg_t *msg, size_t size); | 
				
			||||
int msgq_msg_init_data(msgq_msg_t *msg, char * data, size_t size); | 
				
			||||
int msgq_msg_close(msgq_msg_t *msg); | 
				
			||||
 | 
				
			||||
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size); | 
				
			||||
void msgq_close_queue(msgq_queue_t *q); | 
				
			||||
void msgq_init_publisher(msgq_queue_t * q); | 
				
			||||
void msgq_init_subscriber(msgq_queue_t * q); | 
				
			||||
 | 
				
			||||
int msgq_msg_send(msgq_msg_t *msg, msgq_queue_t *q); | 
				
			||||
int msgq_msg_recv(msgq_msg_t *msg, msgq_queue_t *q); | 
				
			||||
int msgq_msg_ready(msgq_queue_t * q); | 
				
			||||
int msgq_poll(msgq_pollitem_t * items, size_t nitems, int timeout); | 
				
			||||
@ -0,0 +1,56 @@ | 
				
			||||
# MSGQ: A lock free single producer multi consumer message queue | 
				
			||||
 | 
				
			||||
[](https://dev.azure.com/commaai/default/_build/latest?definitionId=21&branchName=master) | 
				
			||||
 | 
				
			||||
## What is MSGQ? | 
				
			||||
MSGQ is a system to pass messages from a single producer to multiple consumers. All the consumers need to be able to receive all the messages. It is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. | 
				
			||||
 | 
				
			||||
## Storage | 
				
			||||
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains: | 
				
			||||
 | 
				
			||||
1. A counter to the number of readers that are active | 
				
			||||
2. A pointer to the head of the queue for writing. From now on referred to as *write pointer* | 
				
			||||
3. A cycle counter for the writer. This counter is incremented when the writer wraps around | 
				
			||||
4. N pointers, pointing to the current read position for all the readers. From now on referred to as *read pointer* | 
				
			||||
5. N counters,  counting the number of cycles for all the readers | 
				
			||||
6. N booleans, indicating validity for all the readers. From now on referred to as *validity flag* | 
				
			||||
 | 
				
			||||
The counter and the pointer are both 32 bit values, packed into 64 bit so they can be read and written atomically. | 
				
			||||
 | 
				
			||||
The data buffer is a ring buffer. All messages are prefixed by an 8 byte size field, followed by the data. A size of -1 indicates a wrap-around, and means the next message is stored at the beginning of the buffer. | 
				
			||||
 | 
				
			||||
 | 
				
			||||
## Writing | 
				
			||||
Writing involves the following steps: | 
				
			||||
 | 
				
			||||
1. Check if the area that is to be written overlaps with any of the read pointers, mark those readers as invalid by clearing the validity flag. | 
				
			||||
2. Write the message | 
				
			||||
3. Increase the write pointer by the size of the message | 
				
			||||
 | 
				
			||||
In case there is not enough space at the end of the buffer, a special empty message with a prefix of -1 is written. The cycle counter is incremented by one. In this case step 1 will check there are no read pointers pointing to the remainder of the buffer. Then another write cycle will start with the actual message. | 
				
			||||
 | 
				
			||||
There always needs to be 8 bytes of empty space at the end of the buffer. By doing this there is always space to write the -1. | 
				
			||||
 | 
				
			||||
## Reset reader | 
				
			||||
When the reader is lagging too much behind the read pointer becomes invalid and no longer points to the beginning of a valid message. To reset a reader to the current write pointer, the following steps are performed: | 
				
			||||
 | 
				
			||||
1. Set valid flag | 
				
			||||
2. Set read cycle counter to that of the writer | 
				
			||||
3. Set read pointer to write pointer | 
				
			||||
 | 
				
			||||
## Reading | 
				
			||||
Reading involves the following steps: | 
				
			||||
 | 
				
			||||
1. Read the size field at the current read pointer | 
				
			||||
2. Read the validity flag | 
				
			||||
3. Copy the data out of the buffer | 
				
			||||
4. Increase the read pointer by the size of the message | 
				
			||||
5. Check the validity flag again | 
				
			||||
 | 
				
			||||
Before starting the copy, the valid flag is checked. This is to prevent a race condition where the size prefix was invalid, and the read could read outside of the buffer. Make sure that step 1 and 2 are not reordered by your compiler or CPU. | 
				
			||||
 | 
				
			||||
If a writer overwrites the data while it's being copied out, the data will be invalid. Therefore the validity flag is also checked after reading it. The order of step 4 and 5 does not matter. | 
				
			||||
 | 
				
			||||
If at steps 2 or 5 the validity flag is not set, the reader is reset. Any data that was already read is discarded. After the reader is reset, the reading starts from the beginning. | 
				
			||||
 | 
				
			||||
If a message with size -1 is encountered, step 3 and 4 are replaced by increasing the cycle counter and setting the read pointer to the beginning of the buffer. After that another read is performed. | 
				
			||||
@ -0,0 +1,395 @@ | 
				
			||||
#include "catch2/catch.hpp" | 
				
			||||
#include "msgq.hpp" | 
				
			||||
 | 
				
			||||
TEST_CASE("ALIGN"){ | 
				
			||||
  REQUIRE(ALIGN(0) == 0); | 
				
			||||
  REQUIRE(ALIGN(1) == 8); | 
				
			||||
  REQUIRE(ALIGN(7) == 8); | 
				
			||||
  REQUIRE(ALIGN(8) == 8); | 
				
			||||
  REQUIRE(ALIGN(99999) == 100000); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_init_size"){ | 
				
			||||
  const size_t msg_size = 30; | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
 | 
				
			||||
  msgq_msg_init_size(&msg, msg_size); | 
				
			||||
  REQUIRE(msg.size == msg_size); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&msg); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_init_data"){ | 
				
			||||
  const size_t msg_size = 30; | 
				
			||||
  char * data = new char[msg_size]; | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < msg_size; i++){ | 
				
			||||
    data[i] = i; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msgq_msg_init_data(&msg, data, msg_size); | 
				
			||||
 | 
				
			||||
  REQUIRE(msg.size == msg_size); | 
				
			||||
  REQUIRE(memcmp(msg.data, data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  delete[] data; | 
				
			||||
  msgq_msg_close(&msg); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_init_subscriber"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q; | 
				
			||||
  msgq_new_queue(&q, "test_queue", 1024); | 
				
			||||
  REQUIRE(*q.num_readers == 0); | 
				
			||||
 | 
				
			||||
  q.reader_id = 1; | 
				
			||||
  *q.read_valids[0] = false; | 
				
			||||
  *q.read_pointers[0] = ((uint64_t)1 << 32); | 
				
			||||
 | 
				
			||||
  *q.write_pointer = 255; | 
				
			||||
 | 
				
			||||
  msgq_init_subscriber(&q); | 
				
			||||
  REQUIRE(q.read_conflate == false); | 
				
			||||
  REQUIRE(*q.read_valids[0] == true); | 
				
			||||
  REQUIRE((*q.read_pointers[0] >> 32) == 0); | 
				
			||||
  REQUIRE((*q.read_pointers[0]  & 0xFFFFFFFF) == 255); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_send first message"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q; | 
				
			||||
  msgq_new_queue(&q, "test_queue", 1024); | 
				
			||||
  msgq_init_publisher(&q); | 
				
			||||
 | 
				
			||||
  REQUIRE(*q.write_pointer == 0); | 
				
			||||
 | 
				
			||||
  size_t msg_size = 128; | 
				
			||||
 | 
				
			||||
  SECTION("Aligned message size"){ | 
				
			||||
  } | 
				
			||||
  SECTION("Unaligned message size"){ | 
				
			||||
    msg_size--; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  char * data = new char[msg_size]; | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < msg_size; i++){ | 
				
			||||
    data[i] = i; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msgq_msg_init_data(&msg, data, msg_size); | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  msgq_msg_send(&msg, &q); | 
				
			||||
  REQUIRE(*(int64_t*)q.data == msg_size); // Check size tag
 | 
				
			||||
  REQUIRE(*q.write_pointer == 128 + sizeof(int64_t)); | 
				
			||||
  REQUIRE(memcmp(q.data + sizeof(int64_t), data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  delete[] data; | 
				
			||||
  msgq_msg_close(&msg); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_send test wraparound"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q; | 
				
			||||
  msgq_new_queue(&q, "test_queue", 1024); | 
				
			||||
  msgq_init_publisher(&q); | 
				
			||||
 | 
				
			||||
  REQUIRE((*q.write_pointer & 0xFFFFFFFF) == 0); | 
				
			||||
  REQUIRE((*q.write_pointer >> 32) == 0); | 
				
			||||
 | 
				
			||||
  const size_t msg_size = 120; | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msgq_msg_init_size(&msg, msg_size); | 
				
			||||
 | 
				
			||||
  for (int i = 0; i < 8; i++) { | 
				
			||||
    msgq_msg_send(&msg, &q); | 
				
			||||
  } | 
				
			||||
  // Check 8th message was written at the beginning
 | 
				
			||||
  REQUIRE((*q.write_pointer & 0xFFFFFFFF) == msg_size + sizeof(int64_t)); | 
				
			||||
 | 
				
			||||
  // Check cycle count
 | 
				
			||||
  REQUIRE((*q.write_pointer >> 32) == 1); | 
				
			||||
 | 
				
			||||
  // Check wraparound tag
 | 
				
			||||
  char * tag_location = q.data; | 
				
			||||
  tag_location += 7 * (msg_size + sizeof(int64_t)); | 
				
			||||
  REQUIRE(*(int64_t*)tag_location == -1); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&msg); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_recv test wraparound"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q_pub, q_sub; | 
				
			||||
  msgq_new_queue(&q_pub, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&q_sub, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&q_pub); | 
				
			||||
  msgq_init_subscriber(&q_sub); | 
				
			||||
 | 
				
			||||
  REQUIRE((*q_pub.write_pointer >> 32) == 0); | 
				
			||||
  REQUIRE((*q_sub.read_pointers[0] >> 32) == 0); | 
				
			||||
 | 
				
			||||
  const size_t msg_size = 120; | 
				
			||||
  msgq_msg_t msg1; | 
				
			||||
  msgq_msg_init_size(&msg1, msg_size); | 
				
			||||
 | 
				
			||||
 | 
				
			||||
  SECTION("Check cycle counter after reset") { | 
				
			||||
    for (int i = 0; i < 8; i++) { | 
				
			||||
      msgq_msg_send(&msg1, &q_pub); | 
				
			||||
    } | 
				
			||||
 | 
				
			||||
    msgq_msg_t msg2; | 
				
			||||
    msgq_msg_recv(&msg2, &q_sub); | 
				
			||||
    REQUIRE(msg2.size == 0); // Reader had to reset
 | 
				
			||||
    msgq_msg_close(&msg2); | 
				
			||||
 | 
				
			||||
  } | 
				
			||||
  SECTION("Check cycle counter while keeping up with writer") { | 
				
			||||
    for (int i = 0; i < 8; i++) { | 
				
			||||
      msgq_msg_send(&msg1, &q_pub); | 
				
			||||
 | 
				
			||||
      msgq_msg_t msg2; | 
				
			||||
      msgq_msg_recv(&msg2, &q_sub); | 
				
			||||
      REQUIRE(msg2.size > 0); | 
				
			||||
      msgq_msg_close(&msg2); | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  REQUIRE((*q_sub.read_pointers[0] >> 32) == 1); | 
				
			||||
  msgq_msg_close(&msg1); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_msg_send test invalidation"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q_pub, q_sub; | 
				
			||||
  msgq_new_queue(&q_pub, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&q_sub, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&q_pub); | 
				
			||||
  msgq_init_subscriber(&q_sub); | 
				
			||||
  *q_sub.write_pointer = (uint64_t)1 << 32; | 
				
			||||
 | 
				
			||||
  REQUIRE(*q_sub.read_valids[0] == true); | 
				
			||||
 | 
				
			||||
  SECTION("read pointer in tag"){ | 
				
			||||
    *q_sub.read_pointers[0] = 0; | 
				
			||||
  } | 
				
			||||
  SECTION("read pointer in data section"){ | 
				
			||||
    *q_sub.read_pointers[0] = 64; | 
				
			||||
  } | 
				
			||||
  SECTION("read pointer in wraparound section"){ | 
				
			||||
    *q_pub.write_pointer = ((uint64_t)1 << 32) | 1000; // Writer is one cycle ahead
 | 
				
			||||
    *q_sub.read_pointers[0] = 1020; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  msgq_msg_t msg; | 
				
			||||
  msgq_msg_init_size(&msg, 128); | 
				
			||||
  msgq_msg_send(&msg, &q_pub); | 
				
			||||
 | 
				
			||||
  REQUIRE(*q_sub.read_valids[0] == false); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&msg); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("msgq_init_subscriber init 2 subscribers"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t q1, q2; | 
				
			||||
  msgq_new_queue(&q1, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&q2, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  *q1.num_readers = 0; | 
				
			||||
 | 
				
			||||
  REQUIRE(*q1.num_readers == 0); | 
				
			||||
  REQUIRE(*q2.num_readers == 0); | 
				
			||||
 | 
				
			||||
  msgq_init_subscriber(&q1); | 
				
			||||
  REQUIRE(*q1.num_readers == 1); | 
				
			||||
  REQUIRE(*q2.num_readers == 1); | 
				
			||||
  REQUIRE(q1.reader_id == 0); | 
				
			||||
 | 
				
			||||
  msgq_init_subscriber(&q2); | 
				
			||||
  REQUIRE(*q1.num_readers == 2); | 
				
			||||
  REQUIRE(*q2.num_readers == 2); | 
				
			||||
  REQUIRE(q2.reader_id == 1); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
 | 
				
			||||
TEST_CASE("Write 1 msg, read 1 msg", "[integration]"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  const size_t msg_size = 128; | 
				
			||||
  msgq_queue_t writer, reader; | 
				
			||||
 | 
				
			||||
  msgq_new_queue(&writer, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&writer); | 
				
			||||
  msgq_init_subscriber(&reader); | 
				
			||||
 | 
				
			||||
  // Build 128 byte message
 | 
				
			||||
  msgq_msg_t outgoing_msg; | 
				
			||||
  msgq_msg_init_size(&outgoing_msg, msg_size); | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < msg_size; i++){ | 
				
			||||
    outgoing_msg.data[i] = i; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); | 
				
			||||
 | 
				
			||||
  msgq_msg_t incoming_msg1; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); | 
				
			||||
  REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  // Verify that there are no more messages
 | 
				
			||||
  msgq_msg_t incoming_msg2; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&outgoing_msg); | 
				
			||||
  msgq_msg_close(&incoming_msg1); | 
				
			||||
  msgq_msg_close(&incoming_msg2); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  const size_t msg_size = 128; | 
				
			||||
  msgq_queue_t writer, reader; | 
				
			||||
 | 
				
			||||
  msgq_new_queue(&writer, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&writer); | 
				
			||||
  msgq_init_subscriber(&reader); | 
				
			||||
 | 
				
			||||
  // Build 128 byte message
 | 
				
			||||
  msgq_msg_t outgoing_msg; | 
				
			||||
  msgq_msg_init_size(&outgoing_msg, msg_size); | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < msg_size; i++){ | 
				
			||||
    outgoing_msg.data[i] = i; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); | 
				
			||||
  REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); | 
				
			||||
 | 
				
			||||
  msgq_msg_t incoming_msg1; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); | 
				
			||||
  REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  msgq_msg_t incoming_msg2; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == msg_size); | 
				
			||||
  REQUIRE(memcmp(incoming_msg2.data, outgoing_msg.data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&outgoing_msg); | 
				
			||||
  msgq_msg_close(&incoming_msg1); | 
				
			||||
  msgq_msg_close(&incoming_msg2); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  const size_t msg_size = 128; | 
				
			||||
  msgq_queue_t writer, reader; | 
				
			||||
 | 
				
			||||
  msgq_new_queue(&writer, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&writer); | 
				
			||||
  msgq_init_subscriber(&reader); | 
				
			||||
  reader.read_conflate = true; | 
				
			||||
 | 
				
			||||
  // Build 128 byte message
 | 
				
			||||
  msgq_msg_t outgoing_msg; | 
				
			||||
  msgq_msg_init_size(&outgoing_msg, msg_size); | 
				
			||||
 | 
				
			||||
  for (size_t i = 0; i < msg_size; i++){ | 
				
			||||
    outgoing_msg.data[i] = i; | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); | 
				
			||||
  REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); | 
				
			||||
 | 
				
			||||
  msgq_msg_t incoming_msg1; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); | 
				
			||||
  REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); | 
				
			||||
 | 
				
			||||
  // Verify that there are no more messages
 | 
				
			||||
  msgq_msg_t incoming_msg2; | 
				
			||||
  REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); | 
				
			||||
 | 
				
			||||
  msgq_msg_close(&outgoing_msg); | 
				
			||||
  msgq_msg_close(&incoming_msg1); | 
				
			||||
  msgq_msg_close(&incoming_msg2); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("1 publisher, 1 slow subscriber", "[integration]"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t writer, reader; | 
				
			||||
 | 
				
			||||
  msgq_new_queue(&writer, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&writer); | 
				
			||||
  msgq_init_subscriber(&reader); | 
				
			||||
 | 
				
			||||
  int n_received = 0; | 
				
			||||
  int n_skipped = 0; | 
				
			||||
 | 
				
			||||
  for (uint64_t i = 0; i < 1e5; i++) { | 
				
			||||
    msgq_msg_t outgoing_msg; | 
				
			||||
    msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); | 
				
			||||
    msgq_msg_send(&outgoing_msg, &writer); | 
				
			||||
    msgq_msg_close(&outgoing_msg); | 
				
			||||
 | 
				
			||||
    if (i % 10 == 0){ | 
				
			||||
      msgq_msg_t msg1; | 
				
			||||
      msgq_msg_recv(&msg1, &reader); | 
				
			||||
 | 
				
			||||
      if (msg1.size == 0){ | 
				
			||||
        n_skipped++; | 
				
			||||
      } else { | 
				
			||||
        n_received++; | 
				
			||||
      } | 
				
			||||
      msgq_msg_close(&msg1); | 
				
			||||
    } | 
				
			||||
  } | 
				
			||||
 | 
				
			||||
  // TODO: verify these numbers by hand
 | 
				
			||||
  REQUIRE(n_received == 8572); | 
				
			||||
  REQUIRE(n_skipped == 1428); | 
				
			||||
} | 
				
			||||
 | 
				
			||||
TEST_CASE("1 publisher, 2 subscribers", "[integration]"){ | 
				
			||||
  remove("/dev/shm/test_queue"); | 
				
			||||
  msgq_queue_t writer, reader1, reader2; | 
				
			||||
 | 
				
			||||
  msgq_new_queue(&writer, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader1, "test_queue", 1024); | 
				
			||||
  msgq_new_queue(&reader2, "test_queue", 1024); | 
				
			||||
 | 
				
			||||
  msgq_init_publisher(&writer); | 
				
			||||
  msgq_init_subscriber(&reader1); | 
				
			||||
  msgq_init_subscriber(&reader2); | 
				
			||||
 | 
				
			||||
  for (uint64_t i = 0; i < 1024 * 3; i++) { | 
				
			||||
    msgq_msg_t outgoing_msg; | 
				
			||||
    msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); | 
				
			||||
    msgq_msg_send(&outgoing_msg, &writer); | 
				
			||||
 | 
				
			||||
    msgq_msg_t msg1, msg2; | 
				
			||||
    msgq_msg_recv(&msg1, &reader1); | 
				
			||||
    msgq_msg_recv(&msg2, &reader2); | 
				
			||||
 | 
				
			||||
    REQUIRE(msg1.size == sizeof(uint64_t)); | 
				
			||||
    REQUIRE(msg2.size == sizeof(uint64_t)); | 
				
			||||
    REQUIRE(*(uint64_t*)msg1.data == i); | 
				
			||||
    REQUIRE(*(uint64_t*)msg2.data == i); | 
				
			||||
 | 
				
			||||
    msgq_msg_close(&outgoing_msg); | 
				
			||||
    msgq_msg_close(&msg1); | 
				
			||||
    msgq_msg_close(&msg2); | 
				
			||||
  } | 
				
			||||
} | 
				
			||||
@ -0,0 +1,14 @@ | 
				
			||||
from messaging_pyx import Context, SubSocket, PubSocket  # pylint: disable=no-name-in-module, import-error | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  c = Context() | 
				
			||||
  pub_sock = PubSocket() | 
				
			||||
  pub_sock.connect(c, "controlsState") | 
				
			||||
 | 
				
			||||
  for i in range(int(1e10)): | 
				
			||||
    print(i) | 
				
			||||
    sub_sock = SubSocket() | 
				
			||||
    sub_sock.connect(c, "controlsState") | 
				
			||||
 | 
				
			||||
    pub_sock.send(b'a') | 
				
			||||
    print(sub_sock.receive()) | 
				
			||||
@ -0,0 +1,2 @@ | 
				
			||||
#define CATCH_CONFIG_MAIN | 
				
			||||
#include "catch2/catch.hpp" | 
				
			||||
@ -0,0 +1,142 @@ | 
				
			||||
import unittest | 
				
			||||
import time | 
				
			||||
import cereal.messaging as messaging | 
				
			||||
 | 
				
			||||
import concurrent.futures | 
				
			||||
 | 
				
			||||
 | 
				
			||||
def poller(): | 
				
			||||
  context = messaging.Context() | 
				
			||||
 | 
				
			||||
  p = messaging.Poller() | 
				
			||||
 | 
				
			||||
  sub = messaging.SubSocket() | 
				
			||||
  sub.connect(context, 'controlsState') | 
				
			||||
  p.registerSocket(sub) | 
				
			||||
 | 
				
			||||
  socks = p.poll(10000) | 
				
			||||
  r = [s.receive(non_blocking=True) for s in socks] | 
				
			||||
 | 
				
			||||
  return r | 
				
			||||
 | 
				
			||||
 | 
				
			||||
class TestPoller(unittest.TestCase): | 
				
			||||
  def test_poll_once(self): | 
				
			||||
    context = messaging.Context() | 
				
			||||
 | 
				
			||||
    pub = messaging.PubSocket() | 
				
			||||
    pub.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
    with concurrent.futures.ThreadPoolExecutor() as e: | 
				
			||||
      poll = e.submit(poller) | 
				
			||||
 | 
				
			||||
      time.sleep(0.1)  # Slow joiner syndrome | 
				
			||||
 | 
				
			||||
      # Send message | 
				
			||||
      pub.send("a") | 
				
			||||
 | 
				
			||||
      # Wait for poll result | 
				
			||||
      result = poll.result() | 
				
			||||
 | 
				
			||||
    del pub | 
				
			||||
    context.term() | 
				
			||||
 | 
				
			||||
    self.assertEqual(result, [b"a"]) | 
				
			||||
 | 
				
			||||
  def test_poll_and_create_many_subscribers(self): | 
				
			||||
    context = messaging.Context() | 
				
			||||
 | 
				
			||||
    pub = messaging.PubSocket() | 
				
			||||
    pub.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
    with concurrent.futures.ThreadPoolExecutor() as e: | 
				
			||||
      poll = e.submit(poller) | 
				
			||||
 | 
				
			||||
      time.sleep(0.1)  # Slow joiner syndrome | 
				
			||||
      c = messaging.Context() | 
				
			||||
      for _ in range(10): | 
				
			||||
        messaging.SubSocket().connect(c, 'controlsState') | 
				
			||||
 | 
				
			||||
      time.sleep(0.1) | 
				
			||||
 | 
				
			||||
      # Send message | 
				
			||||
      pub.send("a") | 
				
			||||
 | 
				
			||||
      # Wait for poll result | 
				
			||||
      result = poll.result() | 
				
			||||
 | 
				
			||||
    del pub | 
				
			||||
    context.term() | 
				
			||||
 | 
				
			||||
    self.assertEqual(result, [b"a"]) | 
				
			||||
 | 
				
			||||
  def test_multiple_publishers_exception(self): | 
				
			||||
    context = messaging.Context() | 
				
			||||
 | 
				
			||||
    with self.assertRaises(messaging.MultiplePublishersError): | 
				
			||||
      pub1 = messaging.PubSocket() | 
				
			||||
      pub1.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
      pub2 = messaging.PubSocket() | 
				
			||||
      pub2.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
      pub1.send("a") | 
				
			||||
 | 
				
			||||
    del pub1 | 
				
			||||
    del pub2 | 
				
			||||
    context.term() | 
				
			||||
 | 
				
			||||
  def test_multiple_messages(self): | 
				
			||||
    context = messaging.Context() | 
				
			||||
 | 
				
			||||
    pub = messaging.PubSocket() | 
				
			||||
    pub.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
    sub = messaging.SubSocket() | 
				
			||||
    sub.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
    time.sleep(0.1)  # Slow joiner | 
				
			||||
 | 
				
			||||
    for i in range(100): | 
				
			||||
      pub.send(str(i)) | 
				
			||||
 | 
				
			||||
    msg_seen = False | 
				
			||||
    i = 0 | 
				
			||||
    while True: | 
				
			||||
      r = sub.receive(non_blocking=True) | 
				
			||||
 | 
				
			||||
      if r is not None: | 
				
			||||
        self.assertEqual(str(i), r.decode('utf8')) | 
				
			||||
 | 
				
			||||
        msg_seen = True | 
				
			||||
        i += 1 | 
				
			||||
 | 
				
			||||
      if r is None and msg_seen:  # ZMQ sometimes receives nothing on the first receive | 
				
			||||
        break | 
				
			||||
 | 
				
			||||
    del pub | 
				
			||||
    del sub | 
				
			||||
    context.term() | 
				
			||||
 | 
				
			||||
  def test_conflate(self): | 
				
			||||
    context = messaging.Context() | 
				
			||||
 | 
				
			||||
    pub = messaging.PubSocket() | 
				
			||||
    pub.connect(context, 'controlsState') | 
				
			||||
 | 
				
			||||
    sub = messaging.SubSocket() | 
				
			||||
    sub.connect(context, 'controlsState', conflate=True) | 
				
			||||
 | 
				
			||||
    time.sleep(0.1)  # Slow joiner | 
				
			||||
    pub.send('a') | 
				
			||||
    pub.send('b') | 
				
			||||
 | 
				
			||||
    self.assertEqual(b'b', sub.receive()) | 
				
			||||
 | 
				
			||||
    del pub | 
				
			||||
    del sub | 
				
			||||
    context.term() | 
				
			||||
 | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  unittest.main() | 
				
			||||
@ -0,0 +1,163 @@ | 
				
			||||
# TODO: these port numbers are hardcoded in c, fix this | 
				
			||||
 | 
				
			||||
# LogRotate: 8001 is a PUSH PULL socket between loggerd and visiond | 
				
			||||
 | 
				
			||||
# all ZMQ pub sub: port, should_log, frequency, (qlog_decimation) | 
				
			||||
 | 
				
			||||
# frame syncing packet | 
				
			||||
frame: [8002, true, 20., 1] | 
				
			||||
# accel, gyro, and compass | 
				
			||||
sensorEvents: [8003, true, 100., 100] | 
				
			||||
# GPS data, also global timestamp | 
				
			||||
gpsNMEA: [8004, true, 9.]  # 9 msgs each sec | 
				
			||||
# CPU+MEM+GPU+BAT temps | 
				
			||||
thermal: [8005, true, 2., 1] | 
				
			||||
# List(CanData), list of can messages | 
				
			||||
can: [8006, true, 100.] | 
				
			||||
controlsState: [8007, true, 100., 100] | 
				
			||||
#liveEvent: [8008, true, 0.] | 
				
			||||
model: [8009, true, 20., 5] | 
				
			||||
features: [8010, true, 0.] | 
				
			||||
health: [8011, true, 2., 1] | 
				
			||||
radarState: [8012, true, 20.] | 
				
			||||
#liveUI: [8014, true, 0.] | 
				
			||||
encodeIdx: [8015, true, 20.] | 
				
			||||
liveTracks: [8016, true, 20.] | 
				
			||||
sendcan: [8017, true, 100.] | 
				
			||||
logMessage: [8018, true, 0.] | 
				
			||||
liveCalibration: [8019, true, 5.] | 
				
			||||
androidLog: [8020, true, 0.] | 
				
			||||
carState: [8021, true, 100., 10] | 
				
			||||
# 8022 is reserved for sshd | 
				
			||||
carControl: [8023, true, 100., 10] | 
				
			||||
plan: [8024, true, 20.] | 
				
			||||
liveLocation: [8025, true, 0.] | 
				
			||||
gpsLocation: [8026, true, 1., 1] | 
				
			||||
ethernetData: [8027, true, 0.] | 
				
			||||
navUpdate: [8028, true, 0.] | 
				
			||||
qcomGnss: [8029, true, 0.] | 
				
			||||
lidarPts: [8030, true, 0.] | 
				
			||||
procLog: [8031, true, 0.5] | 
				
			||||
gpsLocationExternal: [8032, true, 10., 1] | 
				
			||||
ubloxGnss: [8033, true, 10.] | 
				
			||||
clocks: [8034, true, 1., 1] | 
				
			||||
liveMpc: [8035, false, 20.] | 
				
			||||
liveLongitudinalMpc: [8036, false, 20.] | 
				
			||||
navStatus: [8038, true, 0.] | 
				
			||||
gpsLocationTrimble: [8039, true, 0.] | 
				
			||||
trimbleGnss: [8041, true, 0.] | 
				
			||||
ubloxRaw: [8042, true, 20.] | 
				
			||||
gpsPlannerPoints: [8043, true, 0.] | 
				
			||||
gpsPlannerPlan: [8044, true, 0.] | 
				
			||||
applanixRaw: [8046, true, 0.] | 
				
			||||
orbLocation: [8047, true, 0.] | 
				
			||||
trafficEvents: [8048, true, 0.] | 
				
			||||
liveLocationTiming: [8049, true, 0.] | 
				
			||||
orbslamCorrection: [8050, true, 0.] | 
				
			||||
liveLocationCorrected: [8051, true, 0.] | 
				
			||||
orbObservation: [8052, true, 0.] | 
				
			||||
applanixLocation: [8053, true, 0.] | 
				
			||||
liveLocationKalman: [8054, true, 0.] | 
				
			||||
uiNavigationEvent: [8055, true, 0.] | 
				
			||||
orbOdometry: [8057, true, 0.] | 
				
			||||
orbFeatures: [8058, false, 0.] | 
				
			||||
orbKeyFrame: [8059, true, 0.] | 
				
			||||
uiLayoutState: [8060, true, 0.] | 
				
			||||
frontEncodeIdx: [8061, true, 5.] | 
				
			||||
orbFeaturesSummary: [8062, true, 0.] | 
				
			||||
driverMonitoring: [8063, true, 5., 1] | 
				
			||||
liveParameters: [8064, true, 10.] | 
				
			||||
liveMapData: [8065, true, 0.] | 
				
			||||
cameraOdometry: [8066, true, 5.] | 
				
			||||
pathPlan: [8067, true, 20.] | 
				
			||||
kalmanOdometry: [8068, true, 0.] | 
				
			||||
thumbnail: [8069, true, 0.2, 1] | 
				
			||||
carEvents: [8070, true, 1., 1] | 
				
			||||
carParams: [8071, true, 0.02, 1] | 
				
			||||
frontFrame: [8072, true, 10.] | 
				
			||||
 | 
				
			||||
testModel: [8040, false, 0.] | 
				
			||||
testLiveLocation: [8045, false, 0.] | 
				
			||||
testJoystick: [8056, false, 0.] | 
				
			||||
 | 
				
			||||
# 8080 is reserved for slave testing daemon | 
				
			||||
# 8762 is reserved for logserver | 
				
			||||
 | 
				
			||||
# manager -- base process to manage starting and stopping of all others | 
				
			||||
#   subscribes: thermal | 
				
			||||
 | 
				
			||||
# **** processes that communicate with the outside world **** | 
				
			||||
 | 
				
			||||
# thermald -- decides when to start and stop onroad | 
				
			||||
#   subscribes: health, location | 
				
			||||
#   publishes: thermal | 
				
			||||
 | 
				
			||||
# boardd -- communicates with the car | 
				
			||||
#   subscribes: sendcan | 
				
			||||
#   publishes: can, health, ubloxRaw | 
				
			||||
 | 
				
			||||
# sensord -- publishes IMU and Magnetometer | 
				
			||||
#   publishes: sensorEvents | 
				
			||||
 | 
				
			||||
# gpsd -- publishes EON's gps | 
				
			||||
#   publishes: gpsNMEA | 
				
			||||
 | 
				
			||||
# visiond -- talks to the cameras, runs the model, saves the videos | 
				
			||||
#   publishes:  frame, model, driverMonitoring, cameraOdometry, thumbnail | 
				
			||||
 | 
				
			||||
# **** stateful data transformers **** | 
				
			||||
 | 
				
			||||
# plannerd -- decides where to drive the car | 
				
			||||
#   subscribes: carState, model, radarState, controlsState, liveParameters | 
				
			||||
#   publishes:  plan, pathPlan, liveMpc, liveLongitudinalMpc | 
				
			||||
 | 
				
			||||
# controlsd -- drives the car by sending CAN messages to panda | 
				
			||||
#   subscribes: can, thermal, health, plan, pathPlan, driverMonitoring, liveCalibration | 
				
			||||
#   publishes:  carState, carControl, sendcan, controlsState, carEvents, carParams | 
				
			||||
 | 
				
			||||
# radard -- processes the radar and vision data | 
				
			||||
#   subscribes: can, controlsState, model, liveParameters | 
				
			||||
#   publishes:  radarState, liveTracks | 
				
			||||
 | 
				
			||||
# params_learner -- learns vehicle params by observing the vehicle dynamics | 
				
			||||
#   subscribes: controlsState, sensorEvents, cameraOdometry | 
				
			||||
#   publishes: liveParameters | 
				
			||||
 | 
				
			||||
# calibrationd -- reads posenet and applies a temporal filter on the frame region to look at | 
				
			||||
#   subscribes: cameraOdometry | 
				
			||||
#   publishes: liveCalibration | 
				
			||||
 | 
				
			||||
# ubloxd -- read raw ublox data and converts them in readable format | 
				
			||||
#   subscribes: ubloxRaw | 
				
			||||
#   publishes: ubloxGnss | 
				
			||||
 | 
				
			||||
# **** LOGGING SERVICE **** | 
				
			||||
 | 
				
			||||
# loggerd | 
				
			||||
#   subscribes: EVERYTHING | 
				
			||||
 | 
				
			||||
# **** NON VITAL SERVICES **** | 
				
			||||
 | 
				
			||||
# ui | 
				
			||||
#   subscribes: thermal, model, controlsState, uiLayout, liveCalibration, radarState, liveMpc, plusFrame, liveMapData | 
				
			||||
 | 
				
			||||
# uploader | 
				
			||||
#   communicates through file system with loggerd | 
				
			||||
 | 
				
			||||
# deleter | 
				
			||||
#   communicates through file system with loggerd and uploader | 
				
			||||
 | 
				
			||||
# logmessaged -- central logging service, can log to cloud | 
				
			||||
#   publishes:  logMessage | 
				
			||||
 | 
				
			||||
# logcatd -- fetches logcat info from android | 
				
			||||
#   publishes:  androidLog | 
				
			||||
 | 
				
			||||
# proclogd -- fetches process information | 
				
			||||
#   publishes: procLog | 
				
			||||
 | 
				
			||||
# tombstoned -- reports native crashes | 
				
			||||
 | 
				
			||||
# athenad -- on request, open a sub socket and return the value | 
				
			||||
 | 
				
			||||
# updated -- waits for network access and tries to update every hour | 
				
			||||
@ -0,0 +1,33 @@ | 
				
			||||
#!/usr/bin/env python3 | 
				
			||||
import os | 
				
			||||
import yaml | 
				
			||||
 | 
				
			||||
class Service(): | 
				
			||||
  def __init__(self, port, should_log, frequency, decimation=None): | 
				
			||||
    self.port = port | 
				
			||||
    self.should_log = should_log | 
				
			||||
    self.frequency = frequency | 
				
			||||
    self.decimation = decimation | 
				
			||||
 | 
				
			||||
service_list_path = os.path.join(os.path.dirname(__file__), "service_list.yaml") | 
				
			||||
 | 
				
			||||
service_list = {} | 
				
			||||
with open(service_list_path, "r") as f: | 
				
			||||
  for k, v in yaml.safe_load(f).items(): | 
				
			||||
    decimation = None | 
				
			||||
    if len(v) == 4: | 
				
			||||
      decimation = v[3] | 
				
			||||
 | 
				
			||||
    service_list[k] = Service(v[0], v[1], v[2], decimation) | 
				
			||||
 | 
				
			||||
if __name__ == "__main__": | 
				
			||||
  print("/* THIS IS AN AUTOGENERATED FILE, PLEASE EDIT service_list.yaml */") | 
				
			||||
  print("#ifndef __SERVICES_H") | 
				
			||||
  print("#define __SERVICES_H") | 
				
			||||
  print("struct service { int port; bool should_log; int frequency; int decimation; char name[0x100]; };") | 
				
			||||
  print("static struct service services[] = {") | 
				
			||||
  for k, v in service_list.items(): | 
				
			||||
    print('  { .name = "%s", .port = %d, .should_log = %s, .frequency = %d, .decimation = %d },' % (k, v.port, "true" if v.should_log else "false", v.frequency, -1 if v.decimation is None else v.decimation)) | 
				
			||||
  print("};") | 
				
			||||
  print("#endif") | 
				
			||||
 | 
				
			||||
					Loading…
					
					
				
		Reference in new issue