Msgq cleanups (#32643)
* Dead cereal stuff
* Dead code
* also dead
* More cleanup
old-commit-hash: 83209e0860
097
parent
f870a968e9
commit
9fcd3fe73b
10 changed files with 1 additions and 646 deletions
@ -1,47 +0,0 @@ |
|||||||
repos: |
|
||||||
- repo: https://github.com/pre-commit/pre-commit-hooks |
|
||||||
rev: v4.6.0 |
|
||||||
hooks: |
|
||||||
- id: check-ast |
|
||||||
- id: check-yaml |
|
||||||
- id: check-executables-have-shebangs |
|
||||||
- id: check-shebang-scripts-are-executable |
|
||||||
- repo: https://github.com/pre-commit/mirrors-mypy |
|
||||||
rev: v1.9.0 |
|
||||||
hooks: |
|
||||||
- id: mypy |
|
||||||
- repo: https://github.com/astral-sh/ruff-pre-commit |
|
||||||
rev: v0.3.5 |
|
||||||
hooks: |
|
||||||
- id: ruff |
|
||||||
- repo: local |
|
||||||
hooks: |
|
||||||
- id: cppcheck |
|
||||||
name: cppcheck |
|
||||||
entry: cppcheck |
|
||||||
language: system |
|
||||||
types: [c++] |
|
||||||
exclude: '^(messaging/msgq_tests.cc|messaging/test_runner.cc)' |
|
||||||
args: |
|
||||||
- --error-exitcode=1 |
|
||||||
- --inline-suppr |
|
||||||
- --language=c++ |
|
||||||
- --force |
|
||||||
- --quiet |
|
||||||
- -j4 |
|
||||||
- repo: https://github.com/cpplint/cpplint |
|
||||||
rev: 1.6.1 |
|
||||||
hooks: |
|
||||||
- id: cpplint |
|
||||||
args: |
|
||||||
- --quiet |
|
||||||
- --counting=detailed |
|
||||||
- --linelength=240 |
|
||||||
- --filter=-build,-legal,-readability,-runtime,-whitespace,+build/include_subdir,+build/forward_decl,+build/include_what_you_use,+build/deprecated,+whitespace/comma,+whitespace/line_length,+whitespace/empty_if_body,+whitespace/empty_loop_body,+whitespace/empty_conditional_body,+whitespace/forcolon,+whitespace/parens,+whitespace/semicolon,+whitespace/tab,+readability/braces |
|
||||||
- repo: https://github.com/codespell-project/codespell |
|
||||||
rev: v2.2.6 |
|
||||||
hooks: |
|
||||||
- id: codespell |
|
||||||
args: |
|
||||||
- -L ned |
|
||||||
- --builtins clear,rare,informal,usage,code,names,en-GB_to_en-US |
|
@ -1,54 +0,0 @@ |
|||||||
FROM ubuntu:24.04 |
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive |
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \ |
|
||||||
autoconf \ |
|
||||||
build-essential \ |
|
||||||
ca-certificates \ |
|
||||||
capnproto \ |
|
||||||
clang \ |
|
||||||
cppcheck \ |
|
||||||
curl \ |
|
||||||
git \ |
|
||||||
libbz2-dev \ |
|
||||||
libcapnp-dev \ |
|
||||||
libclang-rt-dev \ |
|
||||||
libffi-dev \ |
|
||||||
liblzma-dev \ |
|
||||||
libncurses5-dev \ |
|
||||||
libncursesw5-dev \ |
|
||||||
libreadline-dev \ |
|
||||||
libsqlite3-dev \ |
|
||||||
libssl-dev \ |
|
||||||
libtool \ |
|
||||||
libzmq3-dev \ |
|
||||||
llvm \ |
|
||||||
make \ |
|
||||||
cmake \ |
|
||||||
ocl-icd-opencl-dev \ |
|
||||||
opencl-headers \ |
|
||||||
python3-dev \ |
|
||||||
python3-pip \ |
|
||||||
tk-dev \ |
|
||||||
wget \ |
|
||||||
xz-utils \ |
|
||||||
zlib1g-dev \ |
|
||||||
&& rm -rf /var/lib/apt/lists/* |
|
||||||
|
|
||||||
RUN pip3 install --break-system-packages --no-cache-dir pyyaml Cython scons pycapnp pre-commit ruff parameterized coverage numpy |
|
||||||
|
|
||||||
WORKDIR /project/ |
|
||||||
RUN cd /tmp/ && \ |
|
||||||
git clone -b v2.x --depth 1 https://github.com/catchorg/Catch2.git && \ |
|
||||||
cd Catch2 && \ |
|
||||||
mv single_include/catch2/ /project/ && \ |
|
||||||
cd .. \ |
|
||||||
rm -rf Catch2 |
|
||||||
|
|
||||||
WORKDIR /project/cereal |
|
||||||
|
|
||||||
ENV PYTHONPATH=/project |
|
||||||
|
|
||||||
COPY . . |
|
||||||
RUN rm -rf .git && \ |
|
||||||
scons -c && scons -j$(nproc) |
|
@ -1,7 +0,0 @@ |
|||||||
Copyright (c) 2020, Comma.ai, Inc. |
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: |
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. |
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
@ -1,8 +0,0 @@ |
|||||||
comment: false |
|
||||||
coverage: |
|
||||||
status: |
|
||||||
project: |
|
||||||
default: |
|
||||||
informational: true |
|
||||||
patch: off |
|
||||||
|
|
@ -1,50 +0,0 @@ |
|||||||
#include <iostream> |
|
||||||
#include <cstddef> |
|
||||||
#include <chrono> |
|
||||||
#include <thread> |
|
||||||
#include <cassert> |
|
||||||
|
|
||||||
#include "cereal/messaging/messaging.h" |
|
||||||
#include "cereal/messaging/impl_zmq.h" |
|
||||||
|
|
||||||
#define MSGS 1e5 |
|
||||||
|
|
||||||
int main() { |
|
||||||
Context * c = Context::create(); |
|
||||||
SubSocket * sub_sock = SubSocket::create(c, "controlsState"); |
|
||||||
PubSocket * pub_sock = PubSocket::create(c, "controlsState"); |
|
||||||
|
|
||||||
char data[8]; |
|
||||||
|
|
||||||
Poller * poller = Poller::create({sub_sock}); |
|
||||||
|
|
||||||
auto start = std::chrono::steady_clock::now(); |
|
||||||
|
|
||||||
for (uint64_t i = 0; i < MSGS; i++){ |
|
||||||
*(uint64_t*)data = i; |
|
||||||
pub_sock->send(data, 8); |
|
||||||
|
|
||||||
auto r = poller->poll(100); |
|
||||||
|
|
||||||
for (auto p : r){ |
|
||||||
Message * m = p->receive(); |
|
||||||
uint64_t ii = *(uint64_t*)m->getData(); |
|
||||||
assert(i == ii); |
|
||||||
delete m; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
auto end = std::chrono::steady_clock::now(); |
|
||||||
double elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count() / 1e9; |
|
||||||
double throughput = ((double) MSGS / (double) elapsed); |
|
||||||
std::cout << throughput << " msg/s" << std::endl; |
|
||||||
|
|
||||||
delete poller; |
|
||||||
delete sub_sock; |
|
||||||
delete pub_sock; |
|
||||||
delete c; |
|
||||||
|
|
||||||
|
|
||||||
return 0; |
|
||||||
} |
|
@ -1,29 +0,0 @@ |
|||||||
import time |
|
||||||
|
|
||||||
from messaging_pyx import Context, Poller, SubSocket, PubSocket |
|
||||||
|
|
||||||
MSGS = 1e5 |
|
||||||
|
|
||||||
if __name__ == "__main__": |
|
||||||
c = Context() |
|
||||||
sub_sock = SubSocket() |
|
||||||
pub_sock = PubSocket() |
|
||||||
|
|
||||||
sub_sock.connect(c, "controlsState") |
|
||||||
pub_sock.connect(c, "controlsState") |
|
||||||
|
|
||||||
poller = Poller() |
|
||||||
poller.registerSocket(sub_sock) |
|
||||||
|
|
||||||
t = time.time() |
|
||||||
for i in range(int(MSGS)): |
|
||||||
bts = i.to_bytes(4, 'little') |
|
||||||
pub_sock.send(bts) |
|
||||||
|
|
||||||
for s in poller.poll(100): |
|
||||||
dat = s.receive() |
|
||||||
ii = int.from_bytes(dat, 'little') |
|
||||||
assert(i == ii) |
|
||||||
|
|
||||||
dt = time.time() - t |
|
||||||
print("%.1f msg/s" % (MSGS / dt)) |
|
@ -1,54 +0,0 @@ |
|||||||
# MSGQ: A lock free single producer multi consumer message queue |
|
||||||
|
|
||||||
## What is MSGQ? |
|
||||||
MSGQ is a system to pass messages from a single producer to multiple consumers. All the consumers need to be able to receive all the messages. It is designed to be a high performance replacement for ZMQ-like SUB/PUB patterns. It uses a ring buffer in shared memory to efficiently read and write data. Each read requires a copy. Writing can be done without a copy, as long as the size of the data is known in advance. |
|
||||||
|
|
||||||
## Storage |
|
||||||
The storage for the queue consists of an area of metadata, and the actual buffer. The metadata contains: |
|
||||||
|
|
||||||
1. A counter to the number of readers that are active |
|
||||||
2. A pointer to the head of the queue for writing. From now on referred to as *write pointer* |
|
||||||
3. A cycle counter for the writer. This counter is incremented when the writer wraps around |
|
||||||
4. N pointers, pointing to the current read position for all the readers. From now on referred to as *read pointer* |
|
||||||
5. N counters, counting the number of cycles for all the readers |
|
||||||
6. N booleans, indicating validity for all the readers. From now on referred to as *validity flag* |
|
||||||
|
|
||||||
The counter and the pointer are both 32 bit values, packed into 64 bit so they can be read and written atomically. |
|
||||||
|
|
||||||
The data buffer is a ring buffer. All messages are prefixed by an 8 byte size field, followed by the data. A size of -1 indicates a wrap-around, and means the next message is stored at the beginning of the buffer. |
|
||||||
|
|
||||||
|
|
||||||
## Writing |
|
||||||
Writing involves the following steps: |
|
||||||
|
|
||||||
1. Check if the area that is to be written overlaps with any of the read pointers, mark those readers as invalid by clearing the validity flag. |
|
||||||
2. Write the message |
|
||||||
3. Increase the write pointer by the size of the message |
|
||||||
|
|
||||||
In case there is not enough space at the end of the buffer, a special empty message with a prefix of -1 is written. The cycle counter is incremented by one. In this case step 1 will check there are no read pointers pointing to the remainder of the buffer. Then another write cycle will start with the actual message. |
|
||||||
|
|
||||||
There always needs to be 8 bytes of empty space at the end of the buffer. By doing this there is always space to write the -1. |
|
||||||
|
|
||||||
## Reset reader |
|
||||||
When the reader is lagging too much behind the read pointer becomes invalid and no longer points to the beginning of a valid message. To reset a reader to the current write pointer, the following steps are performed: |
|
||||||
|
|
||||||
1. Set valid flag |
|
||||||
2. Set read cycle counter to that of the writer |
|
||||||
3. Set read pointer to write pointer |
|
||||||
|
|
||||||
## Reading |
|
||||||
Reading involves the following steps: |
|
||||||
|
|
||||||
1. Read the size field at the current read pointer |
|
||||||
2. Read the validity flag |
|
||||||
3. Copy the data out of the buffer |
|
||||||
4. Increase the read pointer by the size of the message |
|
||||||
5. Check the validity flag again |
|
||||||
|
|
||||||
Before starting the copy, the valid flag is checked. This is to prevent a race condition where the size prefix was invalid, and the read could read outside of the buffer. Make sure that step 1 and 2 are not reordered by your compiler or CPU. |
|
||||||
|
|
||||||
If a writer overwrites the data while it's being copied out, the data will be invalid. Therefore the validity flag is also checked after reading it. The order of step 4 and 5 does not matter. |
|
||||||
|
|
||||||
If at steps 2 or 5 the validity flag is not set, the reader is reset. Any data that was already read is discarded. After the reader is reset, the reading starts from the beginning. |
|
||||||
|
|
||||||
If a message with size -1 is encountered, step 3 and 4 are replaced by increasing the cycle counter and setting the read pointer to the beginning of the buffer. After that another read is performed. |
|
@ -1,394 +0,0 @@ |
|||||||
#include "catch2/catch.hpp" |
|
||||||
#include "cereal/messaging/msgq.h" |
|
||||||
|
|
||||||
TEST_CASE("ALIGN"){ |
|
||||||
REQUIRE(ALIGN(0) == 0); |
|
||||||
REQUIRE(ALIGN(1) == 8); |
|
||||||
REQUIRE(ALIGN(7) == 8); |
|
||||||
REQUIRE(ALIGN(8) == 8); |
|
||||||
REQUIRE(ALIGN(99999) == 100000); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_init_size"){ |
|
||||||
const size_t msg_size = 30; |
|
||||||
msgq_msg_t msg; |
|
||||||
|
|
||||||
msgq_msg_init_size(&msg, msg_size); |
|
||||||
REQUIRE(msg.size == msg_size); |
|
||||||
|
|
||||||
msgq_msg_close(&msg); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_init_data"){ |
|
||||||
const size_t msg_size = 30; |
|
||||||
char * data = new char[msg_size]; |
|
||||||
|
|
||||||
for (size_t i = 0; i < msg_size; i++){ |
|
||||||
data[i] = i; |
|
||||||
} |
|
||||||
|
|
||||||
msgq_msg_t msg; |
|
||||||
msgq_msg_init_data(&msg, data, msg_size); |
|
||||||
|
|
||||||
REQUIRE(msg.size == msg_size); |
|
||||||
REQUIRE(memcmp(msg.data, data, msg_size) == 0); |
|
||||||
|
|
||||||
delete[] data; |
|
||||||
msgq_msg_close(&msg); |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
TEST_CASE("msgq_init_subscriber"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q; |
|
||||||
msgq_new_queue(&q, "test_queue", 1024); |
|
||||||
REQUIRE(*q.num_readers == 0); |
|
||||||
|
|
||||||
q.reader_id = 1; |
|
||||||
*q.read_valids[0] = false; |
|
||||||
*q.read_pointers[0] = ((uint64_t)1 << 32); |
|
||||||
|
|
||||||
*q.write_pointer = 255; |
|
||||||
|
|
||||||
msgq_init_subscriber(&q); |
|
||||||
REQUIRE(q.read_conflate == false); |
|
||||||
REQUIRE(*q.read_valids[0] == true); |
|
||||||
REQUIRE((*q.read_pointers[0] >> 32) == 0); |
|
||||||
REQUIRE((*q.read_pointers[0] & 0xFFFFFFFF) == 255); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_send first message"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q; |
|
||||||
msgq_new_queue(&q, "test_queue", 1024); |
|
||||||
msgq_init_publisher(&q); |
|
||||||
|
|
||||||
REQUIRE(*q.write_pointer == 0); |
|
||||||
|
|
||||||
size_t msg_size = 128; |
|
||||||
|
|
||||||
SECTION("Aligned message size"){ |
|
||||||
} |
|
||||||
SECTION("Unaligned message size"){ |
|
||||||
msg_size--; |
|
||||||
} |
|
||||||
|
|
||||||
char * data = new char[msg_size]; |
|
||||||
|
|
||||||
for (size_t i = 0; i < msg_size; i++){ |
|
||||||
data[i] = i; |
|
||||||
} |
|
||||||
|
|
||||||
msgq_msg_t msg; |
|
||||||
msgq_msg_init_data(&msg, data, msg_size); |
|
||||||
|
|
||||||
|
|
||||||
msgq_msg_send(&msg, &q); |
|
||||||
REQUIRE(*(int64_t*)q.data == msg_size); // Check size tag
|
|
||||||
REQUIRE(*q.write_pointer == 128 + sizeof(int64_t)); |
|
||||||
REQUIRE(memcmp(q.data + sizeof(int64_t), data, msg_size) == 0); |
|
||||||
|
|
||||||
delete[] data; |
|
||||||
msgq_msg_close(&msg); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_send test wraparound"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q; |
|
||||||
msgq_new_queue(&q, "test_queue", 1024); |
|
||||||
msgq_init_publisher(&q); |
|
||||||
|
|
||||||
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == 0); |
|
||||||
REQUIRE((*q.write_pointer >> 32) == 0); |
|
||||||
|
|
||||||
const size_t msg_size = 120; |
|
||||||
msgq_msg_t msg; |
|
||||||
msgq_msg_init_size(&msg, msg_size); |
|
||||||
|
|
||||||
for (int i = 0; i < 8; i++) { |
|
||||||
msgq_msg_send(&msg, &q); |
|
||||||
} |
|
||||||
// Check 8th message was written at the beginning
|
|
||||||
REQUIRE((*q.write_pointer & 0xFFFFFFFF) == msg_size + sizeof(int64_t)); |
|
||||||
|
|
||||||
// Check cycle count
|
|
||||||
REQUIRE((*q.write_pointer >> 32) == 1); |
|
||||||
|
|
||||||
// Check wraparound tag
|
|
||||||
char * tag_location = q.data; |
|
||||||
tag_location += 7 * (msg_size + sizeof(int64_t)); |
|
||||||
REQUIRE(*(int64_t*)tag_location == -1); |
|
||||||
|
|
||||||
msgq_msg_close(&msg); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_recv test wraparound"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q_pub, q_sub; |
|
||||||
msgq_new_queue(&q_pub, "test_queue", 1024); |
|
||||||
msgq_new_queue(&q_sub, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&q_pub); |
|
||||||
msgq_init_subscriber(&q_sub); |
|
||||||
|
|
||||||
REQUIRE((*q_pub.write_pointer >> 32) == 0); |
|
||||||
REQUIRE((*q_sub.read_pointers[0] >> 32) == 0); |
|
||||||
|
|
||||||
const size_t msg_size = 120; |
|
||||||
msgq_msg_t msg1; |
|
||||||
msgq_msg_init_size(&msg1, msg_size); |
|
||||||
|
|
||||||
|
|
||||||
SECTION("Check cycle counter after reset") { |
|
||||||
for (int i = 0; i < 8; i++) { |
|
||||||
msgq_msg_send(&msg1, &q_pub); |
|
||||||
} |
|
||||||
|
|
||||||
msgq_msg_t msg2; |
|
||||||
msgq_msg_recv(&msg2, &q_sub); |
|
||||||
REQUIRE(msg2.size == 0); // Reader had to reset
|
|
||||||
msgq_msg_close(&msg2); |
|
||||||
} |
|
||||||
SECTION("Check cycle counter while keeping up with writer") { |
|
||||||
for (int i = 0; i < 8; i++) { |
|
||||||
msgq_msg_send(&msg1, &q_pub); |
|
||||||
|
|
||||||
msgq_msg_t msg2; |
|
||||||
msgq_msg_recv(&msg2, &q_sub); |
|
||||||
REQUIRE(msg2.size > 0); |
|
||||||
msgq_msg_close(&msg2); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
REQUIRE((*q_sub.read_pointers[0] >> 32) == 1); |
|
||||||
msgq_msg_close(&msg1); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_msg_send test invalidation"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q_pub, q_sub; |
|
||||||
msgq_new_queue(&q_pub, "test_queue", 1024); |
|
||||||
msgq_new_queue(&q_sub, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&q_pub); |
|
||||||
msgq_init_subscriber(&q_sub); |
|
||||||
*q_sub.write_pointer = (uint64_t)1 << 32; |
|
||||||
|
|
||||||
REQUIRE(*q_sub.read_valids[0] == true); |
|
||||||
|
|
||||||
SECTION("read pointer in tag"){ |
|
||||||
*q_sub.read_pointers[0] = 0; |
|
||||||
} |
|
||||||
SECTION("read pointer in data section"){ |
|
||||||
*q_sub.read_pointers[0] = 64; |
|
||||||
} |
|
||||||
SECTION("read pointer in wraparound section"){ |
|
||||||
*q_pub.write_pointer = ((uint64_t)1 << 32) | 1000; // Writer is one cycle ahead
|
|
||||||
*q_sub.read_pointers[0] = 1020; |
|
||||||
} |
|
||||||
|
|
||||||
msgq_msg_t msg; |
|
||||||
msgq_msg_init_size(&msg, 128); |
|
||||||
msgq_msg_send(&msg, &q_pub); |
|
||||||
|
|
||||||
REQUIRE(*q_sub.read_valids[0] == false); |
|
||||||
|
|
||||||
msgq_msg_close(&msg); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("msgq_init_subscriber init 2 subscribers"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t q1, q2; |
|
||||||
msgq_new_queue(&q1, "test_queue", 1024); |
|
||||||
msgq_new_queue(&q2, "test_queue", 1024); |
|
||||||
|
|
||||||
*q1.num_readers = 0; |
|
||||||
|
|
||||||
REQUIRE(*q1.num_readers == 0); |
|
||||||
REQUIRE(*q2.num_readers == 0); |
|
||||||
|
|
||||||
msgq_init_subscriber(&q1); |
|
||||||
REQUIRE(*q1.num_readers == 1); |
|
||||||
REQUIRE(*q2.num_readers == 1); |
|
||||||
REQUIRE(q1.reader_id == 0); |
|
||||||
|
|
||||||
msgq_init_subscriber(&q2); |
|
||||||
REQUIRE(*q1.num_readers == 2); |
|
||||||
REQUIRE(*q2.num_readers == 2); |
|
||||||
REQUIRE(q2.reader_id == 1); |
|
||||||
} |
|
||||||
|
|
||||||
|
|
||||||
TEST_CASE("Write 1 msg, read 1 msg", "[integration]"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
const size_t msg_size = 128; |
|
||||||
msgq_queue_t writer, reader; |
|
||||||
|
|
||||||
msgq_new_queue(&writer, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&writer); |
|
||||||
msgq_init_subscriber(&reader); |
|
||||||
|
|
||||||
// Build 128 byte message
|
|
||||||
msgq_msg_t outgoing_msg; |
|
||||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
|
||||||
|
|
||||||
for (size_t i = 0; i < msg_size; i++){ |
|
||||||
outgoing_msg.data[i] = i; |
|
||||||
} |
|
||||||
|
|
||||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
|
||||||
|
|
||||||
msgq_msg_t incoming_msg1; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
|
||||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
|
||||||
|
|
||||||
// Verify that there are no more messages
|
|
||||||
msgq_msg_t incoming_msg2; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); |
|
||||||
|
|
||||||
msgq_msg_close(&outgoing_msg); |
|
||||||
msgq_msg_close(&incoming_msg1); |
|
||||||
msgq_msg_close(&incoming_msg2); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = false", "[integration]"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
const size_t msg_size = 128; |
|
||||||
msgq_queue_t writer, reader; |
|
||||||
|
|
||||||
msgq_new_queue(&writer, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&writer); |
|
||||||
msgq_init_subscriber(&reader); |
|
||||||
|
|
||||||
// Build 128 byte message
|
|
||||||
msgq_msg_t outgoing_msg; |
|
||||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
|
||||||
|
|
||||||
for (size_t i = 0; i < msg_size; i++){ |
|
||||||
outgoing_msg.data[i] = i; |
|
||||||
} |
|
||||||
|
|
||||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
|
||||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
|
||||||
|
|
||||||
msgq_msg_t incoming_msg1; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
|
||||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
|
||||||
|
|
||||||
msgq_msg_t incoming_msg2; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == msg_size); |
|
||||||
REQUIRE(memcmp(incoming_msg2.data, outgoing_msg.data, msg_size) == 0); |
|
||||||
|
|
||||||
msgq_msg_close(&outgoing_msg); |
|
||||||
msgq_msg_close(&incoming_msg1); |
|
||||||
msgq_msg_close(&incoming_msg2); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("Write 2 msg, read 2 msg - conflate = true", "[integration]"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
const size_t msg_size = 128; |
|
||||||
msgq_queue_t writer, reader; |
|
||||||
|
|
||||||
msgq_new_queue(&writer, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&writer); |
|
||||||
msgq_init_subscriber(&reader); |
|
||||||
reader.read_conflate = true; |
|
||||||
|
|
||||||
// Build 128 byte message
|
|
||||||
msgq_msg_t outgoing_msg; |
|
||||||
msgq_msg_init_size(&outgoing_msg, msg_size); |
|
||||||
|
|
||||||
for (size_t i = 0; i < msg_size; i++){ |
|
||||||
outgoing_msg.data[i] = i; |
|
||||||
} |
|
||||||
|
|
||||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
|
||||||
REQUIRE(msgq_msg_send(&outgoing_msg, &writer) == msg_size); |
|
||||||
|
|
||||||
msgq_msg_t incoming_msg1; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg1, &reader) == msg_size); |
|
||||||
REQUIRE(memcmp(incoming_msg1.data, outgoing_msg.data, msg_size) == 0); |
|
||||||
|
|
||||||
// Verify that there are no more messages
|
|
||||||
msgq_msg_t incoming_msg2; |
|
||||||
REQUIRE(msgq_msg_recv(&incoming_msg2, &reader) == 0); |
|
||||||
|
|
||||||
msgq_msg_close(&outgoing_msg); |
|
||||||
msgq_msg_close(&incoming_msg1); |
|
||||||
msgq_msg_close(&incoming_msg2); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("1 publisher, 1 slow subscriber", "[integration]"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t writer, reader; |
|
||||||
|
|
||||||
msgq_new_queue(&writer, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&writer); |
|
||||||
msgq_init_subscriber(&reader); |
|
||||||
|
|
||||||
int n_received = 0; |
|
||||||
int n_skipped = 0; |
|
||||||
|
|
||||||
for (uint64_t i = 0; i < 1e5; i++) { |
|
||||||
msgq_msg_t outgoing_msg; |
|
||||||
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); |
|
||||||
msgq_msg_send(&outgoing_msg, &writer); |
|
||||||
msgq_msg_close(&outgoing_msg); |
|
||||||
|
|
||||||
if (i % 10 == 0){ |
|
||||||
msgq_msg_t msg1; |
|
||||||
msgq_msg_recv(&msg1, &reader); |
|
||||||
|
|
||||||
if (msg1.size == 0){ |
|
||||||
n_skipped++; |
|
||||||
} else { |
|
||||||
n_received++; |
|
||||||
} |
|
||||||
msgq_msg_close(&msg1); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
// TODO: verify these numbers by hand
|
|
||||||
REQUIRE(n_received == 8572); |
|
||||||
REQUIRE(n_skipped == 1428); |
|
||||||
} |
|
||||||
|
|
||||||
TEST_CASE("1 publisher, 2 subscribers", "[integration]"){ |
|
||||||
remove("/dev/shm/test_queue"); |
|
||||||
msgq_queue_t writer, reader1, reader2; |
|
||||||
|
|
||||||
msgq_new_queue(&writer, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader1, "test_queue", 1024); |
|
||||||
msgq_new_queue(&reader2, "test_queue", 1024); |
|
||||||
|
|
||||||
msgq_init_publisher(&writer); |
|
||||||
msgq_init_subscriber(&reader1); |
|
||||||
msgq_init_subscriber(&reader2); |
|
||||||
|
|
||||||
for (uint64_t i = 0; i < 1024 * 3; i++) { |
|
||||||
msgq_msg_t outgoing_msg; |
|
||||||
msgq_msg_init_data(&outgoing_msg, (char*)&i, sizeof(uint64_t)); |
|
||||||
msgq_msg_send(&outgoing_msg, &writer); |
|
||||||
|
|
||||||
msgq_msg_t msg1, msg2; |
|
||||||
msgq_msg_recv(&msg1, &reader1); |
|
||||||
msgq_msg_recv(&msg2, &reader2); |
|
||||||
|
|
||||||
REQUIRE(msg1.size == sizeof(uint64_t)); |
|
||||||
REQUIRE(msg2.size == sizeof(uint64_t)); |
|
||||||
REQUIRE(*(uint64_t*)msg1.data == i); |
|
||||||
REQUIRE(*(uint64_t*)msg2.data == i); |
|
||||||
|
|
||||||
msgq_msg_close(&outgoing_msg); |
|
||||||
msgq_msg_close(&msg1); |
|
||||||
msgq_msg_close(&msg2); |
|
||||||
} |
|
||||||
} |
|
@ -1,2 +0,0 @@ |
|||||||
#define CATCH_CONFIG_MAIN |
|
||||||
#include "catch2/catch.hpp" |
|
@ -1 +1 @@ |
|||||||
Subproject commit 6d2cc6e22229a9c855d8474e5643b26fbf2b5976 |
Subproject commit 615aea9b5519d2a3631fce4753bed29287fc4f9b |
Loading…
Reference in new issue