diff --git a/release/files_common b/release/files_common index afa5b8772d..4a666b6b0b 100644 --- a/release/files_common +++ b/release/files_common @@ -173,6 +173,7 @@ selfdrive/common/efd.[c,h] selfdrive/common/cqueue.[c,h] selfdrive/common/clutil.[c,h] selfdrive/common/messaging.h +selfdrive/common/messagehelp.h selfdrive/common/params.h selfdrive/common/params.cc selfdrive/common/mutex.h diff --git a/selfdrive/boardd/boardd.cc b/selfdrive/boardd/boardd.cc index 4e227857f0..15711f4b1e 100644 --- a/selfdrive/boardd/boardd.cc +++ b/selfdrive/boardd/boardd.cc @@ -26,6 +26,7 @@ #include "common/params.h" #include "common/swaglog.h" #include "common/timing.h" +#include "common/messagehelp.h" #include "messaging.hpp" #include @@ -548,16 +549,12 @@ void can_send(SubSocket *subscriber) { int err; // recv from sendcan - Message * msg = subscriber->receive(); + MessageReader amsg = subscriber->receive(); + if (!amsg) return; - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); + auto event = amsg.getEvent(); if (nanos_since_boot() - event.getLogMonoTime() > 1e9) { //Older than 1 second. Dont send. - delete msg; return; } int msg_count = event.getSendcan().size(); @@ -578,15 +575,10 @@ void can_send(SubSocket *subscriber) { send[i*4+1] = cmsg.getDat().size() | (cmsg.getSrc() << 4); memcpy(&send[i*4+2], cmsg.getDat().begin(), cmsg.getDat().size()); } - - // release msg - delete msg; - // send to board int sent; pthread_mutex_lock(&usb_lock); - if (!fake_send) { do { // Try sending can messages. If the receive buffer on the panda is full it will NAK @@ -716,17 +708,10 @@ void *hardware_control_thread(void *crap) { while (!do_exit) { cnt++; for (auto sock : poller->poll(1000)){ - Message * msg = sock->receive(); - if (msg == NULL) continue; - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - delete msg; - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); + MessageReader amsg = sock->receive(); + if (!amsg) continue; + auto event = amsg.getEvent(); auto type = event.which(); if(type == cereal::Event::THERMAL){ uint16_t fan_speed = event.getThermal().getFanSpeed(); diff --git a/selfdrive/camerad/cameras/camera_frame_stream.cc b/selfdrive/camerad/cameras/camera_frame_stream.cc index 0061fe6185..d02cc9be09 100644 --- a/selfdrive/camerad/cameras/camera_frame_stream.cc +++ b/selfdrive/camerad/cameras/camera_frame_stream.cc @@ -15,6 +15,7 @@ #include "common/util.h" #include "common/timing.h" #include "common/swaglog.h" +#include "common/messagehelp.h" #include "buffering.h" extern "C" { @@ -61,15 +62,10 @@ void run_frame_stream(DualCameraState *s) { auto *tb = &rear_camera->camera_tb; while (!do_exit) { - Message * msg = recorder_sock->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - auto frame = event.getFrame(); + MessageReader amsg = recorder_sock->receive(); + if (!amsg) continue; + auto frame = amsg.getEvent().getFrame(); const int buf_idx = tbuffer_select(tb); rear_camera->camera_bufs_metadata[buf_idx] = { .frame_id = frame.getFrameId(), @@ -94,8 +90,6 @@ void run_frame_stream(DualCameraState *s) { clWaitForEvents(1, &map_event); clReleaseEvent(map_event); tbuffer_dispatch(tb, buf_idx); - delete msg; - } delete recorder_sock; delete context; diff --git a/selfdrive/camerad/main.cc b/selfdrive/camerad/main.cc index f02f7824aa..ccbd9874b8 100644 --- a/selfdrive/camerad/main.cc +++ b/selfdrive/camerad/main.cc @@ -18,6 +18,7 @@ #include "common/visionipc.h" #include "common/visionbuf.h" #include "common/visionimg.h" +#include "common/messagehelp.h" #include "messaging.hpp" @@ -202,33 +203,21 @@ void* frontview_thread(void *arg) { // no more check after gps check if (!s->rhd_front_checked) { - Message *msg_dmon = dmonstate_sock->receive(true); - if (msg_dmon != NULL) { - auto amsg = kj::heapArray((msg_dmon->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg_dmon->getData(), msg_dmon->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - s->rhd_front = event.getDMonitoringState().getIsRHD(); - s->rhd_front_checked = event.getDMonitoringState().getRhdChecked(); - - delete msg_dmon; + MessageReader amsg = dmonstate_sock->receive(true); + if (amsg) { + auto state = amsg.getEvent().getDMonitoringState(); + s->rhd_front = state.getIsRHD(); + s->rhd_front_checked = state.getRhdChecked(); } } - Message *msg = monitoring_sock->receive(true); - if (msg != NULL) { - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - float face_prob = event.getDriverState().getFaceProb(); + MessageReader amsg = monitoring_sock->receive(true); + if (amsg) { + auto state = amsg.getEvent().getDriverState(); + float face_prob = state.getFaceProb(); float face_position[2]; - face_position[0] = event.getDriverState().getFacePosition()[0]; - face_position[1] = event.getDriverState().getFacePosition()[1]; + face_position[0] = state.getFacePosition()[0]; + face_position[1] = state.getFacePosition()[1]; // set front camera metering target if (face_prob > 0.4) @@ -246,8 +235,6 @@ void* frontview_thread(void *arg) { s->front_meteringbox_xmin = s->rhd_front ? 0:s->rgb_front_width * 3 / 5; s->front_meteringbox_xmax = s->rhd_front ? s->rgb_front_width * 2 / 5:s->rgb_front_width; } - - delete msg; } // auto exposure diff --git a/selfdrive/common/messagehelp.h b/selfdrive/common/messagehelp.h new file mode 100644 index 0000000000..c38d241d9e --- /dev/null +++ b/selfdrive/common/messagehelp.h @@ -0,0 +1,42 @@ +#pragma once +#include + +#include "cereal/gen/cpp/log.capnp.h" +#include "messaging.hpp" +class MessageReader { + public: + MessageReader(Message *msg) : msg(msg), buf(nullptr), msg_reader(nullptr) {} + ~MessageReader() { + if (msg_reader) delete msg_reader; + if (buf) free(buf); + if (msg) delete msg; + } + inline operator bool() { return msg != NULL; } + inline const char* getData() { return msg->getData(); } + inline size_t getSize() { return msg->getSize(); } + cereal::Event::Reader &getEvent() { + if (!msg_reader) { + msg_reader = newReader(); + event = msg_reader->getRoot(); + } + return event; + } + + private: + capnp::FlatArrayMessageReader *newReader() { + const char *data = msg->getData(); + const size_t size = msg->getSize(); + if (((reinterpret_cast(data)) % sizeof(capnp::word) == 0) && size % sizeof(capnp::word) == 0) { + return new capnp::FlatArrayMessageReader(kj::ArrayPtr((capnp::word *)data, size / sizeof(capnp::word))); + } else { + const size_t words = size / sizeof(capnp::word) + 1; + buf = (capnp::word *)malloc(words * sizeof(capnp::word)); + memcpy(buf, data, size); + return new capnp::FlatArrayMessageReader(kj::ArrayPtr(buf, words)); + } + } + capnp::word *buf; + Message *msg; + capnp::FlatArrayMessageReader *msg_reader; + cereal::Event::Reader event; +}; diff --git a/selfdrive/locationd/paramsd.cc b/selfdrive/locationd/paramsd.cc index 318a759824..1efc6ced0b 100644 --- a/selfdrive/locationd/paramsd.cc +++ b/selfdrive/locationd/paramsd.cc @@ -15,6 +15,7 @@ #include "common/messaging.h" #include "common/params.h" #include "common/timing.h" +#include "common/messagehelp.h" #include "messaging.hpp" #include "locationd_yawrate.h" @@ -105,14 +106,10 @@ int main(int argc, char *argv[]) { int save_counter = 0; while (true){ for (auto s : poller->poll(100)){ - Message * msg = s->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader capnp_msg(amsg); - cereal::Event::Reader event = capnp_msg.getRoot(); + MessageReader amsg = s->receive(); + if (!amsg) continue; + auto event = amsg.getEvent(); localizer.handle_log(event); auto which = event.which(); @@ -172,7 +169,6 @@ int main(int argc, char *argv[]) { }); } } - delete msg; } } diff --git a/selfdrive/locationd/ubloxd_main.cc b/selfdrive/locationd/ubloxd_main.cc index 4e07932fff..af5017c821 100644 --- a/selfdrive/locationd/ubloxd_main.cc +++ b/selfdrive/locationd/ubloxd_main.cc @@ -23,6 +23,7 @@ #include "common/params.h" #include "common/swaglog.h" #include "common/timing.h" +#include "common/messagehelp.h" #include "ublox_msg.h" @@ -54,17 +55,12 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) while (!do_exit) { - Message * msg = poll_func(poller); - if (msg == NULL) continue; + MessageReader amsg = poll_func(poller); + if (!amsg) continue; - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - const uint8_t *data = event.getUbloxRaw().begin(); - size_t len = event.getUbloxRaw().size(); + auto ubloxRaw = amsg.getEvent().getUbloxRaw(); + const uint8_t *data = ubloxRaw.begin(); + size_t len = ubloxRaw.size(); size_t bytes_consumed = 0; while(bytes_consumed < len && !do_exit) { size_t bytes_consumed_this_time = 0U; @@ -114,7 +110,6 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) } bytes_consumed += bytes_consumed_this_time; } - delete msg; } delete poller; diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index eb41485230..85f68673af 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -36,6 +36,7 @@ #include "common/visionipc.h" #include "common/utilpp.h" #include "common/util.h" +#include "common/messagehelp.h" #include "logger.h" #include "messaging.hpp" @@ -654,21 +655,13 @@ int main(int argc, char** argv) { while (!do_exit) { for (auto sock : poller->poll(100 * 1000)){ while (true) { - Message * msg = sock->receive(true); - if (msg == NULL){ + MessageReader amsg = sock->receive(true); + if (!amsg){ break; } - - uint8_t* data = (uint8_t*)msg->getData(); - size_t len = msg->getSize(); - if (sock == frame_sock) { + auto event = amsg.getEvent(); // track camera frames to sync to encoder - auto amsg = kj::heapArray((len / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), data, len); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); if (event.isFrame()) { std::unique_lock lk(s.lock); s.last_frame_id = event.getFrame().getFrameId(); @@ -677,8 +670,7 @@ int main(int argc, char** argv) { } } - logger_log(&s.logger, data, len, qlog_counter[sock] == 0); - delete msg; + logger_log(&s.logger, (uint8_t*)amsg.getData(), amsg.getSize(), qlog_counter[sock] == 0); if (qlog_counter[sock] != -1) { //printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]); @@ -686,7 +678,7 @@ int main(int argc, char** argv) { qlog_counter[sock] %= qlog_freqs[sock]; } - bytes_count += len; + bytes_count += amsg.getSize(); msg_count++; } } diff --git a/selfdrive/modeld/dmonitoringmodeld.cc b/selfdrive/modeld/dmonitoringmodeld.cc index d17247f9e5..ac0e277f4e 100644 --- a/selfdrive/modeld/dmonitoringmodeld.cc +++ b/selfdrive/modeld/dmonitoringmodeld.cc @@ -7,6 +7,7 @@ #include "common/visionbuf.h" #include "common/visionipc.h" #include "common/swaglog.h" +#include "common/messagehelp.h" #include "models/dmonitoring.h" @@ -61,17 +62,11 @@ int main(int argc, char **argv) { //printf("frame_id: %d %dx%d\n", extra.frame_id, buf_info.width, buf_info.height); if (!dmonitoringmodel.is_rhd_checked) { if (chk_counter >= RHD_CHECK_INTERVAL) { - Message *msg = dmonstate_sock->receive(true); - if (msg != NULL) { - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - dmonitoringmodel.is_rhd = event.getDMonitoringState().getIsRHD(); - dmonitoringmodel.is_rhd_checked = event.getDMonitoringState().getRhdChecked(); - delete msg; + MessageReader amsg = dmonstate_sock->receive(true); + if (amsg) { + auto state = amsg.getEvent().getDMonitoringState(); + dmonitoringmodel.is_rhd = state.getIsRHD(); + dmonitoringmodel.is_rhd_checked = state.getRhdChecked(); } chk_counter = 0; } diff --git a/selfdrive/modeld/modeld.cc b/selfdrive/modeld/modeld.cc index 309c64c18a..102543f4e9 100644 --- a/selfdrive/modeld/modeld.cc +++ b/selfdrive/modeld/modeld.cc @@ -4,6 +4,7 @@ #include "common/visionbuf.h" #include "common/visionipc.h" #include "common/swaglog.h" +#include "common/messagehelp.h" #include "models/driving.h" @@ -48,14 +49,10 @@ void* live_thread(void *arg) { while (!do_exit) { for (auto sock : poller->poll(10)){ - Message * msg = sock->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); + MessageReader amsg = sock->receive(); + if (!amsg) continue; + auto event = amsg.getEvent(); if (event.isLiveCalibration()) { pthread_mutex_lock(&transform_lock); @@ -80,8 +77,6 @@ void* live_thread(void *arg) { run_model = true; pthread_mutex_unlock(&transform_lock); } - - delete msg; } } @@ -193,18 +188,10 @@ int main(int argc, char **argv) { const bool run_model_this_iter = run_model; pthread_mutex_unlock(&transform_lock); - Message *msg = pathplan_sock->receive(true); - if (msg != NULL) { - // TODO: copy and pasted from camerad/main.cc - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - + MessageReader amsg = pathplan_sock->receive(true); + if (amsg) { // TODO: path planner timeout? - desire = ((int)event.getPathPlan().getDesire()) - 1; - delete msg; + desire = ((int)amsg.getEvent().getPathPlan().getDesire()) - 1; } double mt1 = 0, mt2 = 0;