You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
				
					212 lines
				
				5.9 KiB
			
		
		
			
		
	
	
					212 lines
				
				5.9 KiB
			| 
								 
											1 year ago
										 
									 | 
							
								#include <time.h>
							 | 
						||
| 
								 | 
							
								#include <assert.h>
							 | 
						||
| 
								 | 
							
								#include <stdlib.h>
							 | 
						||
| 
								 | 
							
								#include <string>
							 | 
						||
| 
								 | 
							
								#include <mutex>
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								#include "cereal/services.h"
							 | 
						||
| 
								 | 
							
								#include "cereal/messaging/messaging.h"
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								const bool SIMULATION = (getenv("SIMULATION") != nullptr) && (std::string(getenv("SIMULATION")) == "1");
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								static inline uint64_t nanos_since_boot() {
							 | 
						||
| 
								 | 
							
								  struct timespec t;
							 | 
						||
| 
								 | 
							
								  clock_gettime(CLOCK_BOOTTIME, &t);
							 | 
						||
| 
								 | 
							
								  return t.tv_sec * 1000000000ULL + t.tv_nsec;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								static inline bool inList(const std::vector<const char *> &list, const char *value) {
							 | 
						||
| 
								 | 
							
								  for (auto &v : list) {
							 | 
						||
| 
								 | 
							
								    if (strcmp(value, v) == 0) return true;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  return false;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								class MessageContext {
							 | 
						||
| 
								 | 
							
								public:
							 | 
						||
| 
								 | 
							
								  MessageContext() : ctx_(nullptr) {}
							 | 
						||
| 
								 | 
							
								  ~MessageContext() { delete ctx_; }
							 | 
						||
| 
								 | 
							
								  inline Context *context() {
							 | 
						||
| 
								 | 
							
								    std::call_once(init_flag, [=]() { ctx_ = Context::create(); });
							 | 
						||
| 
								 | 
							
								    return ctx_;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								private:
							 | 
						||
| 
								 | 
							
								  Context *ctx_;
							 | 
						||
| 
								 | 
							
								  std::once_flag init_flag;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								MessageContext message_context;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								struct SubMaster::SubMessage {
							 | 
						||
| 
								 | 
							
								  std::string name;
							 | 
						||
| 
								 | 
							
								  SubSocket *socket = nullptr;
							 | 
						||
| 
								 | 
							
								  int freq = 0;
							 | 
						||
| 
								 | 
							
								  bool updated = false, alive = false, valid = true, ignore_alive;
							 | 
						||
| 
								 | 
							
								  uint64_t rcv_time = 0, rcv_frame = 0;
							 | 
						||
| 
								 | 
							
								  void *allocated_msg_reader = nullptr;
							 | 
						||
| 
								 | 
							
								  bool is_polled = false;
							 | 
						||
| 
								 | 
							
								  capnp::FlatArrayMessageReader *msg_reader = nullptr;
							 | 
						||
| 
								 | 
							
								  AlignedBuffer aligned_buf;
							 | 
						||
| 
								 | 
							
								  cereal::Event::Reader event;
							 | 
						||
| 
								 | 
							
								};
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								SubMaster::SubMaster(const std::vector<const char *> &service_list, const std::vector<const char *> &poll,
							 | 
						||
| 
								 | 
							
								                     const char *address, const std::vector<const char *> &ignore_alive) {
							 | 
						||
| 
								 | 
							
								  poller_ = Poller::create();
							 | 
						||
| 
								 | 
							
								  for (auto name : service_list) {
							 | 
						||
| 
								 | 
							
								    assert(services.count(std::string(name)) > 0);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    service serv = services.at(std::string(name));
							 | 
						||
| 
								 | 
							
								    SubSocket *socket = SubSocket::create(message_context.context(), name, address ? address : "127.0.0.1", true);
							 | 
						||
| 
								 | 
							
								    assert(socket != 0);
							 | 
						||
| 
								 | 
							
								    bool is_polled = inList(poll, name) || poll.empty();
							 | 
						||
| 
								 | 
							
								    if (is_polled) poller_->registerSocket(socket);
							 | 
						||
| 
								 | 
							
								    SubMessage *m = new SubMessage{
							 | 
						||
| 
								 | 
							
								      .name = name,
							 | 
						||
| 
								 | 
							
								      .socket = socket,
							 | 
						||
| 
								 | 
							
								      .freq = serv.frequency,
							 | 
						||
| 
								 | 
							
								      .ignore_alive = inList(ignore_alive, name),
							 | 
						||
| 
								 | 
							
								      .allocated_msg_reader = malloc(sizeof(capnp::FlatArrayMessageReader)),
							 | 
						||
| 
								 | 
							
								      .is_polled = is_polled};
							 | 
						||
| 
								 | 
							
								    m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader({});
							 | 
						||
| 
								 | 
							
								    messages_[socket] = m;
							 | 
						||
| 
								 | 
							
								    services_[name] = m;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								void SubMaster::update(int timeout) {
							 | 
						||
| 
								 | 
							
								  for (auto &kv : messages_) kv.second->updated = false;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  auto sockets = poller_->poll(timeout);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  // add non-polled sockets for non-blocking receive
							 | 
						||
| 
								 | 
							
								  for (auto &kv : messages_) {
							 | 
						||
| 
								 | 
							
								    SubMessage *m = kv.second;
							 | 
						||
| 
								 | 
							
								    SubSocket *s = kv.first;
							 | 
						||
| 
								 | 
							
								    if (!m->is_polled) sockets.push_back(s);
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  uint64_t current_time = nanos_since_boot();
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  std::vector<std::pair<std::string, cereal::Event::Reader>> messages;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  for (auto s : sockets) {
							 | 
						||
| 
								 | 
							
								    Message *msg = s->receive(true);
							 | 
						||
| 
								 | 
							
								    if (msg == nullptr) continue;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    SubMessage *m = messages_.at(s);
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    m->msg_reader->~FlatArrayMessageReader();
							 | 
						||
| 
								 | 
							
								    capnp::ReaderOptions options;
							 | 
						||
| 
								 | 
							
								    options.traversalLimitInWords = kj::maxValue; // Don't limit
							 | 
						||
| 
								 | 
							
								    m->msg_reader = new (m->allocated_msg_reader) capnp::FlatArrayMessageReader(m->aligned_buf.align(msg), options);
							 | 
						||
| 
								 | 
							
								    delete msg;
							 | 
						||
| 
								 | 
							
								    messages.push_back({m->name, m->msg_reader->getRoot<cereal::Event>()});
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  update_msgs(current_time, messages);
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								void SubMaster::update_msgs(uint64_t current_time, const std::vector<std::pair<std::string, cereal::Event::Reader>> &messages){
							 | 
						||
| 
								 | 
							
								  if (++frame == UINT64_MAX) frame = 1;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  for (auto &kv : messages) {
							 | 
						||
| 
								 | 
							
								    auto m_find = services_.find(kv.first);
							 | 
						||
| 
								 | 
							
								    if (m_find == services_.end()){
							 | 
						||
| 
								 | 
							
								      continue;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								    SubMessage *m = m_find->second;
							 | 
						||
| 
								 | 
							
								    m->event = kv.second;
							 | 
						||
| 
								 | 
							
								    m->updated = true;
							 | 
						||
| 
								 | 
							
								    m->rcv_time = current_time;
							 | 
						||
| 
								 | 
							
								    m->rcv_frame = frame;
							 | 
						||
| 
								 | 
							
								    m->valid = m->event.getValid();
							 | 
						||
| 
								 | 
							
								    if (SIMULATION) m->alive = true;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								  if (!SIMULATION) {
							 | 
						||
| 
								 | 
							
								    for (auto &kv : messages_) {
							 | 
						||
| 
								 | 
							
								      SubMessage *m = kv.second;
							 | 
						||
| 
								 | 
							
								      m->alive = (m->freq <= (1e-5) || ((current_time - m->rcv_time) * (1e-9)) < (10.0 / m->freq));
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								bool SubMaster::all_(const std::vector<const char *> &service_list, bool valid, bool alive) {
							 | 
						||
| 
								 | 
							
								  int found = 0;
							 | 
						||
| 
								 | 
							
								  for (auto &kv : messages_) {
							 | 
						||
| 
								 | 
							
								    SubMessage *m = kv.second;
							 | 
						||
| 
								 | 
							
								    if (service_list.size() == 0 || inList(service_list, m->name.c_str())) {
							 | 
						||
| 
								 | 
							
								      found += (!valid || m->valid) && (!alive || (m->alive || m->ignore_alive));
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								  return service_list.size() == 0 ? found == messages_.size() : found == service_list.size();
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								void SubMaster::drain() {
							 | 
						||
| 
								 | 
							
								  while (true) {
							 | 
						||
| 
								 | 
							
								    auto polls = poller_->poll(0);
							 | 
						||
| 
								 | 
							
								    if (polls.size() == 0)
							 | 
						||
| 
								 | 
							
								      break;
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    for (auto sock : polls) {
							 | 
						||
| 
								 | 
							
								      Message *msg = sock->receive(true);
							 | 
						||
| 
								 | 
							
								      delete msg;
							 | 
						||
| 
								 | 
							
								    }
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								bool SubMaster::updated(const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->updated;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								bool SubMaster::alive(const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->alive;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								bool SubMaster::valid(const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->valid;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								uint64_t SubMaster::rcv_frame(const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->rcv_frame;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								uint64_t SubMaster::rcv_time(const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->rcv_time;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								cereal::Event::Reader &SubMaster::operator[](const char *name) const {
							 | 
						||
| 
								 | 
							
								  return services_.at(name)->event;
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								SubMaster::~SubMaster() {
							 | 
						||
| 
								 | 
							
								  delete poller_;
							 | 
						||
| 
								 | 
							
								  for (auto &kv : messages_) {
							 | 
						||
| 
								 | 
							
								    SubMessage *m = kv.second;
							 | 
						||
| 
								 | 
							
								    m->msg_reader->~FlatArrayMessageReader();
							 | 
						||
| 
								 | 
							
								    free(m->allocated_msg_reader);
							 | 
						||
| 
								 | 
							
								    delete m->socket;
							 | 
						||
| 
								 | 
							
								    delete m;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								PubMaster::PubMaster(const std::vector<const char *> &service_list) {
							 | 
						||
| 
								 | 
							
								  for (auto name : service_list) {
							 | 
						||
| 
								 | 
							
								    assert(services.count(name) > 0);
							 | 
						||
| 
								 | 
							
								    PubSocket *socket = PubSocket::create(message_context.context(), name);
							 | 
						||
| 
								 | 
							
								    assert(socket);
							 | 
						||
| 
								 | 
							
								    sockets_[name] = socket;
							 | 
						||
| 
								 | 
							
								  }
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								int PubMaster::send(const char *name, MessageBuilder &msg) {
							 | 
						||
| 
								 | 
							
								  auto bytes = msg.toBytes();
							 | 
						||
| 
								 | 
							
								  return send(name, bytes.begin(), bytes.size());
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								PubMaster::~PubMaster() {
							 | 
						||
| 
								 | 
							
								  for (auto s : sockets_) delete s.second;
							 | 
						||
| 
								 | 
							
								}
							 |