diff --git a/release/files_common b/release/files_common index b2791c4d46..e0f7ad6d29 100644 --- a/release/files_common +++ b/release/files_common @@ -173,7 +173,6 @@ selfdrive/common/efd.[c,h] selfdrive/common/cqueue.[c,h] selfdrive/common/clutil.[c,h] selfdrive/common/messaging.h -selfdrive/common/messagehelp.h selfdrive/common/params.h selfdrive/common/params.cc selfdrive/common/mutex.h diff --git a/selfdrive/boardd/boardd.cc b/selfdrive/boardd/boardd.cc index 117dacf4ce..ab38ed67d5 100644 --- a/selfdrive/boardd/boardd.cc +++ b/selfdrive/boardd/boardd.cc @@ -25,7 +25,6 @@ #include "common/params.h" #include "common/swaglog.h" #include "common/timing.h" -#include "common/messagehelp.h" #include "messaging.hpp" #include @@ -548,12 +547,16 @@ void can_send(SubSocket *subscriber) { int err; // recv from sendcan - MessageReader amsg = subscriber->receive(); - if (!amsg) return; + Message * msg = subscriber->receive(); - auto event = amsg.getEvent(); + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); if (nanos_since_boot() - event.getLogMonoTime() > 1e9) { //Older than 1 second. Dont send. + delete msg; return; } int msg_count = event.getSendcan().size(); @@ -574,10 +577,15 @@ void can_send(SubSocket *subscriber) { send[i*4+1] = cmsg.getDat().size() | (cmsg.getSrc() << 4); memcpy(&send[i*4+2], cmsg.getDat().begin(), cmsg.getDat().size()); } + + // release msg + delete msg; + // send to board int sent; pthread_mutex_lock(&usb_lock); + if (!fake_send) { do { // Try sending can messages. If the receive buffer on the panda is full it will NAK @@ -707,10 +715,17 @@ void *hardware_control_thread(void *crap) { while (!do_exit) { cnt++; for (auto sock : poller->poll(1000)){ - MessageReader amsg = sock->receive(); - if (!amsg) continue; + Message * msg = sock->receive(); + if (msg == NULL) continue; + + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + delete msg; + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); - auto event = amsg.getEvent(); auto type = event.which(); if(type == cereal::Event::THERMAL){ uint16_t fan_speed = event.getThermal().getFanSpeed(); diff --git a/selfdrive/camerad/cameras/camera_frame_stream.cc b/selfdrive/camerad/cameras/camera_frame_stream.cc index d02cc9be09..0061fe6185 100644 --- a/selfdrive/camerad/cameras/camera_frame_stream.cc +++ b/selfdrive/camerad/cameras/camera_frame_stream.cc @@ -15,7 +15,6 @@ #include "common/util.h" #include "common/timing.h" #include "common/swaglog.h" -#include "common/messagehelp.h" #include "buffering.h" extern "C" { @@ -62,10 +61,15 @@ void run_frame_stream(DualCameraState *s) { auto *tb = &rear_camera->camera_tb; while (!do_exit) { - MessageReader amsg = recorder_sock->receive(); - if (!amsg) continue; + Message * msg = recorder_sock->receive(); + + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + auto frame = event.getFrame(); - auto frame = amsg.getEvent().getFrame(); const int buf_idx = tbuffer_select(tb); rear_camera->camera_bufs_metadata[buf_idx] = { .frame_id = frame.getFrameId(), @@ -90,6 +94,8 @@ void run_frame_stream(DualCameraState *s) { clWaitForEvents(1, &map_event); clReleaseEvent(map_event); tbuffer_dispatch(tb, buf_idx); + delete msg; + } delete recorder_sock; delete context; diff --git a/selfdrive/camerad/main.cc b/selfdrive/camerad/main.cc index 95b57de19e..286978016c 100644 --- a/selfdrive/camerad/main.cc +++ b/selfdrive/camerad/main.cc @@ -18,7 +18,6 @@ #include "common/visionipc.h" #include "common/visionbuf.h" #include "common/visionimg.h" -#include "common/messagehelp.h" #include "messaging.hpp" @@ -214,21 +213,33 @@ void* frontview_thread(void *arg) { // no more check after gps check if (!s->rhd_front_checked) { - MessageReader amsg = dmonstate_sock->receive(true); - if (amsg) { - auto state = amsg.getEvent().getDMonitoringState(); - s->rhd_front = state.getIsRHD(); - s->rhd_front_checked = state.getRhdChecked(); + Message *msg_dmon = dmonstate_sock->receive(true); + if (msg_dmon != NULL) { + auto amsg = kj::heapArray((msg_dmon->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg_dmon->getData(), msg_dmon->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + + s->rhd_front = event.getDMonitoringState().getIsRHD(); + s->rhd_front_checked = event.getDMonitoringState().getRhdChecked(); + + delete msg_dmon; } } - MessageReader amsg = monitoring_sock->receive(true); - if (amsg) { - auto state = amsg.getEvent().getDriverState(); - float face_prob = state.getFaceProb(); + Message *msg = monitoring_sock->receive(true); + if (msg != NULL) { + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + + float face_prob = event.getDriverState().getFaceProb(); float face_position[2]; - face_position[0] = state.getFacePosition()[0]; - face_position[1] = state.getFacePosition()[1]; + face_position[0] = event.getDriverState().getFacePosition()[0]; + face_position[1] = event.getDriverState().getFacePosition()[1]; // set front camera metering target if (face_prob > 0.4) @@ -246,6 +257,8 @@ void* frontview_thread(void *arg) { s->front_meteringbox_xmin = s->rhd_front ? 0:s->rgb_front_width * 3 / 5; s->front_meteringbox_xmax = s->rhd_front ? s->rgb_front_width * 2 / 5:s->rgb_front_width; } + + delete msg; } // auto exposure diff --git a/selfdrive/common/messagehelp.h b/selfdrive/common/messagehelp.h deleted file mode 100644 index c38d241d9e..0000000000 --- a/selfdrive/common/messagehelp.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once -#include - -#include "cereal/gen/cpp/log.capnp.h" -#include "messaging.hpp" -class MessageReader { - public: - MessageReader(Message *msg) : msg(msg), buf(nullptr), msg_reader(nullptr) {} - ~MessageReader() { - if (msg_reader) delete msg_reader; - if (buf) free(buf); - if (msg) delete msg; - } - inline operator bool() { return msg != NULL; } - inline const char* getData() { return msg->getData(); } - inline size_t getSize() { return msg->getSize(); } - cereal::Event::Reader &getEvent() { - if (!msg_reader) { - msg_reader = newReader(); - event = msg_reader->getRoot(); - } - return event; - } - - private: - capnp::FlatArrayMessageReader *newReader() { - const char *data = msg->getData(); - const size_t size = msg->getSize(); - if (((reinterpret_cast(data)) % sizeof(capnp::word) == 0) && size % sizeof(capnp::word) == 0) { - return new capnp::FlatArrayMessageReader(kj::ArrayPtr((capnp::word *)data, size / sizeof(capnp::word))); - } else { - const size_t words = size / sizeof(capnp::word) + 1; - buf = (capnp::word *)malloc(words * sizeof(capnp::word)); - memcpy(buf, data, size); - return new capnp::FlatArrayMessageReader(kj::ArrayPtr(buf, words)); - } - } - capnp::word *buf; - Message *msg; - capnp::FlatArrayMessageReader *msg_reader; - cereal::Event::Reader event; -}; diff --git a/selfdrive/locationd/paramsd.cc b/selfdrive/locationd/paramsd.cc index 1efc6ced0b..318a759824 100644 --- a/selfdrive/locationd/paramsd.cc +++ b/selfdrive/locationd/paramsd.cc @@ -15,7 +15,6 @@ #include "common/messaging.h" #include "common/params.h" #include "common/timing.h" -#include "common/messagehelp.h" #include "messaging.hpp" #include "locationd_yawrate.h" @@ -106,10 +105,14 @@ int main(int argc, char *argv[]) { int save_counter = 0; while (true){ for (auto s : poller->poll(100)){ - MessageReader amsg = s->receive(); - if (!amsg) continue; + Message * msg = s->receive(); + + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader capnp_msg(amsg); + cereal::Event::Reader event = capnp_msg.getRoot(); - auto event = amsg.getEvent(); localizer.handle_log(event); auto which = event.which(); @@ -169,6 +172,7 @@ int main(int argc, char *argv[]) { }); } } + delete msg; } } diff --git a/selfdrive/locationd/ubloxd_main.cc b/selfdrive/locationd/ubloxd_main.cc index af5017c821..4e07932fff 100644 --- a/selfdrive/locationd/ubloxd_main.cc +++ b/selfdrive/locationd/ubloxd_main.cc @@ -23,7 +23,6 @@ #include "common/params.h" #include "common/swaglog.h" #include "common/timing.h" -#include "common/messagehelp.h" #include "ublox_msg.h" @@ -55,12 +54,17 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) while (!do_exit) { - MessageReader amsg = poll_func(poller); - if (!amsg) continue; + Message * msg = poll_func(poller); + if (msg == NULL) continue; - auto ubloxRaw = amsg.getEvent().getUbloxRaw(); - const uint8_t *data = ubloxRaw.begin(); - size_t len = ubloxRaw.size(); + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + + const uint8_t *data = event.getUbloxRaw().begin(); + size_t len = event.getUbloxRaw().size(); size_t bytes_consumed = 0; while(bytes_consumed < len && !do_exit) { size_t bytes_consumed_this_time = 0U; @@ -110,6 +114,7 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) } bytes_consumed += bytes_consumed_this_time; } + delete msg; } delete poller; diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 85f68673af..eb41485230 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -36,7 +36,6 @@ #include "common/visionipc.h" #include "common/utilpp.h" #include "common/util.h" -#include "common/messagehelp.h" #include "logger.h" #include "messaging.hpp" @@ -655,13 +654,21 @@ int main(int argc, char** argv) { while (!do_exit) { for (auto sock : poller->poll(100 * 1000)){ while (true) { - MessageReader amsg = sock->receive(true); - if (!amsg){ + Message * msg = sock->receive(true); + if (msg == NULL){ break; } + + uint8_t* data = (uint8_t*)msg->getData(); + size_t len = msg->getSize(); + if (sock == frame_sock) { - auto event = amsg.getEvent(); // track camera frames to sync to encoder + auto amsg = kj::heapArray((len / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), data, len); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); if (event.isFrame()) { std::unique_lock lk(s.lock); s.last_frame_id = event.getFrame().getFrameId(); @@ -670,7 +677,8 @@ int main(int argc, char** argv) { } } - logger_log(&s.logger, (uint8_t*)amsg.getData(), amsg.getSize(), qlog_counter[sock] == 0); + logger_log(&s.logger, data, len, qlog_counter[sock] == 0); + delete msg; if (qlog_counter[sock] != -1) { //printf("%p: %d/%d\n", socks[i], qlog_counter[socks[i]], qlog_freqs[socks[i]]); @@ -678,7 +686,7 @@ int main(int argc, char** argv) { qlog_counter[sock] %= qlog_freqs[sock]; } - bytes_count += amsg.getSize(); + bytes_count += len; msg_count++; } } diff --git a/selfdrive/modeld/dmonitoringmodeld.cc b/selfdrive/modeld/dmonitoringmodeld.cc index ac0e277f4e..d17247f9e5 100644 --- a/selfdrive/modeld/dmonitoringmodeld.cc +++ b/selfdrive/modeld/dmonitoringmodeld.cc @@ -7,7 +7,6 @@ #include "common/visionbuf.h" #include "common/visionipc.h" #include "common/swaglog.h" -#include "common/messagehelp.h" #include "models/dmonitoring.h" @@ -62,11 +61,17 @@ int main(int argc, char **argv) { //printf("frame_id: %d %dx%d\n", extra.frame_id, buf_info.width, buf_info.height); if (!dmonitoringmodel.is_rhd_checked) { if (chk_counter >= RHD_CHECK_INTERVAL) { - MessageReader amsg = dmonstate_sock->receive(true); - if (amsg) { - auto state = amsg.getEvent().getDMonitoringState(); - dmonitoringmodel.is_rhd = state.getIsRHD(); - dmonitoringmodel.is_rhd_checked = state.getRhdChecked(); + Message *msg = dmonstate_sock->receive(true); + if (msg != NULL) { + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + + dmonitoringmodel.is_rhd = event.getDMonitoringState().getIsRHD(); + dmonitoringmodel.is_rhd_checked = event.getDMonitoringState().getRhdChecked(); + delete msg; } chk_counter = 0; } diff --git a/selfdrive/modeld/modeld.cc b/selfdrive/modeld/modeld.cc index 102543f4e9..309c64c18a 100644 --- a/selfdrive/modeld/modeld.cc +++ b/selfdrive/modeld/modeld.cc @@ -4,7 +4,6 @@ #include "common/visionbuf.h" #include "common/visionipc.h" #include "common/swaglog.h" -#include "common/messagehelp.h" #include "models/driving.h" @@ -49,10 +48,14 @@ void* live_thread(void *arg) { while (!do_exit) { for (auto sock : poller->poll(10)){ - MessageReader amsg = sock->receive(); - if (!amsg) continue; + Message * msg = sock->receive(); + + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); - auto event = amsg.getEvent(); if (event.isLiveCalibration()) { pthread_mutex_lock(&transform_lock); @@ -77,6 +80,8 @@ void* live_thread(void *arg) { run_model = true; pthread_mutex_unlock(&transform_lock); } + + delete msg; } } @@ -188,10 +193,18 @@ int main(int argc, char **argv) { const bool run_model_this_iter = run_model; pthread_mutex_unlock(&transform_lock); - MessageReader amsg = pathplan_sock->receive(true); - if (amsg) { + Message *msg = pathplan_sock->receive(true); + if (msg != NULL) { + // TODO: copy and pasted from camerad/main.cc + auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); + memcpy(amsg.begin(), msg->getData(), msg->getSize()); + + capnp::FlatArrayMessageReader cmsg(amsg); + cereal::Event::Reader event = cmsg.getRoot(); + // TODO: path planner timeout? - desire = ((int)amsg.getEvent().getPathPlan().getDesire()) - 1; + desire = ((int)event.getPathPlan().getDesire()) - 1; + delete msg; } double mt1 = 0, mt2 = 0;