diff --git a/selfdrive/boardd/SConscript b/selfdrive/boardd/SConscript index 14c1ff780d..d7e575d064 100644 --- a/selfdrive/boardd/SConscript +++ b/selfdrive/boardd/SConscript @@ -1,6 +1,6 @@ -Import('env', 'common', 'messaging') +Import('env', 'common', 'cereal', 'messaging') -env.Program('boardd.cc', LIBS=['usb-1.0', common, messaging, 'pthread', 'zmq', 'capnp', 'kj']) +env.Program('boardd.cc', LIBS=['usb-1.0', common, cereal, messaging, 'pthread', 'zmq', 'capnp', 'kj']) env.Library('libcan_list_to_can_capnp', ['can_list_to_can_capnp.cc']) env.Command(['boardd_api_impl.so'], diff --git a/selfdrive/boardd/boardd.cc b/selfdrive/boardd/boardd.cc index d5990bc1d8..b1f17a4bc1 100644 --- a/selfdrive/boardd/boardd.cc +++ b/selfdrive/boardd/boardd.cc @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -16,8 +15,6 @@ #include -#include -#include "cereal/gen/cpp/log.capnp.h" #include "cereal/gen/cpp/car.capnp.h" #include "common/util.h" @@ -281,7 +278,7 @@ void handle_usb_issue(int err, const char func[]) { // TODO: check other errors, is simply retrying okay? } -void can_recv(PubSocket *publisher) { +void can_recv(PubMaster &pm) { int err; uint32_t data[RECV_SIZE/4]; int recv; @@ -333,13 +330,10 @@ void can_recv(PubSocket *publisher) { canData[i].setSrc((data[i*4+1] >> 4) & 0xff); } - // send to can - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - publisher->send((char*)bytes.begin(), bytes.size()); + pm.send("can", msg); } -void can_health(PubSocket *publisher) { +void can_health(PubMaster &pm) { int cnt; int err; @@ -384,10 +378,7 @@ void can_health(PubSocket *publisher) { // No panda connected, send empty health packet if (!received){ healthData.setHwType(cereal::HealthData::HwType::UNKNOWN); - - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - publisher->send((char*)bytes.begin(), bytes.size()); + pm.send("health", msg); return; } @@ -532,9 +523,7 @@ void can_health(PubSocket *publisher) { } } // send to health - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - publisher->send((char*)bytes.begin(), bytes.size()); + pm.send("health", msg); // send heartbeat back to panda pthread_mutex_lock(&usb_lock); @@ -543,20 +532,11 @@ void can_health(PubSocket *publisher) { } -void can_send(SubSocket *subscriber) { +void can_send(cereal::Event::Reader &event) { int err; - // recv from sendcan - Message * msg = subscriber->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); if (nanos_since_boot() - event.getLogMonoTime() > 1e9) { //Older than 1 second. Dont send. - delete msg; return; } int msg_count = event.getSendcan().size(); @@ -578,14 +558,10 @@ void can_send(SubSocket *subscriber) { memcpy(&send[i*4+2], cmsg.getDat().begin(), cmsg.getDat().size()); } - // release msg - delete msg; - // send to board int sent; pthread_mutex_lock(&usb_lock); - if (!fake_send) { do { // Try sending can messages. If the receive buffer on the panda is full it will NAK @@ -611,29 +587,17 @@ void can_send(SubSocket *subscriber) { void *can_send_thread(void *crap) { LOGD("start send thread"); - - // sendcan = 8017 - Context * context = Context::create(); - SubSocket * subscriber = SubSocket::create(context, "sendcan"); - assert(subscriber != NULL); - + SubMaster sm({"sendcan"}); // drain sendcan to delete any stale messages from previous runs - while (true){ - Message * msg = subscriber->receive(true); - if (msg == NULL){ - break; - } - delete msg; - } - + sm.drain(); // run as fast as messages come in while (!do_exit) { - can_send(subscriber); + if (sm.update(1000) > 0){ + can_send(sm["sendcan"]); + } } - delete subscriber; - delete context; return NULL; } @@ -641,16 +605,14 @@ void *can_recv_thread(void *crap) { LOGD("start recv thread"); // can = 8006 - Context * c = Context::create(); - PubSocket * publisher = PubSocket::create(c, "can"); - assert(publisher != NULL); + PubMaster pm({"can"}); // run at 100hz const uint64_t dt = 10000000ULL; uint64_t next_frame_time = nanos_since_boot() + dt; while (!do_exit) { - can_recv(publisher); + can_recv(pm); uint64_t cur_time = nanos_since_boot(); int64_t remaining = next_frame_time - cur_time; @@ -664,39 +626,26 @@ void *can_recv_thread(void *crap) { next_frame_time += dt; } - - delete publisher; - delete c; return NULL; } void *can_health_thread(void *crap) { LOGD("start health thread"); // health = 8011 - Context * c = Context::create(); - PubSocket * publisher = PubSocket::create(c, "health"); - assert(publisher != NULL); + PubMaster pm({"health"}); // run at 2hz while (!do_exit) { - can_health(publisher); + can_health(pm); usleep(500*1000); } - delete publisher; - delete c; return NULL; } void *hardware_control_thread(void *crap) { LOGD("start hardware control thread"); - Context * c = Context::create(); - SubSocket * thermal_sock = SubSocket::create(c, "thermal"); - SubSocket * front_frame_sock = SubSocket::create(c, "frontFrame"); - assert(thermal_sock != NULL); - assert(front_frame_sock != NULL); - - Poller * poller = Poller::create({thermal_sock, front_frame_sock}); + SubMaster sm({"thermal", "frontFrame"}); // Wait for hardware type to be set. while (hw_type == cereal::HealthData::HwType::UNKNOWN){ @@ -714,42 +663,30 @@ void *hardware_control_thread(void *crap) { while (!do_exit) { cnt++; - for (auto sock : poller->poll(1000)){ - Message * msg = sock->receive(); - if (msg == NULL) continue; - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - delete msg; - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - auto type = event.which(); - if(type == cereal::Event::THERMAL){ - uint16_t fan_speed = event.getThermal().getFanSpeed(); - if (fan_speed != prev_fan_speed || cnt % 100 == 0){ - pthread_mutex_lock(&usb_lock); - libusb_control_transfer(dev_handle, 0x40, 0xb1, fan_speed, 0, NULL, 0, TIMEOUT); - pthread_mutex_unlock(&usb_lock); - - prev_fan_speed = fan_speed; - } - } else if (type == cereal::Event::FRONT_FRAME){ - float cur_front_gain = event.getFrontFrame().getGainFrac(); - last_front_frame_t = event.getLogMonoTime(); - - if (cur_front_gain <= CUTOFF_GAIN) { - ir_pwr = 100.0 * MIN_IR_POWER; - } else if (cur_front_gain > SATURATE_GAIN) { - ir_pwr = 100.0 * MAX_IR_POWER; - } else { - ir_pwr = 100.0 * (MIN_IR_POWER + ((cur_front_gain - CUTOFF_GAIN) * (MAX_IR_POWER - MIN_IR_POWER) / (SATURATE_GAIN - CUTOFF_GAIN))); - } + sm.update(1000); + if (sm.updated("thermal")){ + uint16_t fan_speed = sm["thermal"].getThermal().getFanSpeed(); + if (fan_speed != prev_fan_speed || cnt % 100 == 0){ + pthread_mutex_lock(&usb_lock); + libusb_control_transfer(dev_handle, 0x40, 0xb1, fan_speed, 0, NULL, 0, TIMEOUT); + pthread_mutex_unlock(&usb_lock); + + prev_fan_speed = fan_speed; + } + } + if (sm.updated("frontFrame")){ + auto event = sm["frontFrame"]; + float cur_front_gain = event.getFrontFrame().getGainFrac(); + last_front_frame_t = event.getLogMonoTime(); + + if (cur_front_gain <= CUTOFF_GAIN) { + ir_pwr = 100.0 * MIN_IR_POWER; + } else if (cur_front_gain > SATURATE_GAIN) { + ir_pwr = 100.0 * MAX_IR_POWER; + } else { + ir_pwr = 100.0 * (MIN_IR_POWER + ((cur_front_gain - CUTOFF_GAIN) * (MAX_IR_POWER - MIN_IR_POWER) / (SATURATE_GAIN - CUTOFF_GAIN))); } } - // Disable ir_pwr on front frame timeout uint64_t cur_t = nanos_since_boot(); if (cur_t - last_front_frame_t > 1e9){ @@ -765,10 +702,6 @@ void *hardware_control_thread(void *crap) { } - delete poller; - delete thermal_sock; - delete c; - return NULL; } @@ -865,7 +798,7 @@ void pigeon_init() { LOGW("panda GPS on"); } -static void pigeon_publish_raw(PubSocket *publisher, unsigned char *dat, int alen) { +static void pigeon_publish_raw(PubMaster &pm, unsigned char *dat, int alen) { // create message capnp::MallocMessageBuilder msg; cereal::Event::Builder event = msg.initRoot(); @@ -873,18 +806,13 @@ static void pigeon_publish_raw(PubSocket *publisher, unsigned char *dat, int ale auto ublox_raw = event.initUbloxRaw(alen); memcpy(ublox_raw.begin(), dat, alen); - // send to ubloxRaw - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - publisher->send((char*)bytes.begin(), bytes.size()); + pm.send("ubloxRaw", msg); } void *pigeon_thread(void *crap) { // ubloxRaw = 8042 - Context * context = Context::create(); - PubSocket * publisher = PubSocket::create(context, "ubloxRaw"); - assert(publisher != NULL); + PubMaster pm({"ubloxRaw"}); // run at ~100hz unsigned char dat[0x1000]; @@ -910,7 +838,7 @@ void *pigeon_thread(void *crap) { LOGW("received invalid ublox message, resetting panda GPS"); pigeon_init(); } else { - pigeon_publish_raw(publisher, dat, alen); + pigeon_publish_raw(pm, dat, alen); } } @@ -918,9 +846,6 @@ void *pigeon_thread(void *crap) { usleep(10*1000); cnt++; } - - delete publisher; - delete context; return NULL; } diff --git a/selfdrive/camerad/SConscript b/selfdrive/camerad/SConscript index e1930a83f9..618df342c8 100644 --- a/selfdrive/camerad/SConscript +++ b/selfdrive/camerad/SConscript @@ -1,6 +1,6 @@ -Import('env', 'arch', 'messaging', 'common', 'gpucommon', 'visionipc', 'cereal', 'webcam') +Import('env', 'arch', 'cereal', 'messaging', 'common', 'gpucommon', 'visionipc', 'webcam') -libs = ['m', 'pthread', common, 'jpeg', cereal, 'OpenCL', messaging, 'czmq', 'zmq', 'capnp', 'kj', visionipc, gpucommon] +libs = ['m', 'pthread', common, 'jpeg', 'OpenCL', cereal, messaging, 'czmq', 'zmq', 'capnp', 'kj', visionipc, gpucommon] if arch == "aarch64": libs += ['gsl', 'CB', 'adreno_utils', 'EGL', 'GLESv3', 'cutils', 'ui'] diff --git a/selfdrive/camerad/cameras/camera_frame_stream.cc b/selfdrive/camerad/cameras/camera_frame_stream.cc index 0061fe6185..37045f4378 100644 --- a/selfdrive/camerad/cameras/camera_frame_stream.cc +++ b/selfdrive/camerad/cameras/camera_frame_stream.cc @@ -1,15 +1,11 @@ #include "camera_frame_stream.h" -#include #include -#include #include #include #include #include -#include -#include "cereal/gen/cpp/log.capnp.h" #include "messaging.hpp" #include "common/util.h" @@ -53,22 +49,15 @@ void camera_init(CameraState *s, int camera_id, unsigned int fps) { void run_frame_stream(DualCameraState *s) { int err; - Context * context = Context::create(); - SubSocket * recorder_sock = SubSocket::create(context, "frame"); - assert(recorder_sock != NULL); + SubMaster sm({"frame"}); CameraState *const rear_camera = &s->rear; auto *tb = &rear_camera->camera_tb; while (!do_exit) { - Message * msg = recorder_sock->receive(); + if (sm.update(1000) == 0) continue; - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - auto frame = event.getFrame(); + auto frame = sm["frame"].getFrame(); const int buf_idx = tbuffer_select(tb); rear_camera->camera_bufs_metadata[buf_idx] = { @@ -94,11 +83,7 @@ void run_frame_stream(DualCameraState *s) { clWaitForEvents(1, &map_event); clReleaseEvent(map_event); tbuffer_dispatch(tb, buf_idx); - delete msg; - } - delete recorder_sock; - delete context; } } // namespace diff --git a/selfdrive/camerad/main.cc b/selfdrive/camerad/main.cc index 286978016c..f914989ead 100644 --- a/selfdrive/camerad/main.cc +++ b/selfdrive/camerad/main.cc @@ -29,9 +29,7 @@ #include #include -#include #include -#include "cereal/gen/cpp/log.capnp.h" #define UI_BUF_COUNT 4 #define YUV_COUNT 40 @@ -143,10 +141,7 @@ struct VisionState { zsock_t *terminate_pub; - Context * msg_context; - PubSocket *frame_sock; - PubSocket *front_frame_sock; - PubSocket *thumbnail_sock; + PubMaster *pm; pthread_mutex_t clients_lock; VisionClientState clients[MAX_CLIENTS]; @@ -158,16 +153,9 @@ void* frontview_thread(void *arg) { VisionState *s = (VisionState*)arg; set_thread_name("frontview"); - - s->msg_context = Context::create(); - // we subscribe to this for placement of the AE metering box // TODO: the loop is bad, ideally models shouldn't affect sensors - Context *msg_context = Context::create(); - SubSocket *monitoring_sock = SubSocket::create(msg_context, "driverState", "127.0.0.1", true); - SubSocket *dmonstate_sock = SubSocket::create(msg_context, "dMonitoringState", "127.0.0.1", true); - assert(monitoring_sock != NULL); - assert(dmonstate_sock != NULL); + SubMaster sm({"driverState", "dMonitoringState"}); cl_command_queue q = clCreateCommandQueue(s->context, s->device_id, 0, &err); assert(err == 0); @@ -211,54 +199,34 @@ void* frontview_thread(void *arg) { tbuffer_release(&s->cameras.front.camera_tb, buf_idx); visionbuf_sync(&s->rgb_front_bufs[ui_idx], VISIONBUF_SYNC_FROM_DEVICE); + sm.update(0); // no more check after gps check - if (!s->rhd_front_checked) { - Message *msg_dmon = dmonstate_sock->receive(true); - if (msg_dmon != NULL) { - auto amsg = kj::heapArray((msg_dmon->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg_dmon->getData(), msg_dmon->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - s->rhd_front = event.getDMonitoringState().getIsRHD(); - s->rhd_front_checked = event.getDMonitoringState().getRhdChecked(); - - delete msg_dmon; - } + if (!s->rhd_front_checked && sm.updated("dMonitoringState")) { + auto state = sm["dMonitoringState"].getDMonitoringState(); + s->rhd_front = state.getIsRHD(); + s->rhd_front_checked = state.getRhdChecked(); } - Message *msg = monitoring_sock->receive(true); - if (msg != NULL) { - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - float face_prob = event.getDriverState().getFaceProb(); + if (sm.updated("driverState")) { + auto state = sm["driverState"].getDriverState(); + float face_prob = state.getFaceProb(); float face_position[2]; - face_position[0] = event.getDriverState().getFacePosition()[0]; - face_position[1] = event.getDriverState().getFacePosition()[1]; + face_position[0] = state.getFacePosition()[0]; + face_position[1] = state.getFacePosition()[1]; // set front camera metering target - if (face_prob > 0.4) - { + if (face_prob > 0.4) { int x_offset = s->rhd_front ? 0:s->rgb_front_width - 0.5 * s->rgb_front_height; s->front_meteringbox_xmin = x_offset + (face_position[0] + 0.5) * (0.5 * s->rgb_front_height) - 72; s->front_meteringbox_xmax = x_offset + (face_position[0] + 0.5) * (0.5 * s->rgb_front_height) + 72; s->front_meteringbox_ymin = (face_position[1] + 0.5) * (s->rgb_front_height) - 72; s->front_meteringbox_ymax = (face_position[1] + 0.5) * (s->rgb_front_height) + 72; - } - else // use default setting if no face - { + } else {// use default setting if no face s->front_meteringbox_ymin = s->rgb_front_height * 1 / 3; s->front_meteringbox_ymax = s->rgb_front_height * 1; s->front_meteringbox_xmin = s->rhd_front ? 0:s->rgb_front_width * 3 / 5; s->front_meteringbox_xmax = s->rhd_front ? s->rgb_front_width * 2 / 5:s->rgb_front_width; } - - delete msg; } // auto exposure @@ -330,7 +298,7 @@ void* frontview_thread(void *arg) { // send frame event { - if (s->front_frame_sock != NULL) { + if (s->pm != NULL) { capnp::MallocMessageBuilder msg; cereal::Event::Builder event = msg.initRoot(); event.setLogMonoTime(nanos_since_boot()); @@ -349,9 +317,7 @@ void* frontview_thread(void *arg) { framed.setGainFrac(frame_data.gain_frac); framed.setFrameType(cereal::FrameData::FrameType::FRONT); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - s->front_frame_sock->send((char*)bytes.begin(), bytes.size()); + s->pm->send("frontFrame", msg); } } @@ -367,9 +333,6 @@ void* frontview_thread(void *arg) { //LOGD("front process: %.2fms", t2-t1); } - delete monitoring_sock; - delete dmonstate_sock; - return NULL; } // processing @@ -534,7 +497,7 @@ void* processing_thread(void *arg) { // send frame event { - if (s->frame_sock != NULL) { + if (s->pm != NULL) { capnp::MallocMessageBuilder msg; cereal::Event::Builder event = msg.initRoot(); event.setLogMonoTime(nanos_since_boot()); @@ -570,9 +533,7 @@ void* processing_thread(void *arg) { kj::ArrayPtr transform_vs(&s->yuv_transform.v[0], 9); framed.setTransform(transform_vs); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - s->frame_sock->send((char*)bytes.begin(), bytes.size()); + s->pm->send("frame", msg); } } @@ -633,10 +594,8 @@ void* processing_thread(void *arg) { thumbnaild.setTimestampEof(frame_data.timestamp_eof); thumbnaild.setThumbnail(kj::arrayPtr((const uint8_t*)thumbnail_buffer, thumbnail_len)); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - if (s->thumbnail_sock != NULL) { - s->thumbnail_sock->send((char*)bytes.begin(), bytes.size()); + if (s->pm != NULL) { + s->pm->send("thumbnail", msg); } free(thumbnail_buffer); @@ -1103,7 +1062,7 @@ void init_buffers(VisionState *s) { s->rgb_front_width = s->cameras.front.ci.frame_width; s->rgb_front_height = s->cameras.front.ci.frame_height; } - + for (int i=0; irgb_front_width, s->rgb_front_height, &s->rgb_front_bufs[i]); @@ -1171,7 +1130,7 @@ void init_buffers(VisionState *s) { assert(err == 0); } - s->prg_rgb_laplacian = build_conv_program(s, s->rgb_width/NUM_SEGMENTS_X, s->rgb_height/NUM_SEGMENTS_Y, + s->prg_rgb_laplacian = build_conv_program(s, s->rgb_width/NUM_SEGMENTS_X, s->rgb_height/NUM_SEGMENTS_Y, 3); s->krnl_rgb_laplacian = clCreateKernel(s->prg_rgb_laplacian, "rgb2gray_conv2d", &err); assert(err == 0); @@ -1297,13 +1256,7 @@ int main(int argc, char *argv[]) { init_buffers(s); #if defined(QCOM) || defined(QCOM2) - s->msg_context = Context::create(); - s->frame_sock = PubSocket::create(s->msg_context, "frame"); - s->front_frame_sock = PubSocket::create(s->msg_context, "frontFrame"); - s->thumbnail_sock = PubSocket::create(s->msg_context, "thumbnail"); - assert(s->frame_sock != NULL); - assert(s->front_frame_sock != NULL); - assert(s->thumbnail_sock != NULL); + s->pm = new PubMaster({"frame", "frontFrame", "thumbnail"}); #endif cameras_open(&s->cameras, &s->camera_bufs[0], &s->focus_bufs[0], &s->stats_bufs[0], &s->front_camera_bufs[0]); @@ -1311,10 +1264,7 @@ int main(int argc, char *argv[]) { party(s); #if defined(QCOM) || defined(QCOM2) - delete s->frame_sock; - delete s->front_frame_sock; - delete s->thumbnail_sock; - delete s->msg_context; + delete s->pm; #endif free_buffers(s); diff --git a/selfdrive/clocksd/SConscript b/selfdrive/clocksd/SConscript index 63c508c4fe..601e64bf16 100644 --- a/selfdrive/clocksd/SConscript +++ b/selfdrive/clocksd/SConscript @@ -1,2 +1,2 @@ -Import('env', 'common', 'messaging') -env.Program('clocksd.cc', LIBS=['diag', 'time_genoff', common, messaging, 'capnp', 'zmq', 'kj']) \ No newline at end of file +Import('env', 'common', 'cereal', 'messaging') +env.Program('clocksd.cc', LIBS=['diag', 'time_genoff', common, cereal, messaging, 'capnp', 'zmq', 'kj']) \ No newline at end of file diff --git a/selfdrive/clocksd/clocksd.cc b/selfdrive/clocksd/clocksd.cc index d289387a8b..2e17058e0e 100644 --- a/selfdrive/clocksd/clocksd.cc +++ b/selfdrive/clocksd/clocksd.cc @@ -4,10 +4,8 @@ #include #include #include -#include #include "messaging.hpp" #include "common/timing.h" -#include "cereal/gen/cpp/log.capnp.h" namespace { int64_t arm_cntpct() { @@ -21,10 +19,7 @@ int main() { setpriority(PRIO_PROCESS, 0, -13); int err = 0; - Context *context = Context::create(); - - PubSocket* clock_publisher = PubSocket::create(context, "clocks"); - assert(clock_publisher != NULL); + PubMaster pm({"clocks"}); int timerfd = timerfd_create(CLOCK_BOOTTIME, 0); assert(timerfd >= 0); @@ -60,13 +55,9 @@ int main() { clocks.setWallTimeNanos(wall_time); clocks.setModemUptimeMillis(modem_uptime_v); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - clock_publisher->send((char*)bytes.begin(), bytes.size()); + pm.send("clocks", msg); } close(timerfd); - delete clock_publisher; - delete context; return 0; } \ No newline at end of file diff --git a/selfdrive/locationd/SConscript b/selfdrive/locationd/SConscript index 29b5d00d01..fbecaa1bae 100644 --- a/selfdrive/locationd/SConscript +++ b/selfdrive/locationd/SConscript @@ -1,9 +1,9 @@ -Import('env', 'common', 'messaging') +Import('env', 'common', 'cereal', 'messaging') loc_objs = [ "locationd_yawrate.cc", "params_learner.cc", "paramsd.cc"] -loc_libs = [messaging, 'zmq', common, 'capnp', 'kj', 'json11', 'pthread'] +loc_libs = [cereal, messaging, 'zmq', common, 'capnp', 'kj', 'json11', 'pthread'] env.Program("paramsd", loc_objs, LIBS=loc_libs) env.SharedLibrary("locationd", loc_objs, LIBS=loc_libs) diff --git a/selfdrive/locationd/paramsd.cc b/selfdrive/locationd/paramsd.cc index bae43ba367..80eda71053 100644 --- a/selfdrive/locationd/paramsd.cc +++ b/selfdrive/locationd/paramsd.cc @@ -4,12 +4,8 @@ #include #include - -#include #include - #include "json11.hpp" -#include "cereal/gen/cpp/log.capnp.h" #include "common/swaglog.h" #include "common/messaging.h" @@ -30,18 +26,8 @@ void sigpipe_handler(int sig) { int main(int argc, char *argv[]) { signal(SIGPIPE, (sighandler_t)sigpipe_handler); - Context * c = Context::create(); - SubSocket * controls_state_sock = SubSocket::create(c, "controlsState"); - SubSocket * sensor_events_sock = SubSocket::create(c, "sensorEvents"); - SubSocket * camera_odometry_sock = SubSocket::create(c, "cameraOdometry"); - PubSocket * live_parameters_sock = PubSocket::create(c, "liveParameters"); - - assert(controls_state_sock != NULL); - assert(sensor_events_sock != NULL); - assert(camera_odometry_sock != NULL); - assert(live_parameters_sock != NULL); - - Poller * poller = Poller::create({controls_state_sock, sensor_events_sock, camera_odometry_sock}); + SubMaster sm({"controlsState", "sensorEvents", "cameraOdometry"}); + PubMaster pm({"liveParameters"}); Localizer localizer; @@ -104,70 +90,55 @@ int main(int argc, char *argv[]) { // Main loop int save_counter = 0; while (true){ - for (auto s : poller->poll(100)){ - Message * msg = s->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader capnp_msg(amsg); - cereal::Event::Reader event = capnp_msg.getRoot(); - - localizer.handle_log(event); - - auto which = event.which(); - if (which == cereal::Event::CONTROLS_STATE){ - save_counter++; - - double yaw_rate = -localizer.x[0]; - bool valid = learner.update(yaw_rate, localizer.car_speed, localizer.steering_angle); - - double angle_offset_degrees = RADIANS_TO_DEGREES * learner.ao; - double angle_offset_average_degrees = RADIANS_TO_DEGREES * learner.slow_ao; - - capnp::MallocMessageBuilder msg; - cereal::Event::Builder event = msg.initRoot(); - event.setLogMonoTime(nanos_since_boot()); - auto live_params = event.initLiveParameters(); - live_params.setValid(valid); - live_params.setYawRate(localizer.x[0]); - live_params.setGyroBias(localizer.x[1]); - live_params.setAngleOffset(angle_offset_degrees); - live_params.setAngleOffsetAverage(angle_offset_average_degrees); - live_params.setStiffnessFactor(learner.x); - live_params.setSteerRatio(learner.sR); - - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - live_parameters_sock->send((char*)bytes.begin(), bytes.size()); - - // Save parameters every minute - if (save_counter % 6000 == 0) { - json11::Json json = json11::Json::object { - {"carVin", vin}, - {"carFingerprint", fingerprint}, - {"steerRatio", learner.sR}, - {"stiffnessFactor", learner.x}, - {"angleOffsetAverage", angle_offset_average_degrees}, - }; - - std::string out = json.dump(); - std::async(std::launch::async, - [out]{ - write_db_value("LiveParameters", out.c_str(), out.length()); - }); - } + if (sm.update(100) == 0) continue; + + if (sm.updated("controlsState")){ + localizer.handle_log(sm["controlsState"]); + save_counter++; + + double yaw_rate = -localizer.x[0]; + bool valid = learner.update(yaw_rate, localizer.car_speed, localizer.steering_angle); + + double angle_offset_degrees = RADIANS_TO_DEGREES * learner.ao; + double angle_offset_average_degrees = RADIANS_TO_DEGREES * learner.slow_ao; + + capnp::MallocMessageBuilder msg; + cereal::Event::Builder event = msg.initRoot(); + event.setLogMonoTime(nanos_since_boot()); + auto live_params = event.initLiveParameters(); + live_params.setValid(valid); + live_params.setYawRate(localizer.x[0]); + live_params.setGyroBias(localizer.x[1]); + live_params.setAngleOffset(angle_offset_degrees); + live_params.setAngleOffsetAverage(angle_offset_average_degrees); + live_params.setStiffnessFactor(learner.x); + live_params.setSteerRatio(learner.sR); + + pm.send("liveParameters", msg); + + // Save parameters every minute + if (save_counter % 6000 == 0) { + json11::Json json = json11::Json::object { + {"carVin", vin}, + {"carFingerprint", fingerprint}, + {"steerRatio", learner.sR}, + {"stiffnessFactor", learner.x}, + {"angleOffsetAverage", angle_offset_average_degrees}, + }; + + std::string out = json.dump(); + std::async(std::launch::async, + [out]{ + write_db_value("LiveParameters", out.c_str(), out.length()); + }); } - delete msg; } + if (sm.updated("sensorEvents")){ + localizer.handle_log(sm["sensorEvents"]); + } + if (sm.updated("cameraOdometry")){ + localizer.handle_log(sm["cameraOdometry"]); + } } - - delete live_parameters_sock; - delete controls_state_sock; - delete camera_odometry_sock; - delete sensor_events_sock; - delete poller; - delete c; - return 0; } diff --git a/selfdrive/locationd/ublox_msg.cc b/selfdrive/locationd/ublox_msg.cc index 42ea38e6d6..6f3bd6b635 100644 --- a/selfdrive/locationd/ublox_msg.cc +++ b/selfdrive/locationd/ublox_msg.cc @@ -1,6 +1,5 @@ #include #include -#include #include #include #include @@ -12,13 +11,8 @@ #include #include #include -#include -#include #include -#include -#include "cereal/gen/cpp/log.capnp.h" - #include "common/params.h" #include "common/swaglog.h" #include "common/timing.h" diff --git a/selfdrive/locationd/ubloxd.cc b/selfdrive/locationd/ubloxd.cc index 2c28f14228..13420f05a5 100644 --- a/selfdrive/locationd/ubloxd.cc +++ b/selfdrive/locationd/ubloxd.cc @@ -12,12 +12,8 @@ #include #include #include -#include -#include #include "messaging.hpp" -#include -#include "cereal/gen/cpp/log.capnp.h" #include "common/params.h" #include "common/swaglog.h" diff --git a/selfdrive/locationd/ubloxd_main.cc b/selfdrive/locationd/ubloxd_main.cc index 4e07932fff..43989a338a 100644 --- a/selfdrive/locationd/ubloxd_main.cc +++ b/selfdrive/locationd/ubloxd_main.cc @@ -12,13 +12,8 @@ #include #include #include -#include -#include #include "messaging.hpp" -#include -#include "cereal/gen/cpp/log.capnp.h" - #include "common/util.h" #include "common/params.h" #include "common/swaglog.h" @@ -33,7 +28,7 @@ void set_do_exit(int sig) { } using namespace ublox; - +const long ZMQ_POLL_TIMEOUT = 1000; // In miliseconds int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) { LOGW("starting ubloxd"); signal(SIGINT, (sighandler_t) set_do_exit); @@ -41,30 +36,15 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) UbloxMsgParser parser; - Context * c = Context::create(); - PubSocket * gpsLocationExternal = PubSocket::create(c, "gpsLocationExternal"); - PubSocket * ubloxGnss = PubSocket::create(c, "ubloxGnss"); - SubSocket * ubloxRaw = SubSocket::create(c, "ubloxRaw"); - - assert(gpsLocationExternal != NULL); - assert(ubloxGnss != NULL); - assert(ubloxRaw != NULL); - - Poller * poller = Poller::create({ubloxRaw}); - + SubMaster sm({"ubloxRaw"}); + PubMaster pm({"ubloxGnss", "gpsLocationExternal"}); while (!do_exit) { - Message * msg = poll_func(poller); - if (msg == NULL) continue; - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); + if (sm.update(ZMQ_POLL_TIMEOUT) == 0) continue; - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - const uint8_t *data = event.getUbloxRaw().begin(); - size_t len = event.getUbloxRaw().size(); + auto ubloxRaw = sm["ubloxRaw"].getUbloxRaw(); + const uint8_t *data = ubloxRaw.begin(); + size_t len = ubloxRaw.size(); size_t bytes_consumed = 0; while(bytes_consumed < len && !do_exit) { size_t bytes_consumed_this_time = 0U; @@ -76,7 +56,7 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) auto words = parser.gen_solution(); if(words.size() > 0) { auto bytes = words.asBytes(); - send_func(gpsLocationExternal, bytes.begin(), bytes.size()); + pm.send("gpsLocationExternal", bytes.begin(), bytes.size()); } } else LOGW("Unknown nav msg id: 0x%02X", parser.msg_id()); @@ -86,14 +66,14 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) auto words = parser.gen_raw(); if(words.size() > 0) { auto bytes = words.asBytes(); - send_func(ubloxGnss, bytes.begin(), bytes.size()); + pm.send("ubloxGnss", bytes.begin(), bytes.size()); } } else if(parser.msg_id() == MSG_RXM_SFRBX) { //LOGD("MSG_RXM_SFRBX"); auto words = parser.gen_nav_data(); if(words.size() > 0) { auto bytes = words.asBytes(); - send_func(ubloxGnss, bytes.begin(), bytes.size()); + pm.send("ubloxGnss", bytes.begin(), bytes.size()); } } else LOGW("Unknown rxm msg id: 0x%02X", parser.msg_id()); @@ -103,7 +83,7 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) auto words = parser.gen_mon_hw(); if(words.size() > 0) { auto bytes = words.asBytes(); - send_func(ubloxGnss, bytes.begin(), bytes.size()); + pm.send("ubloxGnss", bytes.begin(), bytes.size()); } } else { LOGW("Unknown mon msg id: 0x%02X", parser.msg_id()); @@ -114,14 +94,7 @@ int ubloxd_main(poll_ubloxraw_msg_func poll_func, send_gps_event_func send_func) } bytes_consumed += bytes_consumed_this_time; } - delete msg; } - delete poller; - delete ubloxRaw; - delete ubloxGnss; - delete gpsLocationExternal; - delete c; - return 0; } diff --git a/selfdrive/locationd/ubloxd_test.cc b/selfdrive/locationd/ubloxd_test.cc index ec65577f2a..fc7146625f 100644 --- a/selfdrive/locationd/ubloxd_test.cc +++ b/selfdrive/locationd/ubloxd_test.cc @@ -12,14 +12,10 @@ #include #include #include -#include -#include #include #include "messaging.hpp" #include "impl_zmq.hpp" -#include -#include "cereal/gen/cpp/log.capnp.h" #include "common/params.h" #include "common/swaglog.h" diff --git a/selfdrive/logcatd/SConscript b/selfdrive/logcatd/SConscript index 1040c3af70..67ad673002 100644 --- a/selfdrive/logcatd/SConscript +++ b/selfdrive/logcatd/SConscript @@ -1,2 +1,2 @@ -Import('env', 'messaging') -env.Program('logcatd.cc', LIBS=[messaging, 'cutils', 'zmq', 'czmq', 'capnp', 'kj']) +Import('env', 'cereal', 'messaging') +env.Program('logcatd.cc', LIBS=[cereal, messaging, 'cutils', 'zmq', 'czmq', 'capnp', 'kj']) diff --git a/selfdrive/logcatd/logcatd.cc b/selfdrive/logcatd/logcatd.cc index 9a7f1ba520..181e29df11 100644 --- a/selfdrive/logcatd/logcatd.cc +++ b/selfdrive/logcatd/logcatd.cc @@ -7,9 +7,7 @@ #include #include -#include #include "common/timing.h" -#include "cereal/gen/cpp/log.capnp.h" #include "messaging.hpp" int main() { @@ -27,10 +25,7 @@ int main() { assert(crash_logger); struct logger *kernel_logger = android_logger_open(logger_list, (log_id_t)5); // LOG_ID_KERNEL assert(kernel_logger); - - Context * c = Context::create(); - PubSocket * androidLog = PubSocket::create(c, "androidLog"); - assert(androidLog != NULL); + PubMaster pm({"androidLog"}); while (1) { log_msg log_msg; @@ -57,15 +52,9 @@ int main() { androidEntry.setTag(entry.tag); androidEntry.setMessage(entry.message); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - androidLog->send((char*)bytes.begin(), bytes.size()); + pm.send("androidLog", msg); } android_logger_list_close(logger_list); - - delete androidLog; - delete c; - return 0; } diff --git a/selfdrive/loggerd/SConscript b/selfdrive/loggerd/SConscript index cb5dfa6ee9..2e5dae5553 100644 --- a/selfdrive/loggerd/SConscript +++ b/selfdrive/loggerd/SConscript @@ -1,9 +1,9 @@ -Import('env', 'arch', 'messaging', 'common', 'visionipc') +Import('env', 'arch', 'cereal', 'messaging', 'common', 'visionipc') src = ['loggerd.cc', 'logger.cc'] libs = ['zmq', 'czmq', 'capnp', 'kj', 'z', 'avformat', 'avcodec', 'swscale', 'avutil', - 'yuv', 'bz2', common, messaging, visionipc] + 'yuv', 'bz2', common, cereal, messaging, visionipc] if arch == "aarch64": src += ['encoder.c', 'raw_logger.cc'] diff --git a/selfdrive/loggerd/loggerd.cc b/selfdrive/loggerd/loggerd.cc index 93e80abbf0..b7c7303c0e 100644 --- a/selfdrive/loggerd/loggerd.cc +++ b/selfdrive/loggerd/loggerd.cc @@ -21,10 +21,7 @@ #include #include - #include -#include - #ifdef QCOM #include #endif diff --git a/selfdrive/modeld/SConscript b/selfdrive/modeld/SConscript index f70057d649..1bf1d345de 100644 --- a/selfdrive/modeld/SConscript +++ b/selfdrive/modeld/SConscript @@ -1,7 +1,7 @@ -Import('env', 'arch', 'messaging', 'common', 'gpucommon', 'visionipc') +Import('env', 'arch', 'cereal', 'messaging', 'common', 'gpucommon', 'visionipc') lenv = env.Clone() -libs = [messaging, common, 'OpenCL', 'SNPE', 'capnp', 'zmq', 'kj', 'yuv', gpucommon, visionipc] +libs = [cereal, messaging, common, 'OpenCL', 'SNPE', 'capnp', 'zmq', 'kj', 'yuv', gpucommon, visionipc] TEST_THNEED = False diff --git a/selfdrive/modeld/dmonitoringmodeld.cc b/selfdrive/modeld/dmonitoringmodeld.cc index d17247f9e5..c987e44482 100644 --- a/selfdrive/modeld/dmonitoringmodeld.cc +++ b/selfdrive/modeld/dmonitoringmodeld.cc @@ -26,10 +26,8 @@ int main(int argc, char **argv) { set_realtime_priority(1); // messaging - Context *msg_context = Context::create(); - PubSocket *dmonitoring_sock = PubSocket::create(msg_context, "driverState"); - SubSocket *dmonstate_sock = SubSocket::create(msg_context, "dMonitoringState", "127.0.0.1", true); - assert(dmonstate_sock != NULL); + SubMaster sm({"dMonitoringState"}); + PubMaster pm({"driverState"}); // init the models DMonitoringModelState dmonitoringmodel; @@ -61,17 +59,10 @@ int main(int argc, char **argv) { //printf("frame_id: %d %dx%d\n", extra.frame_id, buf_info.width, buf_info.height); if (!dmonitoringmodel.is_rhd_checked) { if (chk_counter >= RHD_CHECK_INTERVAL) { - Message *msg = dmonstate_sock->receive(true); - if (msg != NULL) { - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - dmonitoringmodel.is_rhd = event.getDMonitoringState().getIsRHD(); - dmonitoringmodel.is_rhd_checked = event.getDMonitoringState().getRhdChecked(); - delete msg; + if (sm.update(0) > 0) { + auto state = sm["dMonitoringState"].getDMonitoringState(); + dmonitoringmodel.is_rhd = state.getIsRHD(); + dmonitoringmodel.is_rhd_checked = state.getRhdChecked(); } chk_counter = 0; } @@ -85,7 +76,7 @@ int main(int argc, char **argv) { double t2 = millis_since_boot(); // send dm packet - dmonitoring_publish(dmonitoring_sock, extra.frame_id, res); + dmonitoring_publish(pm, extra.frame_id, res); LOGD("dmonitoring process: %.2fms, from last %.2fms", t2-t1, t1-last); last = t1; @@ -95,8 +86,6 @@ int main(int argc, char **argv) { visionstream_destroy(&stream); - delete dmonitoring_sock; - delete msg_context; dmonitoring_free(&dmonitoringmodel); return 0; diff --git a/selfdrive/modeld/modeld.cc b/selfdrive/modeld/modeld.cc index 3c5011eb2a..764f9aadeb 100644 --- a/selfdrive/modeld/modeld.cc +++ b/selfdrive/modeld/modeld.cc @@ -7,7 +7,7 @@ #include "common/swaglog.h" #include "models/driving.h" - +#include "messaging.hpp" volatile sig_atomic_t do_exit = 0; static void set_do_exit(int sig) { @@ -23,12 +23,7 @@ void* live_thread(void *arg) { int err; set_thread_name("live"); - Context * c = Context::create(); - SubSocket * live_calibration_sock = SubSocket::create(c, "liveCalibration"); - assert(live_calibration_sock != NULL); - - Poller * poller = Poller::create({live_calibration_sock}); - + SubMaster sm({"liveCalibration"}); /* import numpy as np from common.transformations.model import medmodel_frame_from_road_frame @@ -48,49 +43,31 @@ void* live_thread(void *arg) { 0.0, 0.0, 1.0; while (!do_exit) { - for (auto sock : poller->poll(10)){ - Message * msg = sock->receive(); - - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - if (event.isLiveCalibration()) { - pthread_mutex_lock(&transform_lock); + if (sm.update(10) > 0){ + pthread_mutex_lock(&transform_lock); - auto extrinsic_matrix = event.getLiveCalibration().getExtrinsicMatrix(); - Eigen::Matrix extrinsic_matrix_eigen; - for (int i = 0; i < 4*3; i++){ - extrinsic_matrix_eigen(i / 4, i % 4) = extrinsic_matrix[i]; - } + auto extrinsic_matrix = sm["liveCalibration"].getLiveCalibration().getExtrinsicMatrix(); + Eigen::Matrix extrinsic_matrix_eigen; + for (int i = 0; i < 4*3; i++){ + extrinsic_matrix_eigen(i / 4, i % 4) = extrinsic_matrix[i]; + } - auto camera_frame_from_road_frame = eon_intrinsics * extrinsic_matrix_eigen; - Eigen::Matrix camera_frame_from_ground; - camera_frame_from_ground.col(0) = camera_frame_from_road_frame.col(0); - camera_frame_from_ground.col(1) = camera_frame_from_road_frame.col(1); - camera_frame_from_ground.col(2) = camera_frame_from_road_frame.col(3); + auto camera_frame_from_road_frame = eon_intrinsics * extrinsic_matrix_eigen; + Eigen::Matrix camera_frame_from_ground; + camera_frame_from_ground.col(0) = camera_frame_from_road_frame.col(0); + camera_frame_from_ground.col(1) = camera_frame_from_road_frame.col(1); + camera_frame_from_ground.col(2) = camera_frame_from_road_frame.col(3); - auto warp_matrix = camera_frame_from_ground * ground_from_medmodel_frame; + auto warp_matrix = camera_frame_from_ground * ground_from_medmodel_frame; - for (int i=0; i<3*3; i++) { - cur_transform.v[i] = warp_matrix(i / 3, i % 3); - } - - run_model = true; - pthread_mutex_unlock(&transform_lock); + for (int i=0; i<3*3; i++) { + cur_transform.v[i] = warp_matrix(i / 3, i % 3); } - delete msg; + run_model = true; + pthread_mutex_unlock(&transform_lock); } - } - - delete live_calibration_sock; - delete poller; - delete c; - return NULL; } @@ -104,14 +81,8 @@ int main(int argc, char **argv) { assert(err == 0); // messaging - Context *msg_context = Context::create(); - PubSocket *model_sock = PubSocket::create(msg_context, "model"); - PubSocket *posenet_sock = PubSocket::create(msg_context, "cameraOdometry"); - SubSocket *pathplan_sock = SubSocket::create(msg_context, "pathPlan", "127.0.0.1", true); - - assert(model_sock != NULL); - assert(posenet_sock != NULL); - assert(pathplan_sock != NULL); + PubMaster pm({"model", "cameraOdometry"}); + SubMaster sm({"pathPlan"}); // cl init cl_device_id device_id; @@ -194,18 +165,9 @@ int main(int argc, char **argv) { const bool run_model_this_iter = run_model; pthread_mutex_unlock(&transform_lock); - Message *msg = pathplan_sock->receive(true); - if (msg != NULL) { - // TODO: copy and pasted from camerad/main.cc - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - + if (sm.update(0) > 0){ // TODO: path planner timeout? - desire = ((int)event.getPathPlan().getDesire()) - 1; - delete msg; + desire = ((int)sm["pathPlan"].getPathPlan().getDesire()) - 1; } double mt1 = 0, mt2 = 0; @@ -227,8 +189,8 @@ int main(int argc, char **argv) { model_transform, NULL, vec_desire); mt2 = millis_since_boot(); - model_publish(model_sock, extra.frame_id, model_buf, extra.timestamp_eof); - posenet_publish(posenet_sock, extra.frame_id, model_buf, extra.timestamp_eof); + model_publish(pm, extra.frame_id, model_buf, extra.timestamp_eof); + posenet_publish(pm, extra.frame_id, model_buf, extra.timestamp_eof); LOGD("model process: %.2fms, from last %.2fms", mt2-mt1, mt1-last); last = mt1; @@ -240,11 +202,6 @@ int main(int argc, char **argv) { visionstream_destroy(&stream); - delete model_sock; - delete posenet_sock; - delete pathplan_sock; - delete msg_context; - model_free(&model); LOG("joining live_thread"); diff --git a/selfdrive/modeld/models/dmonitoring.cc b/selfdrive/modeld/models/dmonitoring.cc index 41de617ad6..21fac61c28 100644 --- a/selfdrive/modeld/models/dmonitoring.cc +++ b/selfdrive/modeld/models/dmonitoring.cc @@ -143,7 +143,7 @@ DMonitoringResult dmonitoring_eval_frame(DMonitoringModelState* s, void* stream_ return ret; } -void dmonitoring_publish(PubSocket* sock, uint32_t frame_id, const DMonitoringResult res) { +void dmonitoring_publish(PubMaster &pm, uint32_t frame_id, const DMonitoringResult res) { // make msg capnp::MallocMessageBuilder msg; cereal::Event::Builder event = msg.initRoot(); @@ -166,10 +166,7 @@ void dmonitoring_publish(PubSocket* sock, uint32_t frame_id, const DMonitoringRe framed.setLeftBlinkProb(res.left_blink_prob); framed.setRightBlinkProb(res.right_blink_prob); - // send message - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - sock->send((char*)bytes.begin(), bytes.size()); + pm.send("driverState", msg); } void dmonitoring_free(DMonitoringModelState* s) { diff --git a/selfdrive/modeld/models/dmonitoring.h b/selfdrive/modeld/models/dmonitoring.h index dc5d116e98..e8a496dee9 100644 --- a/selfdrive/modeld/models/dmonitoring.h +++ b/selfdrive/modeld/models/dmonitoring.h @@ -5,8 +5,6 @@ #include "commonmodel.h" #include "runners/run.h" -#include "cereal/gen/cpp/log.capnp.h" -#include #include "messaging.hpp" #ifdef __cplusplus @@ -37,7 +35,7 @@ typedef struct DMonitoringModelState { void dmonitoring_init(DMonitoringModelState* s); DMonitoringResult dmonitoring_eval_frame(DMonitoringModelState* s, void* stream_buf, int width, int height); -void dmonitoring_publish(PubSocket *sock, uint32_t frame_id, const DMonitoringResult res); +void dmonitoring_publish(PubMaster &pm, uint32_t frame_id, const DMonitoringResult res); void dmonitoring_free(DMonitoringModelState* s); #ifdef __cplusplus diff --git a/selfdrive/modeld/models/driving.cc b/selfdrive/modeld/models/driving.cc index 5aed70ee68..817fe85662 100644 --- a/selfdrive/modeld/models/driving.cc +++ b/selfdrive/modeld/models/driving.cc @@ -246,7 +246,7 @@ void fill_longi(cereal::ModelData::LongitudinalData::Builder longi, const float longi.setAccelerations(accel); } -void model_publish(PubSocket *sock, uint32_t frame_id, +void model_publish(PubMaster &pm, uint32_t frame_id, const ModelDataRaw net_outputs, uint64_t timestamp_eof) { // make msg capnp::MallocMessageBuilder msg; @@ -292,14 +292,10 @@ void model_publish(PubSocket *sock, uint32_t frame_id, auto meta = framed.initMeta(); fill_meta(meta, net_outputs.meta); - - // send message - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - sock->send((char*)bytes.begin(), bytes.size()); + pm.send("model", msg); } -void posenet_publish(PubSocket *sock, uint32_t frame_id, +void posenet_publish(PubMaster &pm, uint32_t frame_id, const ModelDataRaw net_outputs, uint64_t timestamp_eof) { capnp::MallocMessageBuilder msg; cereal::Event::Builder event = msg.initRoot(); @@ -331,7 +327,5 @@ void posenet_publish(PubSocket *sock, uint32_t frame_id, posenetd.setTimestampEof(timestamp_eof); posenetd.setFrameId(frame_id); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - sock->send((char*)bytes.begin(), bytes.size()); - } + pm.send("cameraOdometry", msg); +} diff --git a/selfdrive/modeld/models/driving.h b/selfdrive/modeld/models/driving.h index 1a2ac94e1a..a852db4022 100644 --- a/selfdrive/modeld/models/driving.h +++ b/selfdrive/modeld/models/driving.h @@ -13,9 +13,7 @@ #include "commonmodel.h" #include "runners/run.h" -#include "cereal/gen/cpp/log.capnp.h" #include -#include #include "messaging.hpp" #define MODEL_WIDTH 512 @@ -73,8 +71,8 @@ ModelDataRaw model_eval_frame(ModelState* s, cl_command_queue q, void model_free(ModelState* s); void poly_fit(float *in_pts, float *in_stds, float *out); -void model_publish(PubSocket* sock, uint32_t frame_id, +void model_publish(PubMaster &pm, uint32_t frame_id, const ModelDataRaw data, uint64_t timestamp_eof); -void posenet_publish(PubSocket* sock, uint32_t frame_id, +void posenet_publish(PubMaster &pm, uint32_t frame_id, const ModelDataRaw data, uint64_t timestamp_eof); #endif diff --git a/selfdrive/proclogd/SConscript b/selfdrive/proclogd/SConscript index 2b87f8d5e5..e7677099e9 100644 --- a/selfdrive/proclogd/SConscript +++ b/selfdrive/proclogd/SConscript @@ -1,2 +1,2 @@ -Import('env', 'messaging') -env.Program('proclogd.cc', LIBS=[messaging, 'pthread', 'zmq', 'czmq', 'capnp', 'kj']) +Import('env', 'cereal', 'messaging') +env.Program('proclogd.cc', LIBS=[cereal, messaging, 'pthread', 'zmq', 'czmq', 'capnp', 'kj']) diff --git a/selfdrive/proclogd/proclogd.cc b/selfdrive/proclogd/proclogd.cc index 2e7503ca75..878ebc499e 100644 --- a/selfdrive/proclogd/proclogd.cc +++ b/selfdrive/proclogd/proclogd.cc @@ -5,9 +5,6 @@ #include #include - -#include -#include #include #include #include @@ -17,8 +14,6 @@ #include #include "messaging.hpp" -#include -#include "cereal/gen/cpp/log.capnp.h" #include "common/timing.h" #include "common/utilpp.h" @@ -35,10 +30,7 @@ struct ProcCache { int main() { int err; - - Context * c = Context::create(); - PubSocket * publisher = PubSocket::create(c, "procLog"); - assert(publisher != NULL); + PubMaster publisher({"procLog"}); double jiffy = sysconf(_SC_CLK_TCK); size_t page_size = sysconf(_SC_PAGE_SIZE); @@ -233,15 +225,10 @@ int main() { } } - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - publisher->send((char*)bytes.begin(), bytes.size()); + publisher.send("procLog", msg); usleep(2000000); // 2 secs } - delete publisher; - delete c; - return 0; } diff --git a/selfdrive/sensord/SConscript b/selfdrive/sensord/SConscript index 4c9e24329a..b578ac9514 100644 --- a/selfdrive/sensord/SConscript +++ b/selfdrive/sensord/SConscript @@ -1,5 +1,5 @@ -Import('env', 'common', 'messaging') -env.Program('_sensord', 'sensors.cc', LIBS=['hardware', common, messaging, 'capnp', 'zmq', 'kj']) +Import('env', 'common', 'cereal', 'messaging') +env.Program('_sensord', 'sensors.cc', LIBS=['hardware', common, cereal, messaging, 'capnp', 'zmq', 'kj']) lenv = env.Clone() lenv['LIBPATH'] += ['/system/vendor/lib64'] -lenv.Program('_gpsd', ['gpsd.cc'], LIBS=['hardware', common, 'diag', 'time_genoff', messaging, 'capnp', 'zmq', 'kj']) +lenv.Program('_gpsd', ['gpsd.cc'], LIBS=['hardware', common, 'diag', 'time_genoff', cereal, messaging, 'capnp', 'zmq', 'kj']) diff --git a/selfdrive/sensord/gpsd.cc b/selfdrive/sensord/gpsd.cc index 4f3b7079b1..7fdad45c6b 100644 --- a/selfdrive/sensord/gpsd.cc +++ b/selfdrive/sensord/gpsd.cc @@ -16,21 +16,15 @@ #include #include -#include - #include "messaging.hpp" #include "common/timing.h" #include "common/swaglog.h" -#include "cereal/gen/cpp/log.capnp.h" - volatile sig_atomic_t do_exit = 0; namespace { -Context *gps_context; -PubSocket *gps_publisher; -PubSocket *gps_location_publisher; +PubMaster *pm; const GpsInterface* gGpsInterface = NULL; const AGpsInterface* gAGpsInterface = NULL; @@ -53,10 +47,7 @@ void nmea_callback(GpsUtcTime timestamp, const char* nmea, int length) { nmeaData.setLocalWallTime(log_time_wall); nmeaData.setNmea(nmea); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - // printf("gps send %d\n", bytes.size()); - gps_publisher->send((char*)bytes.begin(), bytes.size()); + pm->send("gpsNMEA", msg); } void location_callback(GpsLocation* location) { @@ -78,9 +69,7 @@ void location_callback(GpsLocation* location) { locationData.setTimestamp(location->timestamp); locationData.setSource(cereal::GpsLocationData::SensorSource::ANDROID); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - gps_location_publisher->send((char*)bytes.begin(), bytes.size()); + pm->send("gpsLocation", msg); } pthread_t create_thread_callback(const char* name, void (*start)(void *), void* arg) { @@ -125,9 +114,8 @@ AGpsCallbacks agps_callbacks = { create_thread_callback, }; - - void gps_init() { + pm = new PubMaster({"gpsNMEA", "gpsLocation"}); LOG("*** init GPS"); hw_module_t* module = NULL; hw_get_module(GPS_HARDWARE_MODULE_ID, (hw_module_t const**)&module); @@ -156,15 +144,10 @@ void gps_init() { GPS_POSITION_RECURRENCE_PERIODIC, 100, 0, 0); - gps_context = Context::create(); - gps_publisher = PubSocket::create(gps_context, "gpsNMEA"); - gps_location_publisher = PubSocket::create(gps_context, "gpsLocation"); - - assert(gps_publisher != NULL); - assert(gps_location_publisher != NULL); } void gps_destroy() { + delete pm; gGpsInterface->stop(); gGpsInterface->cleanup(); } diff --git a/selfdrive/sensord/sensors.cc b/selfdrive/sensord/sensors.cc index 5e8741dbb3..b42482d853 100644 --- a/selfdrive/sensord/sensors.cc +++ b/selfdrive/sensord/sensors.cc @@ -13,18 +13,13 @@ #include #include - #include #include -#include - #include "messaging.hpp" #include "common/timing.h" #include "common/swaglog.h" -#include "cereal/gen/cpp/log.capnp.h" - #define SENSOR_ACCELEROMETER 1 #define SENSOR_MAGNETOMETER 2 #define SENSOR_GYRO 4 @@ -51,15 +46,10 @@ void sigpipe_handler(int sig) { re_init_sensors = true; } - void sensor_loop() { LOG("*** sensor loop"); - - while (!do_exit) { - Context * c = Context::create(); - PubSocket * sensor_events_sock = PubSocket::create(c, "sensorEvents"); - assert(sensor_events_sock != NULL); + PubMaster pm({"sensorEvents"}); struct sensors_poll_device_t* device; struct sensors_module_t* module; @@ -107,7 +97,6 @@ void sensor_loop() { static const size_t numEvents = 16; sensors_event_t buffer[numEvents]; - while (!do_exit) { int n = device->poll(device, buffer, numEvents); if (n == 0) continue; @@ -215,9 +204,7 @@ void sensor_loop() { log_i++; } - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - sensor_events_sock->send((char*)bytes.begin(), bytes.size()); + pm.send("sensorEvents", msg); if (re_init_sensors){ LOGE("Resetting sensors"); @@ -226,8 +213,6 @@ void sensor_loop() { } } sensors_close(device); - delete sensor_events_sock; - delete c; } } diff --git a/selfdrive/ui/ui.cc b/selfdrive/ui/ui.cc index 03eed94192..29c866cab0 100644 --- a/selfdrive/ui/ui.cc +++ b/selfdrive/ui/ui.cc @@ -1,4 +1,3 @@ -#include "ui.hpp" #include #include #include @@ -6,7 +5,6 @@ #include #include #include -#include #include #include "common/util.h" #include "common/timing.h" @@ -14,6 +12,7 @@ #include "common/touch.h" #include "common/visionimg.h" #include "common/params.h" +#include "ui.hpp" static int last_brightness = -1; static void set_brightness(UIState *s, int brightness) { @@ -72,9 +71,7 @@ static void update_offroad_layout_state(UIState *s) { auto layout = event.initUiLayoutState(); layout.setActiveApp(s->active_app); layout.setSidebarCollapsed(s->scene.uilayout_sidebarcollapsed); - auto words = capnp::messageToFlatArray(msg); - auto bytes = words.asBytes(); - s->offroad_sock->send((char*)bytes.begin(), bytes.size()); + s->pm->send("offroadLayout", msg); LOGD("setting active app to %d with sidebar %d", (int)s->active_app, s->scene.uilayout_sidebarcollapsed); } @@ -214,50 +211,13 @@ static void ui_init(UIState *s) { memset(s, 0, sizeof(UIState)); pthread_mutex_init(&s->lock, NULL); - - s->ctx = Context::create(); - s->model_sock = SubSocket::create(s->ctx, "model"); - s->controlsstate_sock = SubSocket::create(s->ctx, "controlsState"); - s->uilayout_sock = SubSocket::create(s->ctx, "uiLayoutState"); - s->livecalibration_sock = SubSocket::create(s->ctx, "liveCalibration"); - s->radarstate_sock = SubSocket::create(s->ctx, "radarState"); - s->thermal_sock = SubSocket::create(s->ctx, "thermal"); - s->health_sock = SubSocket::create(s->ctx, "health"); - s->ubloxgnss_sock = SubSocket::create(s->ctx, "ubloxGnss"); - s->driverstate_sock = SubSocket::create(s->ctx, "driverState"); - s->dmonitoring_sock = SubSocket::create(s->ctx, "dMonitoringState"); - s->offroad_sock = PubSocket::create(s->ctx, "offroadLayout"); - - assert(s->model_sock != NULL); - assert(s->controlsstate_sock != NULL); - assert(s->uilayout_sock != NULL); - assert(s->livecalibration_sock != NULL); - assert(s->radarstate_sock != NULL); - assert(s->thermal_sock != NULL); - assert(s->health_sock != NULL); - assert(s->ubloxgnss_sock != NULL); - assert(s->driverstate_sock != NULL); - assert(s->dmonitoring_sock != NULL); - assert(s->offroad_sock != NULL); - - s->poller = Poller::create({ - s->model_sock, - s->controlsstate_sock, - s->uilayout_sock, - s->livecalibration_sock, - s->radarstate_sock, - s->thermal_sock, - s->health_sock, - s->ubloxgnss_sock, - s->driverstate_sock, - s->dmonitoring_sock - }); - + s->sm = new SubMaster({"model", "controlsState", "uiLayoutState", "liveCalibration", "radarState", "thermal", + "health", "ubloxGnss", "driverState", "dMonitoringState", "offroadLayout" #ifdef SHOW_SPEEDLIMIT - s->map_data_sock = SubSocket::create(s->ctx, "liveMapData"); - assert(s->map_data_sock != NULL); - s->poller->registerSocket(s->map_data_sock); + , "liveMapData" #endif + }); + s->pm = new PubMaster({"offroadLayout"}); s->ipc_fd = -1; @@ -369,17 +329,11 @@ static void update_status(UIState *s, int status) { } } -void handle_message(UIState *s, Message* msg) { - auto amsg = kj::heapArray((msg->getSize() / sizeof(capnp::word)) + 1); - memcpy(amsg.begin(), msg->getData(), msg->getSize()); - capnp::FlatArrayMessageReader cmsg(amsg); - cereal::Event::Reader event = cmsg.getRoot(); - - auto which = event.which(); +void handle_message(UIState *s, SubMaster &sm) { UIScene &scene = s->scene; - if (which == cereal::Event::CONTROLS_STATE && s->started) { + if (s->started && sm.updated("controlsState")) { + auto event = sm["controlsState"]; auto data = event.getControlsState(); - s->controls_timeout = 1 * UI_FREQ; scene.frontview = data.getRearViewCam(); if (!scene.frontview){ s->controls_seen = true; } @@ -394,7 +348,6 @@ void handle_message(UIState *s, Message* msg) { scene.engageable = data.getEngageable(); scene.gps_planner_active = data.getGpsPlannerActive(); scene.monitoring_active = data.getDriverMonitoringOn(); - scene.decel_for_model = data.getDecelForModel(); auto alert_sound = data.getAlertSound(); const auto sound_none = cereal::CarControl::HUDControl::AudibleAlert::NONE; @@ -438,8 +391,9 @@ void handle_message(UIState *s, Message* msg) { } } } - } else if (which == cereal::Event::RADAR_STATE) { - auto data = event.getRadarState(); + } + if (sm.updated("radarState")) { + auto data = sm["radarState"].getRadarState(); auto leaddatad = data.getLeadOne(); scene.lead_status = leaddatad.getStatus(); @@ -451,40 +405,44 @@ void handle_message(UIState *s, Message* msg) { scene.lead_d_rel2 = leaddatad2.getDRel(); scene.lead_y_rel2 = leaddatad2.getYRel(); scene.lead_v_rel2 = leaddatad2.getVRel(); - s->livempc_or_radarstate_changed = true; - } else if (which == cereal::Event::LIVE_CALIBRATION) { + } + if (sm.updated("liveCalibration")) { scene.world_objects_visible = true; - auto extrinsicl = event.getLiveCalibration().getExtrinsicMatrix(); + auto extrinsicl = sm["liveCalibration"].getLiveCalibration().getExtrinsicMatrix(); for (int i = 0; i < 3 * 4; i++) { scene.extrinsic_matrix.v[i] = extrinsicl[i]; } - } else if (which == cereal::Event::MODEL) { - scene.model = read_model(event.getModel()); + } + if (sm.updated("model")) { + scene.model = read_model(sm["model"].getModel()); s->model_changed = true; - } else if (which == cereal::Event::LIVE_MPC) { - auto data = event.getLiveMpc(); - - auto x_list = data.getX(); - auto y_list = data.getY(); - for (int i = 0; i < 50; i++){ - scene.mpc_x[i] = x_list[i]; - scene.mpc_y[i] = y_list[i]; - } - s->livempc_or_radarstate_changed = true; - } else if (which == cereal::Event::UI_LAYOUT_STATE) { - auto data = event.getUiLayoutState(); - + } + // else if (which == cereal::Event::LIVE_MPC) { + // auto data = event.getLiveMpc(); + // auto x_list = data.getX(); + // auto y_list = data.getY(); + // for (int i = 0; i < 50; i++){ + // scene.mpc_x[i] = x_list[i]; + // scene.mpc_y[i] = y_list[i]; + // } + // s->livempc_or_radarstate_changed = true; + // } + if (sm.updated("uiLayoutState")) { + auto data = sm["uiLayoutState"].getUiLayoutState(); s->active_app = data.getActiveApp(); scene.uilayout_sidebarcollapsed = data.getSidebarCollapsed(); if (data.getMockEngaged() != scene.uilayout_mockengaged) { scene.uilayout_mockengaged = data.getMockEngaged(); } - } else if (which == cereal::Event::LIVE_MAP_DATA) { - scene.map_valid = event.getLiveMapData().getMapValid(); - } else if (which == cereal::Event::THERMAL) { - auto data = event.getThermal(); - + } +#ifdef SHOW_SPEEDLIMIT + if (sm.updated("liveMapData")) { + scene.map_valid = sm["liveMapData"].getLiveMapData().getMapValid(); + } +#endif + if (sm.updated("thermal")) { + auto data = sm["thermal"].getThermal(); scene.networkType = data.getNetworkType(); scene.networkStrength = data.getNetworkStrength(); scene.batteryPercent = data.getBatteryPercent(); @@ -494,22 +452,26 @@ void handle_message(UIState *s, Message* msg) { scene.paTemp = data.getPa0(); s->thermal_started = data.getStarted(); - } else if (which == cereal::Event::UBLOX_GNSS) { - auto data = event.getUbloxGnss(); + } + if (sm.updated("ubloxGnss")) { + auto data = sm["ubloxGnss"].getUbloxGnss(); if (data.which() == cereal::UbloxGnss::MEASUREMENT_REPORT) { scene.satelliteCount = data.getMeasurementReport().getNumMeas(); } - } else if (which == cereal::Event::HEALTH) { - scene.hwType = event.getHealth().getHwType(); + } + if (sm.updated("health")) { + scene.hwType = sm["health"].getHealth().getHwType(); s->hardware_timeout = 5*30; // 5 seconds at 30 fps - } else if (which == cereal::Event::DRIVER_STATE) { - auto data = event.getDriverState(); + } + if (sm.updated("driverState")) { + auto data = sm["driverState"].getDriverState(); scene.face_prob = data.getFaceProb(); auto fxy_list = data.getFacePosition(); scene.face_x = fxy_list[0]; scene.face_y = fxy_list[1]; - } else if (which == cereal::Event::D_MONITORING_STATE) { - auto data = event.getDMonitoringState(); + } + if (sm.updated("dMonitoringState")) { + auto data = sm["dMonitoringState"].getDMonitoringState(); scene.is_rhd = data.getIsRHD(); scene.awareness_status = data.getAwarenessStatus(); s->preview_started = data.getIsPreview(); @@ -535,19 +497,10 @@ void handle_message(UIState *s, Message* msg) { } static void check_messages(UIState *s) { - while(true) { - auto polls = s->poller->poll(0); - - if (polls.size() == 0) + while (true) { + if (s->sm->update(0) == 0) break; - - for (auto sock : polls){ - Message *msg = sock->receive(); - if (msg) { - handle_message(s, msg); - delete msg; - } - } + handle_message(s, *(s->sm)); } } @@ -772,17 +725,7 @@ static void* vision_connect_thread(void *args) { s->vision_connect_firstrun = true; // Drain sockets - while (true){ - auto polls = s->poller->poll(0); - if (polls.size() == 0) - break; - - for (auto sock : polls){ - Message * msg = sock->receive(); - if (msg == NULL) continue; - delete msg; - } - } + s->sm->drain(); pthread_mutex_unlock(&s->lock); } @@ -1071,6 +1014,7 @@ int main(int argc, char* argv[]) { err = pthread_join(connect_thread_handle, NULL); assert(err == 0); - + delete s->sm; + delete s->pm; return 0; } diff --git a/selfdrive/ui/ui.hpp b/selfdrive/ui/ui.hpp index 771b5501cc..9d3f15fa33 100644 --- a/selfdrive/ui/ui.hpp +++ b/selfdrive/ui/ui.hpp @@ -1,6 +1,8 @@ #ifndef _UI_H #define _UI_H -#include "cereal/gen/cpp/log.capnp.h" + +#include "messaging.hpp" + #ifdef __APPLE__ #include #define NANOVG_GL3_IMPLEMENTATION @@ -12,7 +14,6 @@ #define nvgCreate nvgCreateGLES3 #endif -#include #include #include "nanovg.h" @@ -21,7 +22,6 @@ #include "common/visionimg.h" #include "common/framebuffer.h" #include "common/modeldata.h" -#include "messaging.hpp" #include "sound.hpp" #define STATUS_STOPPED 0 @@ -203,21 +203,8 @@ typedef struct UIState { int img_network[6]; // sockets - Context *ctx; - SubSocket *model_sock; - SubSocket *controlsstate_sock; - SubSocket *livecalibration_sock; - SubSocket *radarstate_sock; - SubSocket *map_data_sock; - SubSocket *uilayout_sock; - SubSocket *thermal_sock; - SubSocket *health_sock; - SubSocket *ubloxgnss_sock; - SubSocket *driverstate_sock; - SubSocket *dmonitoring_sock; - PubSocket *offroad_sock; - Poller * poller; - Poller * ublox_poller; + SubMaster *sm; + PubMaster *pm; cereal::UiLayoutState::App active_app;