Merge cereal subtree

pull/950/head
Vehicle Researcher 5 years ago
commit 0440535f64
  1. 2
      cereal/.gitignore
  2. 42
      cereal/README.md
  3. 4
      cereal/SConscript
  4. 20
      cereal/car.capnp
  5. 11
      cereal/log.capnp
  6. 12
      cereal/messaging/__init__.py
  7. 2
      cereal/messaging/bridge.cc
  8. 17
      cereal/messaging/impl_msgq.cc
  9. 24
      cereal/messaging/messaging.cc
  10. 24
      cereal/messaging/msgq.cc
  11. 4
      cereal/service_list.yaml

2
cereal/.gitignore vendored

@ -10,5 +10,5 @@ libmessaging.*
libmessaging_shared.* libmessaging_shared.*
services.h services.h
.sconsign.dblite .sconsign.dblite
libcereal_shared.so libcereal_shared.*

@ -0,0 +1,42 @@
What is cereal?
----
cereal is both a messaging spec for robotics systems as well as generic high performance IPC pub sub messaging with a single publisher and multiple subscribers.
Imagine this use case:
* A sensor process reads gyro measurements directly from an IMU and publishes a sensorEvents packet
* A calibration process subscribes to the sensorEvents packet to use the IMU
* A localization process subscribes to the sensorEvents packet to use the IMU also
Messaging Spec
----
You'll find the message types in [log.capnp](log.capnp). It uses [Cap'n proto](https://capnproto.org/capnp-tool.html) and defines one struct called Event.
All Events have a logMonoTime and a valid. Then a big union defines the packet type.
Pub Sub Backends
----
cereal supports two backends, one based on [zmq](https://zeromq.org/), the other called msgq, a custom pub sub based on shared memory that doesn't require the bytes to pass through the kernel.
Example
---
```python
import cereal.messaging as messaging
# in subscriber
sm = messaging.SubMaster(['sensorEvents'])
while 1:
sm.update()
print(sm['sensorEvents'])
# in publisher
pm = messaging.PubMaster(['sensorEvents'])
dat = messaging.new_message()
dat.init('sensorEvents', 1)
dat.sensorEvents[0] = {"gyro": {"v": [0.1, -0.1, 0.1]}}
pm.send('sensorEvents', dat)
```

@ -29,7 +29,7 @@ cereal_objects = env.SharedObject([
]) ])
env.Library('cereal', cereal_objects) env.Library('cereal', cereal_objects)
env.SharedLibrary('cereal_shared', cereal_objects) env.SharedLibrary('cereal_shared', cereal_objects, LIBS=["capnp_c"])
cereal_dir = Dir('.') cereal_dir = Dir('.')
services_h = env.Command( services_h = env.Command(
@ -49,7 +49,7 @@ Depends('messaging/impl_zmq.cc', services_h)
# note, this rebuilds the deps shared, zmq is statically linked to make APK happy # note, this rebuilds the deps shared, zmq is statically linked to make APK happy
# TODO: get APK to load system zmq to remove the static link # TODO: get APK to load system zmq to remove the static link
shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [] shared_lib_shared_lib = [zmq, 'm', 'stdc++'] + ["gnustl_shared"] if arch == "aarch64" else [zmq]
env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib) env.SharedLibrary('messaging_shared', messaging_objects, LIBS=shared_lib_shared_lib)
env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq']) env.Program('messaging/bridge', ['messaging/bridge.cc'], LIBS=[messaging_lib, 'zmq'])

@ -88,6 +88,8 @@ struct CarEvent @0x9b1657f34caf3ad3 {
lowMemory @63; lowMemory @63;
stockAeb @64; stockAeb @64;
ldw @65; ldw @65;
carUnrecognized @66;
radarCommIssue @67;
} }
} }
@ -410,11 +412,11 @@ struct CarParams {
enum SafetyModel { enum SafetyModel {
silent @0; silent @0;
honda @1; hondaNidec @1;
toyota @2; toyota @2;
elm327 @3; elm327 @3;
gm @4; gm @4;
hondaBosch @5; hondaBoschGiraffe @5;
ford @6; ford @6;
cadillac @7; cadillac @7;
hyundai @8; hyundai @8;
@ -428,7 +430,9 @@ struct CarParams {
toyotaIpas @16; toyotaIpas @16;
allOutput @17; allOutput @17;
gmAscm @18; gmAscm @18;
noOutput @19; # like silent but with silent CAN TXs noOutput @19; # like silent but without silent CAN TXs
hondaBoschHarness @20;
volkswagenPq @21;
} }
enum SteerControlType { enum SteerControlType {
@ -444,7 +448,9 @@ struct CarParams {
struct CarFw { struct CarFw {
ecu @0 :Ecu; ecu @0 :Ecu;
fwVersion @1 :Text; fwVersion @1 :Data;
address @2: UInt32;
subAddress @3: UInt8;
} }
enum Ecu { enum Ecu {
@ -452,5 +458,11 @@ struct CarParams {
esp @1; esp @1;
fwdRadar @2; fwdRadar @2;
fwdCamera @3; fwdCamera @3;
engine @4;
unknown @5;
# Toyota only
dsu @6;
apgs @7;
} }
} }

@ -310,6 +310,7 @@ struct HealthData {
hasGps @6 :Bool; hasGps @6 :Bool;
canSendErrs @7 :UInt32; canSendErrs @7 :UInt32;
canFwdErrs @8 :UInt32; canFwdErrs @8 :UInt32;
canRxErrs @19 :UInt32;
gmlanSendErrs @9 :UInt32; gmlanSendErrs @9 :UInt32;
hwType @10 :HwType; hwType @10 :HwType;
fanSpeedRpm @11 :UInt16; fanSpeedRpm @11 :UInt16;
@ -484,6 +485,7 @@ struct ControlsState @0x97ff69c53601abf1 {
decelForTurn @47 :Bool; decelForTurn @47 :Bool;
decelForModel @54 :Bool; decelForModel @54 :Bool;
canErrorCounter @57 :UInt32;
lateralControlState :union { lateralControlState :union {
indiState @52 :LateralINDIState; indiState @52 :LateralINDIState;
@ -575,6 +577,7 @@ struct ModelData {
leadFuture @7 :LeadData; leadFuture @7 :LeadData;
speed @8 :List(Float32); speed @8 :List(Float32);
meta @10 :MetaData; meta @10 :MetaData;
longitudinal @11 :LongitudinalData;
struct PathData { struct PathData {
points @0 :List(Float32); points @0 :List(Float32);
@ -605,6 +608,7 @@ struct ModelData {
yuvCorrection @5 :List(Float32); yuvCorrection @5 :List(Float32);
inputTransform @6 :List(Float32); inputTransform @6 :List(Float32);
} }
struct MetaData { struct MetaData {
engagedProb @0 :Float32; engagedProb @0 :Float32;
desirePrediction @1 :List(Float32); desirePrediction @1 :List(Float32);
@ -612,6 +616,11 @@ struct ModelData {
gasDisengageProb @3 :Float32; gasDisengageProb @3 :Float32;
steerOverrideProb @4 :Float32; steerOverrideProb @4 :Float32;
} }
struct LongitudinalData {
speeds @0 :List(Float32);
accelerations @1 :List(Float32);
}
} }
struct CalibrationFeatures { struct CalibrationFeatures {
@ -1757,6 +1766,8 @@ struct DriverMonitoring {
leftBlinkProb @8 :Float32; leftBlinkProb @8 :Float32;
rightBlinkProb @9 :Float32; rightBlinkProb @9 :Float32;
irPwrDEPRECATED @10 :Float32; irPwrDEPRECATED @10 :Float32;
faceOrientationStd @11 :List(Float32);
facePositionStd @12 :List(Float32);
} }
struct Boot { struct Boot {

@ -1,6 +1,7 @@
# must be build with scons # must be build with scons
from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error from .messaging_pyx import Context, Poller, SubSocket, PubSocket # pylint: disable=no-name-in-module, import-error
from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error from .messaging_pyx import MultiplePublishersError, MessagingError # pylint: disable=no-name-in-module, import-error
import capnp
assert MultiplePublishersError assert MultiplePublishersError
assert MessagingError assert MessagingError
@ -116,6 +117,7 @@ def recv_one_retry(sock):
if dat is not None: if dat is not None:
return log.Event.from_bytes(dat) return log.Event.from_bytes(dat)
# TODO: This does not belong in messaging
def get_one_can(logcan): def get_one_can(logcan):
while True: while True:
can = recv_one_retry(logcan) can = recv_one_retry(logcan)
@ -147,12 +149,12 @@ class SubMaster():
self.freq[s] = service_list[s].frequency self.freq[s] = service_list[s].frequency
data = new_message() data = new_message()
if s in ['can', 'sensorEvents', 'liveTracks', 'sendCan', try:
'ethernetData', 'cellInfo', 'wifiScan',
'trafficEvents', 'orbObservation', 'carEvents']:
data.init(s, 0)
else:
data.init(s) data.init(s)
except capnp.lib.capnp.KjException:
# lists
data.init(s, 0)
self.data[s] = getattr(data, s) self.data[s] = getattr(data, s)
self.logMonoTime[s] = 0 self.logMonoTime[s] = 0
self.valid[s] = data.valid self.valid[s] = data.valid

@ -4,6 +4,8 @@
#include <csignal> #include <csignal>
#include <map> #include <map>
typedef void (*sighandler_t)(int sig);
#include "services.h" #include "services.h"
#include "impl_msgq.hpp" #include "impl_msgq.hpp"

@ -85,7 +85,6 @@ Message * MSGQSubSocket::receive(bool non_blocking){
msgq_msg_t msg; msgq_msg_t msg;
MSGQMessage *r = NULL; MSGQMessage *r = NULL;
r = NULL;
int rc = msgq_msg_recv(&msg, q); int rc = msgq_msg_recv(&msg, q);
@ -109,17 +108,23 @@ Message * MSGQSubSocket::receive(bool non_blocking){
} }
} }
if (rc > 0){
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
errno = msgq_do_exit ? EINTR : 0;
if (!non_blocking){ if (!non_blocking){
std::signal(SIGINT, prev_handler_sigint); std::signal(SIGINT, prev_handler_sigint);
std::signal(SIGTERM, prev_handler_sigterm); std::signal(SIGTERM, prev_handler_sigterm);
} }
errno = msgq_do_exit ? EINTR : 0;
if (rc > 0){
if (msgq_do_exit){
msgq_msg_close(&msg); // Free unused message on exit
} else {
r = new MSGQMessage;
r->takeOwnership(msg.data, msg.size);
}
}
return (Message*)r; return (Message*)r;
} }

@ -4,20 +4,20 @@
Context * Context::create(){ Context * Context::create(){
Context * c; Context * c;
if (std::getenv("MSGQ")){ if (std::getenv("ZMQ")){
c = new MSGQContext();
} else {
c = new ZMQContext(); c = new ZMQContext();
} else {
c = new MSGQContext();
} }
return c; return c;
} }
SubSocket * SubSocket::create(){ SubSocket * SubSocket::create(){
SubSocket * s; SubSocket * s;
if (std::getenv("MSGQ")){ if (std::getenv("ZMQ")){
s = new MSGQSubSocket();
} else {
s = new ZMQSubSocket(); s = new ZMQSubSocket();
} else {
s = new MSGQSubSocket();
} }
return s; return s;
} }
@ -60,10 +60,10 @@ SubSocket * SubSocket::create(Context * context, std::string endpoint, std::stri
PubSocket * PubSocket::create(){ PubSocket * PubSocket::create(){
PubSocket * s; PubSocket * s;
if (std::getenv("MSGQ")){ if (std::getenv("ZMQ")){
s = new MSGQPubSocket();
} else {
s = new ZMQPubSocket(); s = new ZMQPubSocket();
} else {
s = new MSGQPubSocket();
} }
return s; return s;
} }
@ -82,10 +82,10 @@ PubSocket * PubSocket::create(Context * context, std::string endpoint){
Poller * Poller::create(){ Poller * Poller::create(){
Poller * p; Poller * p;
if (std::getenv("MSGQ")){ if (std::getenv("ZMQ")){
p = new MSGQPoller();
} else {
p = new ZMQPoller(); p = new ZMQPoller();
} else {
p = new MSGQPoller();
} }
return p; return p;
} }

@ -23,8 +23,8 @@
#include "msgq.hpp" #include "msgq.hpp"
void sigusr1_handler(int signal) { void sigusr2_handler(int signal) {
assert(signal == SIGUSR1); assert(signal == SIGUSR2);
} }
uint64_t msgq_get_uid(void){ uint64_t msgq_get_uid(void){
@ -80,7 +80,7 @@ void msgq_wait_for_subscriber(msgq_queue_t *q){
int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){ int msgq_new_queue(msgq_queue_t * q, const char * path, size_t size){
assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes assert(size < 0xFFFFFFFF); // Buffer must be smaller than 2^32 bytes
std::signal(SIGUSR1, sigusr1_handler); std::signal(SIGUSR2, sigusr2_handler);
const char * prefix = "/dev/shm/"; const char * prefix = "/dev/shm/";
char * full_path = new char[strlen(path) + strlen(prefix) + 1]; char * full_path = new char[strlen(path) + strlen(prefix) + 1];
@ -136,7 +136,7 @@ void msgq_close_queue(msgq_queue_t *q){
void msgq_init_publisher(msgq_queue_t * q) { void msgq_init_publisher(msgq_queue_t * q) {
std::cout << "Starting publisher" << std::endl; //std::cout << "Starting publisher" << std::endl;
uint64_t uid = msgq_get_uid(); uint64_t uid = msgq_get_uid();
*q->write_uid = uid; *q->write_uid = uid;
@ -150,6 +150,15 @@ void msgq_init_publisher(msgq_queue_t * q) {
q->write_uid_local = uid; q->write_uid_local = uid;
} }
static void thread_signal(uint32_t tid) {
#ifndef SYS_tkill
// TODO: this won't work for multithreaded programs
kill(tid, SIGUSR2);
#else
syscall(SYS_tkill, tid, SIGUSR2);
#endif
}
void msgq_init_subscriber(msgq_queue_t * q) { void msgq_init_subscriber(msgq_queue_t * q) {
assert(q != NULL); assert(q != NULL);
assert(q->num_readers != NULL); assert(q->num_readers != NULL);
@ -173,7 +182,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
*q->read_uids[i] = 0; *q->read_uids[i] = 0;
// Wake up reader in case they are in a poll // Wake up reader in case they are in a poll
syscall(SYS_tkill, old_uid & 0xFFFFFFFF, SIGUSR1); thread_signal(old_uid & 0xFFFFFFFF);
} }
continue; continue;
@ -196,7 +205,7 @@ void msgq_init_subscriber(msgq_queue_t * q) {
} }
} }
std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl; //std::cout << "New subscriber id: " << q->reader_id << " uid: " << q->read_uid_local << " " << q->endpoint << std::endl;
msgq_reset_reader(q); msgq_reset_reader(q);
} }
@ -278,8 +287,7 @@ int msgq_msg_send(msgq_msg_t * msg, msgq_queue_t *q){
// Notify readers // Notify readers
for (uint64_t i = 0; i < num_readers; i++){ for (uint64_t i = 0; i < num_readers; i++){
uint64_t reader_uid = *q->read_uids[i]; uint64_t reader_uid = *q->read_uids[i];
thread_signal(reader_uid & 0xFFFFFFFF);
syscall(SYS_tkill, reader_uid & 0xFFFFFFFF, SIGUSR1);
} }
return msg->size; return msg->size;

@ -25,7 +25,7 @@ encodeIdx: [8015, true, 20.]
liveTracks: [8016, true, 20.] liveTracks: [8016, true, 20.]
sendcan: [8017, true, 100.] sendcan: [8017, true, 100.]
logMessage: [8018, true, 0.] logMessage: [8018, true, 0.]
liveCalibration: [8019, true, 5.] liveCalibration: [8019, true, 4., 4]
androidLog: [8020, true, 0.] androidLog: [8020, true, 0.]
carState: [8021, true, 100., 10] carState: [8021, true, 100., 10]
# 8022 is reserved for sshd # 8022 is reserved for sshd
@ -68,7 +68,7 @@ orbFeaturesSummary: [8062, true, 0.]
driverMonitoring: [8063, true, 5., 1] driverMonitoring: [8063, true, 5., 1]
liveParameters: [8064, true, 10.] liveParameters: [8064, true, 10.]
liveMapData: [8065, true, 0.] liveMapData: [8065, true, 0.]
cameraOdometry: [8066, true, 5.] cameraOdometry: [8066, true, 20.]
pathPlan: [8067, true, 20.] pathPlan: [8067, true, 20.]
kalmanOdometry: [8068, true, 0.] kalmanOdometry: [8068, true, 0.]
thumbnail: [8069, true, 0.2, 1] thumbnail: [8069, true, 0.2, 1]

Loading…
Cancel
Save