From 6f860af9d13999938786081f3e981c140eb3644c Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Wed, 8 May 2019 14:10:55 +0200 Subject: [PATCH 01/17] add unthreaded version of MQTT node type --- include/villas/nodes/mqtt_unthreaded.hpp | 49 ++++ lib/nodes/CMakeLists.txt | 2 +- lib/nodes/mqtt_unthreaded.cpp | 339 +++++++++++++++++++++++ 3 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 include/villas/nodes/mqtt_unthreaded.hpp create mode 100644 lib/nodes/mqtt_unthreaded.cpp diff --git a/include/villas/nodes/mqtt_unthreaded.hpp b/include/villas/nodes/mqtt_unthreaded.hpp new file mode 100644 index 000000000..4b9a37af1 --- /dev/null +++ b/include/villas/nodes/mqtt_unthreaded.hpp @@ -0,0 +1,49 @@ +/** Node type: mqtt_unthreaded + * + * @file + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup mqtt_unthreaded mqtt unthreaded node type + * @ingroup node + * @{ + */ + +#pragma once + +#include + +/** @see node_type::open */ +int mqtt_unthreaded_start(struct node *n); + +/** @see node_type::close */ +int mqtt_unthreaded_stop(struct node *n); + +/** @see node_type::read */ +int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::write */ +int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** perform MQTT network operations */ +int mqtt_unthreaded_loop(struct node *n); + +/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 48c0d2d5a..bc4110fe1 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -122,7 +122,7 @@ endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.cpp) + list(APPEND NODE_SRC mqtt.cpp mqtt_unthreaded.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() diff --git a/lib/nodes/mqtt_unthreaded.cpp b/lib/nodes/mqtt_unthreaded.cpp new file mode 100644 index 000000000..dafaefabb --- /dev/null +++ b/lib/nodes/mqtt_unthreaded.cpp @@ -0,0 +1,339 @@ +/** Node type: mqtt_unthreaded + * + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include +#include +#include +#include + +static void mqtt_unthreaded_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) +{ + switch (level) { + case MOSQ_LOG_NONE: + case MOSQ_LOG_INFO: + case MOSQ_LOG_NOTICE: + info("MQTT: %s", str); + break; + + case MOSQ_LOG_WARNING: + warning("MQTT: %s", str); + break; + + case MOSQ_LOG_ERR: + error("MQTT: %s", str); + break; + + case MOSQ_LOG_DEBUG: + debug(5, "MQTT: %s", str); + break; + } +} + + +static void mqtt_unthreaded_connect_cb(struct mosquitto *mosq, void *userdata, int result) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + int ret; + + info("MQTT: Node %s connected to broker %s", node_name(n), m->host); + + if (m->subscribe) { + ret = mosquitto_subscribe(m->client, nullptr, m->subscribe, m->qos); + if (ret) + warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); + } + else + warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); + + mqtt_unthreaded_loop(n); +} + +static void mqtt_unthreaded_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) +{ + int ret; + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + struct sample *smps[n->in.vectorize]; + + debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); + + ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); + if (ret <= 0) { + warning("Pool underrun in subscriber of %s", node_name(n)); + return; + } + + ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, nullptr, smps, n->in.vectorize); + if (ret < 0) { + warning("MQTT: Node %s received an invalid message", node_name(n)); + warning(" Payload: %s", (char *) msg->payload); + return; + } + if (ret == 0) { + debug(4, "MQTT: skip empty message for node %s", node_name(n)); + sample_decref_many(smps, n->in.vectorize); + return; + } + + queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); + mqtt_unthreaded_loop(n); +} + +static void mqtt_unthreaded_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); +} + + +static void mqtt_unthreaded_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + info("MQTT: Node %s subscribed to broker %s", node_name(n), m->host); + n->state=STATE_STARTED; +} + +int mqtt_unthreaded_check(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + ret = mosquitto_sub_topic_check(m->subscribe); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); + + ret = mosquitto_pub_topic_check(m->publish); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); + + return 0; +} + + + +int mqtt_unthreaded_start(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + m->client = mosquitto_new(n->name, 0, (void *) n); + if (!m->client) + return -1; + + if (m->username && m->password) { + ret = mosquitto_username_pw_set(m->client, m->username, m->password); + if (ret) + goto mosquitto_error; + } + + if (m->ssl.enabled) { + ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); + if (ret) + goto mosquitto_error; + + ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); + if (ret) + goto mosquitto_error; + } + + mosquitto_log_callback_set(m->client, mqtt_unthreaded_log_cb); + mosquitto_connect_callback_set(m->client, mqtt_unthreaded_connect_cb); + mosquitto_disconnect_callback_set(m->client, mqtt_unthreaded_disconnect_cb); + mosquitto_message_callback_set(m->client, mqtt_unthreaded_message_cb); + mosquitto_subscribe_callback_set(m->client, mqtt_unthreaded_subscribe_cb); + + ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + if (ret) + return ret; + + ret = io_check(&m->io); + if (ret) + return ret; + + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); + if (ret) + return ret; + + ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0); + if (ret) + return ret; + + ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); + if (ret) + goto mosquitto_error; + + //The following while loop exists upon completion of subscibe procedure + while(n->state == STATE_PREPARED){ + mqtt_unthreaded_loop(n); + } + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + +int mqtt_unthreaded_stop(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + ret = mosquitto_disconnect(m->client); + if (ret) + goto mosquitto_error; + + ret = io_destroy(&m->io); + if (ret) + return ret; + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + +int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int pulled=0; + mqtt_unthreaded_loop(n); + + if(cnt > 0){ + struct mqtt *m = (struct mqtt *) n->_vd; + struct sample *smpt[cnt]; + pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); + + sample_copy_many(smps, smpt, pulled); + sample_decref_many(smpt, pulled); + } + return pulled; +} + +int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + size_t wbytes; + + char data[1500]; + + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + if (m->publish) { + ret = mosquitto_publish(m->client, nullptr /* mid */, m->publish, wbytes, data, m->qos, m->retain); + if (ret != MOSQ_ERR_SUCCESS) { + warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); + return -abs(ret); + } + } + else + warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); + + mqtt_unthreaded_loop(n); + + return cnt; +} + +int mqtt_unthreaded_loop(struct node *n){ + + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + //Carry out network operations in a synchronous way. + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(n), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + } + + return 0; +} + +int mqtt_unthreaded_poll_fds(struct node *n, int fds[]) +{ + struct mqtt *m = (struct mqtt *) n->_vd; + + fds[0] = queue_signalled_fd(&m->queue); + + return 1; +} + +static struct plugin p; + +__attribute__((constructor(110))) +static void register_plugin() { + if (plugins.state == STATE_DESTROYED) + vlist_init(&plugins); + + p.name = "mqtt_unthreaded"; + p.description = "Message Queuing Telemetry Transport (libmosquitto), unthreaded variant"; + p.type = PLUGIN_TYPE_NODE; + p.node.instances.state = STATE_DESTROYED; + p.node.vectorize = 0; + p.node.size = sizeof(struct mqtt); + p.node.type.start = mqtt_type_start; + p.node.type.stop = mqtt_type_stop; + p.node.destroy = mqtt_destroy; + p.node.parse = mqtt_parse; + p.node.check = mqtt_unthreaded_check; + p.node.print = mqtt_print; + p.node.start = mqtt_unthreaded_start; + p.node.stop = mqtt_unthreaded_stop; + p.node.read = mqtt_unthreaded_read; + p.node.write = mqtt_unthreaded_write; + p.node.reverse = mqtt_reverse; + p.node.poll_fds = mqtt_unthreaded_poll_fds; + + vlist_init(&p.node.instances); + vlist_push(&plugins, &p); +} + +__attribute__((destructor(110))) +static void deregister_plugin() { + if (plugins.state != STATE_DESTROYED) + vlist_remove_all(&plugins, &p); +} From 15b815112124ee08a229e297350c607622750791 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Thu, 12 Sep 2019 17:05:18 +0200 Subject: [PATCH 02/17] use a thread per process to execute mosquitto loop, contributes to #248 --- lib/nodes/mqtt.cpp | 73 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 6 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index a1cf96595..5ab9185f6 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -28,6 +28,40 @@ #include #include +// Each process has a list of clients for which a thread invokes the mosquitto loop +static struct vlist clients; +static pthread_t thread; + +static void * mosquitto_loop_thread(void *ctx) +{ + int ret; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *c = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) c->_vd; + + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) + + return nullptr; +} + static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -329,9 +363,9 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_start(m->client); - if (ret) - goto mosquitto_error; + // add client to global list of MQTT clients + // so that thread can call mosquitto loop for this client + vlist_push(&clients, n); return 0; @@ -350,9 +384,9 @@ int mqtt_stop(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_stop(m->client, 0); - if (ret) - goto mosquitto_error; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + vlist_remove_all(&clients, n); ret = io_destroy(&m->io); if (ret) @@ -370,10 +404,21 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; + ret = vlist_init(&clients); + if (ret) { + return ret; + } + ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; + // start thread here to run mosquitto loop for registered clients + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } + return 0; mosquitto_error: @@ -386,10 +431,26 @@ int mqtt_type_stop() { int ret; + // stop thread here that executes mosquitto loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } + ret = mosquitto_lib_cleanup(); if (ret) goto mosquitto_error; + // when this is called the list of clients should be empty + if (vlist_length(&clients) > 0) { + error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); + } + vlist_destroy(&clients, nullptr, false); + return 0; mosquitto_error: From fc72f73f4076d688fdc178b2c07db6e7307f7a7a Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 09:02:40 +0200 Subject: [PATCH 03/17] renaming variable to improve readability --- lib/nodes/mqtt.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 5ab9185f6..1f0880b8d 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -37,23 +37,23 @@ static void * mosquitto_loop_thread(void *ctx) int ret; while(true){ for (unsigned i = 0; i < vlist_length(&clients); i++) { - struct node *c = (struct node *) vlist_at(&clients, i); - struct mqtt *m = (struct mqtt *) c->_vd; + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; // execute mosquitto loop for this client ret = mosquitto_loop(m->client, 0, 1); if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); ret = mosquitto_reconnect(m->client); if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); } else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); } ret = mosquitto_loop(m->client, -1, 1); if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); } } } // for loop From b2184665be066fbfee1c7672b8c0665bab16cb16 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 09:56:50 +0200 Subject: [PATCH 04/17] improve unthreaded implementation of MQTT node type - set cancel type of MQTT communication management thread to asynchronous - fix removing node from list in mqtt_stop - add a debug output upon invocation of pthread cancel in mqtt_type_stop - contributes to #248 --- lib/nodes/mqtt.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 1f0880b8d..c22b1b6c3 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -35,6 +35,13 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { int ret; + // set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } + while(true){ for (unsigned i = 0; i < vlist_length(&clients); i++) { struct node *node = (struct node *) vlist_at(&clients, i); @@ -380,14 +387,15 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); + ret = mosquitto_disconnect(m->client); if (ret) goto mosquitto_error; - // unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - vlist_remove_all(&clients, n); - ret = io_destroy(&m->io); if (ret) return ret; @@ -435,6 +443,7 @@ int mqtt_type_stop() ret = pthread_cancel(thread); if (ret) return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); ret = pthread_join(thread, nullptr); if (ret) { From d76bcdbb076a0a5d342e1e2defcab328db5a3b00 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 10:15:03 +0200 Subject: [PATCH 05/17] remove files of mqtt_unthreaded node type since this is implemented in mqtt type now --- include/villas/nodes/mqtt_unthreaded.hpp | 49 ---- lib/nodes/CMakeLists.txt | 2 +- lib/nodes/mqtt_unthreaded.cpp | 339 ----------------------- 3 files changed, 1 insertion(+), 389 deletions(-) delete mode 100644 include/villas/nodes/mqtt_unthreaded.hpp delete mode 100644 lib/nodes/mqtt_unthreaded.cpp diff --git a/include/villas/nodes/mqtt_unthreaded.hpp b/include/villas/nodes/mqtt_unthreaded.hpp deleted file mode 100644 index 4b9a37af1..000000000 --- a/include/villas/nodes/mqtt_unthreaded.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @file - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -/** - * @addtogroup mqtt_unthreaded mqtt unthreaded node type - * @ingroup node - * @{ - */ - -#pragma once - -#include - -/** @see node_type::open */ -int mqtt_unthreaded_start(struct node *n); - -/** @see node_type::close */ -int mqtt_unthreaded_stop(struct node *n); - -/** @see node_type::read */ -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** @see node_type::write */ -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** perform MQTT network operations */ -int mqtt_unthreaded_loop(struct node *n); - -/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index bc4110fe1..48c0d2d5a 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -122,7 +122,7 @@ endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.cpp mqtt_unthreaded.cpp) + list(APPEND NODE_SRC mqtt.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() diff --git a/lib/nodes/mqtt_unthreaded.cpp b/lib/nodes/mqtt_unthreaded.cpp deleted file mode 100644 index dafaefabb..000000000 --- a/lib/nodes/mqtt_unthreaded.cpp +++ /dev/null @@ -1,339 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include - -#include -#include -#include -#include - -static void mqtt_unthreaded_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) -{ - switch (level) { - case MOSQ_LOG_NONE: - case MOSQ_LOG_INFO: - case MOSQ_LOG_NOTICE: - info("MQTT: %s", str); - break; - - case MOSQ_LOG_WARNING: - warning("MQTT: %s", str); - break; - - case MOSQ_LOG_ERR: - error("MQTT: %s", str); - break; - - case MOSQ_LOG_DEBUG: - debug(5, "MQTT: %s", str); - break; - } -} - - -static void mqtt_unthreaded_connect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - int ret; - - info("MQTT: Node %s connected to broker %s", node_name(n), m->host); - - if (m->subscribe) { - ret = mosquitto_subscribe(m->client, nullptr, m->subscribe, m->qos); - if (ret) - warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - } - else - warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) -{ - int ret; - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smps[n->in.vectorize]; - - debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); - - ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); - if (ret <= 0) { - warning("Pool underrun in subscriber of %s", node_name(n)); - return; - } - - ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, nullptr, smps, n->in.vectorize); - if (ret < 0) { - warning("MQTT: Node %s received an invalid message", node_name(n)); - warning(" Payload: %s", (char *) msg->payload); - return; - } - if (ret == 0) { - debug(4, "MQTT: skip empty message for node %s", node_name(n)); - sample_decref_many(smps, n->in.vectorize); - return; - } - - queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); -} - - -static void mqtt_unthreaded_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s subscribed to broker %s", node_name(n), m->host); - n->state=STATE_STARTED; -} - -int mqtt_unthreaded_check(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_sub_topic_check(m->subscribe); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - - ret = mosquitto_pub_topic_check(m->publish); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); - - return 0; -} - - - -int mqtt_unthreaded_start(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - m->client = mosquitto_new(n->name, 0, (void *) n); - if (!m->client) - return -1; - - if (m->username && m->password) { - ret = mosquitto_username_pw_set(m->client, m->username, m->password); - if (ret) - goto mosquitto_error; - } - - if (m->ssl.enabled) { - ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); - if (ret) - goto mosquitto_error; - - ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); - if (ret) - goto mosquitto_error; - } - - mosquitto_log_callback_set(m->client, mqtt_unthreaded_log_cb); - mosquitto_connect_callback_set(m->client, mqtt_unthreaded_connect_cb); - mosquitto_disconnect_callback_set(m->client, mqtt_unthreaded_disconnect_cb); - mosquitto_message_callback_set(m->client, mqtt_unthreaded_message_cb); - mosquitto_subscribe_callback_set(m->client, mqtt_unthreaded_subscribe_cb); - - ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); - if (ret) - return ret; - - ret = io_check(&m->io); - if (ret) - return ret; - - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); - if (ret) - return ret; - - ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0); - if (ret) - return ret; - - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); - if (ret) - goto mosquitto_error; - - //The following while loop exists upon completion of subscibe procedure - while(n->state == STATE_PREPARED){ - mqtt_unthreaded_loop(n); - } - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_stop(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_disconnect(m->client); - if (ret) - goto mosquitto_error; - - ret = io_destroy(&m->io); - if (ret) - return ret; - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int pulled=0; - mqtt_unthreaded_loop(n); - - if(cnt > 0){ - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); - - sample_copy_many(smps, smpt, pulled); - sample_decref_many(smpt, pulled); - } - return pulled; -} - -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - size_t wbytes; - - char data[1500]; - - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); - if (ret < 0) - return ret; - - if (m->publish) { - ret = mosquitto_publish(m->client, nullptr /* mid */, m->publish, wbytes, data, m->qos, m->retain); - if (ret != MOSQ_ERR_SUCCESS) { - warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - return -abs(ret); - } - } - else - warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); - - return cnt; -} - -int mqtt_unthreaded_loop(struct node *n){ - - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - //Carry out network operations in a synchronous way. - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(n), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - } - - return 0; -} - -int mqtt_unthreaded_poll_fds(struct node *n, int fds[]) -{ - struct mqtt *m = (struct mqtt *) n->_vd; - - fds[0] = queue_signalled_fd(&m->queue); - - return 1; -} - -static struct plugin p; - -__attribute__((constructor(110))) -static void register_plugin() { - if (plugins.state == STATE_DESTROYED) - vlist_init(&plugins); - - p.name = "mqtt_unthreaded"; - p.description = "Message Queuing Telemetry Transport (libmosquitto), unthreaded variant"; - p.type = PLUGIN_TYPE_NODE; - p.node.instances.state = STATE_DESTROYED; - p.node.vectorize = 0; - p.node.size = sizeof(struct mqtt); - p.node.type.start = mqtt_type_start; - p.node.type.stop = mqtt_type_stop; - p.node.destroy = mqtt_destroy; - p.node.parse = mqtt_parse; - p.node.check = mqtt_unthreaded_check; - p.node.print = mqtt_print; - p.node.start = mqtt_unthreaded_start; - p.node.stop = mqtt_unthreaded_stop; - p.node.read = mqtt_unthreaded_read; - p.node.write = mqtt_unthreaded_write; - p.node.reverse = mqtt_reverse; - p.node.poll_fds = mqtt_unthreaded_poll_fds; - - vlist_init(&p.node.instances); - vlist_push(&plugins, &p); -} - -__attribute__((destructor(110))) -static void deregister_plugin() { - if (plugins.state != STATE_DESTROYED) - vlist_remove_all(&plugins, &p); -} From a887264659c052e328f39a0cf0236a7ad07e07ab Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Mon, 16 Sep 2019 09:48:22 +0200 Subject: [PATCH 06/17] fix indentation --- lib/nodes/mqtt.cpp | 104 ++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index c22b1b6c3..152243188 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -34,39 +34,39 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { - int ret; - // set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) { - error("Unable to set cancel type of MQTT communication thread to asynchronous."); - return nullptr; - } + int ret; + // set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } - while(true){ - for (unsigned i = 0; i < vlist_length(&clients); i++) { - struct node *node = (struct node *) vlist_at(&clients, i); - struct mqtt *m = (struct mqtt *) node->_vd; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; - // execute mosquitto loop for this client - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - } - } // for loop - } // while(1) + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) - return nullptr; + return nullptr; } static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) @@ -372,7 +372,7 @@ int mqtt_start(struct node *n) // add client to global list of MQTT clients // so that thread can call mosquitto loop for this client - vlist_push(&clients, n); + vlist_push(&clients, n); return 0; @@ -387,10 +387,10 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - // unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect - vlist_remove(&clients, vlist_index(&clients, n)); + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); ret = mosquitto_disconnect(m->client); if (ret) @@ -412,20 +412,20 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; - ret = vlist_init(&clients); - if (ret) { - return ret; - } + ret = vlist_init(&clients); + if (ret) { + return ret; + } ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; // start thread here to run mosquitto loop for registered clients - ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } return 0; @@ -440,15 +440,15 @@ int mqtt_type_stop() int ret; // stop thread here that executes mosquitto loop - ret = pthread_cancel(thread); - if (ret) - return ret; - debug( 3, "Called pthread_cancel() on MQTT communication management thread."); + ret = pthread_cancel(thread); + if (ret) + return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); - ret = pthread_join(thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } ret = mosquitto_lib_cleanup(); if (ret) @@ -458,7 +458,7 @@ int mqtt_type_stop() if (vlist_length(&clients) > 0) { error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); } - vlist_destroy(&clients, nullptr, false); + vlist_destroy(&clients, nullptr, false); return 0; From f3db8f6192609932016c095bef5e2f143d32cad6 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Wed, 8 May 2019 14:10:55 +0200 Subject: [PATCH 07/17] add unthreaded version of MQTT node type --- include/villas/nodes/mqtt_unthreaded.hpp | 49 ++++ lib/nodes/CMakeLists.txt | 2 +- lib/nodes/mqtt_unthreaded.cpp | 339 +++++++++++++++++++++++ 3 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 include/villas/nodes/mqtt_unthreaded.hpp create mode 100644 lib/nodes/mqtt_unthreaded.cpp diff --git a/include/villas/nodes/mqtt_unthreaded.hpp b/include/villas/nodes/mqtt_unthreaded.hpp new file mode 100644 index 000000000..4b9a37af1 --- /dev/null +++ b/include/villas/nodes/mqtt_unthreaded.hpp @@ -0,0 +1,49 @@ +/** Node type: mqtt_unthreaded + * + * @file + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup mqtt_unthreaded mqtt unthreaded node type + * @ingroup node + * @{ + */ + +#pragma once + +#include + +/** @see node_type::open */ +int mqtt_unthreaded_start(struct node *n); + +/** @see node_type::close */ +int mqtt_unthreaded_stop(struct node *n); + +/** @see node_type::read */ +int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::write */ +int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** perform MQTT network operations */ +int mqtt_unthreaded_loop(struct node *n); + +/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 516743f66..db8759a8d 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -126,7 +126,7 @@ endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.cpp) + list(APPEND NODE_SRC mqtt.cpp mqtt_unthreaded.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() diff --git a/lib/nodes/mqtt_unthreaded.cpp b/lib/nodes/mqtt_unthreaded.cpp new file mode 100644 index 000000000..dafaefabb --- /dev/null +++ b/lib/nodes/mqtt_unthreaded.cpp @@ -0,0 +1,339 @@ +/** Node type: mqtt_unthreaded + * + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include +#include +#include +#include + +static void mqtt_unthreaded_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) +{ + switch (level) { + case MOSQ_LOG_NONE: + case MOSQ_LOG_INFO: + case MOSQ_LOG_NOTICE: + info("MQTT: %s", str); + break; + + case MOSQ_LOG_WARNING: + warning("MQTT: %s", str); + break; + + case MOSQ_LOG_ERR: + error("MQTT: %s", str); + break; + + case MOSQ_LOG_DEBUG: + debug(5, "MQTT: %s", str); + break; + } +} + + +static void mqtt_unthreaded_connect_cb(struct mosquitto *mosq, void *userdata, int result) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + int ret; + + info("MQTT: Node %s connected to broker %s", node_name(n), m->host); + + if (m->subscribe) { + ret = mosquitto_subscribe(m->client, nullptr, m->subscribe, m->qos); + if (ret) + warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); + } + else + warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); + + mqtt_unthreaded_loop(n); +} + +static void mqtt_unthreaded_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) +{ + int ret; + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + struct sample *smps[n->in.vectorize]; + + debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); + + ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); + if (ret <= 0) { + warning("Pool underrun in subscriber of %s", node_name(n)); + return; + } + + ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, nullptr, smps, n->in.vectorize); + if (ret < 0) { + warning("MQTT: Node %s received an invalid message", node_name(n)); + warning(" Payload: %s", (char *) msg->payload); + return; + } + if (ret == 0) { + debug(4, "MQTT: skip empty message for node %s", node_name(n)); + sample_decref_many(smps, n->in.vectorize); + return; + } + + queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); + mqtt_unthreaded_loop(n); +} + +static void mqtt_unthreaded_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); +} + + +static void mqtt_unthreaded_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) +{ + struct node *n = (struct node *) userdata; + struct mqtt *m = (struct mqtt *) n->_vd; + + info("MQTT: Node %s subscribed to broker %s", node_name(n), m->host); + n->state=STATE_STARTED; +} + +int mqtt_unthreaded_check(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + ret = mosquitto_sub_topic_check(m->subscribe); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); + + ret = mosquitto_pub_topic_check(m->publish); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); + + return 0; +} + + + +int mqtt_unthreaded_start(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + m->client = mosquitto_new(n->name, 0, (void *) n); + if (!m->client) + return -1; + + if (m->username && m->password) { + ret = mosquitto_username_pw_set(m->client, m->username, m->password); + if (ret) + goto mosquitto_error; + } + + if (m->ssl.enabled) { + ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); + if (ret) + goto mosquitto_error; + + ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); + if (ret) + goto mosquitto_error; + } + + mosquitto_log_callback_set(m->client, mqtt_unthreaded_log_cb); + mosquitto_connect_callback_set(m->client, mqtt_unthreaded_connect_cb); + mosquitto_disconnect_callback_set(m->client, mqtt_unthreaded_disconnect_cb); + mosquitto_message_callback_set(m->client, mqtt_unthreaded_message_cb); + mosquitto_subscribe_callback_set(m->client, mqtt_unthreaded_subscribe_cb); + + ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); + if (ret) + return ret; + + ret = io_check(&m->io); + if (ret) + return ret; + + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); + if (ret) + return ret; + + ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0); + if (ret) + return ret; + + ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); + if (ret) + goto mosquitto_error; + + //The following while loop exists upon completion of subscibe procedure + while(n->state == STATE_PREPARED){ + mqtt_unthreaded_loop(n); + } + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + +int mqtt_unthreaded_stop(struct node *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + ret = mosquitto_disconnect(m->client); + if (ret) + goto mosquitto_error; + + ret = io_destroy(&m->io); + if (ret) + return ret; + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + +int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int pulled=0; + mqtt_unthreaded_loop(n); + + if(cnt > 0){ + struct mqtt *m = (struct mqtt *) n->_vd; + struct sample *smpt[cnt]; + pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); + + sample_copy_many(smps, smpt, pulled); + sample_decref_many(smpt, pulled); + } + return pulled; +} + +int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + size_t wbytes; + + char data[1500]; + + ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + if (m->publish) { + ret = mosquitto_publish(m->client, nullptr /* mid */, m->publish, wbytes, data, m->qos, m->retain); + if (ret != MOSQ_ERR_SUCCESS) { + warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); + return -abs(ret); + } + } + else + warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); + + mqtt_unthreaded_loop(n); + + return cnt; +} + +int mqtt_unthreaded_loop(struct node *n){ + + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + //Carry out network operations in a synchronous way. + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(n), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(n), mosquitto_strerror(ret)); + } + } + + return 0; +} + +int mqtt_unthreaded_poll_fds(struct node *n, int fds[]) +{ + struct mqtt *m = (struct mqtt *) n->_vd; + + fds[0] = queue_signalled_fd(&m->queue); + + return 1; +} + +static struct plugin p; + +__attribute__((constructor(110))) +static void register_plugin() { + if (plugins.state == STATE_DESTROYED) + vlist_init(&plugins); + + p.name = "mqtt_unthreaded"; + p.description = "Message Queuing Telemetry Transport (libmosquitto), unthreaded variant"; + p.type = PLUGIN_TYPE_NODE; + p.node.instances.state = STATE_DESTROYED; + p.node.vectorize = 0; + p.node.size = sizeof(struct mqtt); + p.node.type.start = mqtt_type_start; + p.node.type.stop = mqtt_type_stop; + p.node.destroy = mqtt_destroy; + p.node.parse = mqtt_parse; + p.node.check = mqtt_unthreaded_check; + p.node.print = mqtt_print; + p.node.start = mqtt_unthreaded_start; + p.node.stop = mqtt_unthreaded_stop; + p.node.read = mqtt_unthreaded_read; + p.node.write = mqtt_unthreaded_write; + p.node.reverse = mqtt_reverse; + p.node.poll_fds = mqtt_unthreaded_poll_fds; + + vlist_init(&p.node.instances); + vlist_push(&plugins, &p); +} + +__attribute__((destructor(110))) +static void deregister_plugin() { + if (plugins.state != STATE_DESTROYED) + vlist_remove_all(&plugins, &p); +} From 19523377f4543ef711a84fde409f0ceac7b4bfb2 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Thu, 12 Sep 2019 17:05:18 +0200 Subject: [PATCH 08/17] use a thread per process to execute mosquitto loop, contributes to #248 --- lib/nodes/mqtt.cpp | 73 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 67 insertions(+), 6 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 9cee077d8..dc169bf01 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -30,6 +30,40 @@ using namespace villas::utils; +// Each process has a list of clients for which a thread invokes the mosquitto loop +static struct vlist clients; +static pthread_t thread; + +static void * mosquitto_loop_thread(void *ctx) +{ + int ret; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *c = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) c->_vd; + + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) + + return nullptr; +} + static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -331,9 +365,9 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_start(m->client); - if (ret) - goto mosquitto_error; + // add client to global list of MQTT clients + // so that thread can call mosquitto loop for this client + vlist_push(&clients, n); return 0; @@ -352,9 +386,9 @@ int mqtt_stop(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_stop(m->client, 0); - if (ret) - goto mosquitto_error; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + vlist_remove_all(&clients, n); ret = io_destroy(&m->io); if (ret) @@ -372,10 +406,21 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; + ret = vlist_init(&clients); + if (ret) { + return ret; + } + ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; + // start thread here to run mosquitto loop for registered clients + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } + return 0; mosquitto_error: @@ -388,10 +433,26 @@ int mqtt_type_stop() { int ret; + // stop thread here that executes mosquitto loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } + ret = mosquitto_lib_cleanup(); if (ret) goto mosquitto_error; + // when this is called the list of clients should be empty + if (vlist_length(&clients) > 0) { + error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); + } + vlist_destroy(&clients, nullptr, false); + return 0; mosquitto_error: From 3624cb9fcac842fc0c8045510fdc6a2d8c455124 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 09:02:40 +0200 Subject: [PATCH 09/17] renaming variable to improve readability --- lib/nodes/mqtt.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index dc169bf01..96f87b37a 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -39,23 +39,23 @@ static void * mosquitto_loop_thread(void *ctx) int ret; while(true){ for (unsigned i = 0; i < vlist_length(&clients); i++) { - struct node *c = (struct node *) vlist_at(&clients, i); - struct mqtt *m = (struct mqtt *) c->_vd; + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; // execute mosquitto loop for this client ret = mosquitto_loop(m->client, 0, 1); if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); ret = mosquitto_reconnect(m->client); if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); } else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); } ret = mosquitto_loop(m->client, -1, 1); if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); } } } // for loop From e52d2f77eeab1897bf8235a03d12ac5913f9d1fb Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 09:56:50 +0200 Subject: [PATCH 10/17] improve unthreaded implementation of MQTT node type - set cancel type of MQTT communication management thread to asynchronous - fix removing node from list in mqtt_stop - add a debug output upon invocation of pthread cancel in mqtt_type_stop - contributes to #248 --- lib/nodes/mqtt.cpp | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 96f87b37a..0770f21a5 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -37,6 +37,13 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { int ret; + // set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } + while(true){ for (unsigned i = 0; i < vlist_length(&clients); i++) { struct node *node = (struct node *) vlist_at(&clients, i); @@ -382,14 +389,15 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); + ret = mosquitto_disconnect(m->client); if (ret) goto mosquitto_error; - // unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - vlist_remove_all(&clients, n); - ret = io_destroy(&m->io); if (ret) return ret; @@ -437,6 +445,7 @@ int mqtt_type_stop() ret = pthread_cancel(thread); if (ret) return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); ret = pthread_join(thread, nullptr); if (ret) { From ff8151bee0e01678a76f40f635896e67bf75d81b Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Fri, 13 Sep 2019 10:15:03 +0200 Subject: [PATCH 11/17] remove files of mqtt_unthreaded node type since this is implemented in mqtt type now --- include/villas/nodes/mqtt_unthreaded.hpp | 49 ---- lib/nodes/CMakeLists.txt | 2 +- lib/nodes/mqtt_unthreaded.cpp | 339 ----------------------- 3 files changed, 1 insertion(+), 389 deletions(-) delete mode 100644 include/villas/nodes/mqtt_unthreaded.hpp delete mode 100644 lib/nodes/mqtt_unthreaded.cpp diff --git a/include/villas/nodes/mqtt_unthreaded.hpp b/include/villas/nodes/mqtt_unthreaded.hpp deleted file mode 100644 index 4b9a37af1..000000000 --- a/include/villas/nodes/mqtt_unthreaded.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @file - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -/** - * @addtogroup mqtt_unthreaded mqtt unthreaded node type - * @ingroup node - * @{ - */ - -#pragma once - -#include - -/** @see node_type::open */ -int mqtt_unthreaded_start(struct node *n); - -/** @see node_type::close */ -int mqtt_unthreaded_stop(struct node *n); - -/** @see node_type::read */ -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** @see node_type::write */ -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** perform MQTT network operations */ -int mqtt_unthreaded_loop(struct node *n); - -/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index db8759a8d..516743f66 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -126,7 +126,7 @@ endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.cpp mqtt_unthreaded.cpp) + list(APPEND NODE_SRC mqtt.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() diff --git a/lib/nodes/mqtt_unthreaded.cpp b/lib/nodes/mqtt_unthreaded.cpp deleted file mode 100644 index dafaefabb..000000000 --- a/lib/nodes/mqtt_unthreaded.cpp +++ /dev/null @@ -1,339 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include - -#include -#include -#include -#include - -static void mqtt_unthreaded_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) -{ - switch (level) { - case MOSQ_LOG_NONE: - case MOSQ_LOG_INFO: - case MOSQ_LOG_NOTICE: - info("MQTT: %s", str); - break; - - case MOSQ_LOG_WARNING: - warning("MQTT: %s", str); - break; - - case MOSQ_LOG_ERR: - error("MQTT: %s", str); - break; - - case MOSQ_LOG_DEBUG: - debug(5, "MQTT: %s", str); - break; - } -} - - -static void mqtt_unthreaded_connect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - int ret; - - info("MQTT: Node %s connected to broker %s", node_name(n), m->host); - - if (m->subscribe) { - ret = mosquitto_subscribe(m->client, nullptr, m->subscribe, m->qos); - if (ret) - warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - } - else - warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) -{ - int ret; - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smps[n->in.vectorize]; - - debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); - - ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); - if (ret <= 0) { - warning("Pool underrun in subscriber of %s", node_name(n)); - return; - } - - ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, nullptr, smps, n->in.vectorize); - if (ret < 0) { - warning("MQTT: Node %s received an invalid message", node_name(n)); - warning(" Payload: %s", (char *) msg->payload); - return; - } - if (ret == 0) { - debug(4, "MQTT: skip empty message for node %s", node_name(n)); - sample_decref_many(smps, n->in.vectorize); - return; - } - - queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); -} - - -static void mqtt_unthreaded_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s subscribed to broker %s", node_name(n), m->host); - n->state=STATE_STARTED; -} - -int mqtt_unthreaded_check(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_sub_topic_check(m->subscribe); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - - ret = mosquitto_pub_topic_check(m->publish); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); - - return 0; -} - - - -int mqtt_unthreaded_start(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - m->client = mosquitto_new(n->name, 0, (void *) n); - if (!m->client) - return -1; - - if (m->username && m->password) { - ret = mosquitto_username_pw_set(m->client, m->username, m->password); - if (ret) - goto mosquitto_error; - } - - if (m->ssl.enabled) { - ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); - if (ret) - goto mosquitto_error; - - ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); - if (ret) - goto mosquitto_error; - } - - mosquitto_log_callback_set(m->client, mqtt_unthreaded_log_cb); - mosquitto_connect_callback_set(m->client, mqtt_unthreaded_connect_cb); - mosquitto_disconnect_callback_set(m->client, mqtt_unthreaded_disconnect_cb); - mosquitto_message_callback_set(m->client, mqtt_unthreaded_message_cb); - mosquitto_subscribe_callback_set(m->client, mqtt_unthreaded_subscribe_cb); - - ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); - if (ret) - return ret; - - ret = io_check(&m->io); - if (ret) - return ret; - - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); - if (ret) - return ret; - - ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0); - if (ret) - return ret; - - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); - if (ret) - goto mosquitto_error; - - //The following while loop exists upon completion of subscibe procedure - while(n->state == STATE_PREPARED){ - mqtt_unthreaded_loop(n); - } - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_stop(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_disconnect(m->client); - if (ret) - goto mosquitto_error; - - ret = io_destroy(&m->io); - if (ret) - return ret; - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int pulled=0; - mqtt_unthreaded_loop(n); - - if(cnt > 0){ - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); - - sample_copy_many(smps, smpt, pulled); - sample_decref_many(smpt, pulled); - } - return pulled; -} - -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - size_t wbytes; - - char data[1500]; - - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); - if (ret < 0) - return ret; - - if (m->publish) { - ret = mosquitto_publish(m->client, nullptr /* mid */, m->publish, wbytes, data, m->qos, m->retain); - if (ret != MOSQ_ERR_SUCCESS) { - warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - return -abs(ret); - } - } - else - warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); - - return cnt; -} - -int mqtt_unthreaded_loop(struct node *n){ - - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - //Carry out network operations in a synchronous way. - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(n), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - } - - return 0; -} - -int mqtt_unthreaded_poll_fds(struct node *n, int fds[]) -{ - struct mqtt *m = (struct mqtt *) n->_vd; - - fds[0] = queue_signalled_fd(&m->queue); - - return 1; -} - -static struct plugin p; - -__attribute__((constructor(110))) -static void register_plugin() { - if (plugins.state == STATE_DESTROYED) - vlist_init(&plugins); - - p.name = "mqtt_unthreaded"; - p.description = "Message Queuing Telemetry Transport (libmosquitto), unthreaded variant"; - p.type = PLUGIN_TYPE_NODE; - p.node.instances.state = STATE_DESTROYED; - p.node.vectorize = 0; - p.node.size = sizeof(struct mqtt); - p.node.type.start = mqtt_type_start; - p.node.type.stop = mqtt_type_stop; - p.node.destroy = mqtt_destroy; - p.node.parse = mqtt_parse; - p.node.check = mqtt_unthreaded_check; - p.node.print = mqtt_print; - p.node.start = mqtt_unthreaded_start; - p.node.stop = mqtt_unthreaded_stop; - p.node.read = mqtt_unthreaded_read; - p.node.write = mqtt_unthreaded_write; - p.node.reverse = mqtt_reverse; - p.node.poll_fds = mqtt_unthreaded_poll_fds; - - vlist_init(&p.node.instances); - vlist_push(&plugins, &p); -} - -__attribute__((destructor(110))) -static void deregister_plugin() { - if (plugins.state != STATE_DESTROYED) - vlist_remove_all(&plugins, &p); -} From 69f76420ee24c985eb20aad4f9fe981a036ee8c1 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Mon, 16 Sep 2019 09:48:22 +0200 Subject: [PATCH 12/17] fix indentation --- lib/nodes/mqtt.cpp | 104 ++++++++++++++++++++++----------------------- 1 file changed, 52 insertions(+), 52 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 0770f21a5..b88cab9d7 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -36,39 +36,39 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { - int ret; - // set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) { - error("Unable to set cancel type of MQTT communication thread to asynchronous."); - return nullptr; - } + int ret; + // set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } - while(true){ - for (unsigned i = 0; i < vlist_length(&clients); i++) { - struct node *node = (struct node *) vlist_at(&clients, i); - struct mqtt *m = (struct mqtt *) node->_vd; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; - // execute mosquitto loop for this client - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - } - } // for loop - } // while(1) + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) - return nullptr; + return nullptr; } static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) @@ -374,7 +374,7 @@ int mqtt_start(struct node *n) // add client to global list of MQTT clients // so that thread can call mosquitto loop for this client - vlist_push(&clients, n); + vlist_push(&clients, n); return 0; @@ -389,10 +389,10 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - // unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect - vlist_remove(&clients, vlist_index(&clients, n)); + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); ret = mosquitto_disconnect(m->client); if (ret) @@ -414,20 +414,20 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; - ret = vlist_init(&clients); - if (ret) { - return ret; - } + ret = vlist_init(&clients); + if (ret) { + return ret; + } ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; // start thread here to run mosquitto loop for registered clients - ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } return 0; @@ -442,15 +442,15 @@ int mqtt_type_stop() int ret; // stop thread here that executes mosquitto loop - ret = pthread_cancel(thread); - if (ret) - return ret; - debug( 3, "Called pthread_cancel() on MQTT communication management thread."); + ret = pthread_cancel(thread); + if (ret) + return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); - ret = pthread_join(thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } ret = mosquitto_lib_cleanup(); if (ret) @@ -460,7 +460,7 @@ int mqtt_type_stop() if (vlist_length(&clients) > 0) { error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); } - vlist_destroy(&clients, nullptr, false); + vlist_destroy(&clients, nullptr, false); return 0; From 31623673a4cd31b10367278c2c058f61680133bd Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 3 Oct 2019 11:28:19 +0200 Subject: [PATCH 13/17] minor code-style fixes --- lib/nodes/mqtt.cpp | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index b88cab9d7..1e69d7ab4 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -37,36 +37,36 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { int ret; - // set the cancel type of this thread to async + + // Set the cancel type of this thread to async ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); if (ret != 0) { error("Unable to set cancel type of MQTT communication thread to asynchronous."); return nullptr; } - while(true){ + while (true) { for (unsigned i = 0; i < vlist_length(&clients); i++) { struct node *node = (struct node *) vlist_at(&clients, i); struct mqtt *m = (struct mqtt *) node->_vd; - // execute mosquitto loop for this client + // Execute mosquitto loop for this client ret = mosquitto_loop(m->client, 0, 1); - if(ret){ + if (ret) { warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ + if (ret != MOSQ_ERR_SUCCESS) error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - else{ + else warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } + ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ + if (ret != MOSQ_ERR_SUCCESS) error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } } - } // for loop - } // while(1) + } + } return nullptr; } @@ -141,6 +141,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct warning(" Payload: %s", (char *) msg->payload); return; } + if (ret == 0) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); sample_decref_many(smps, n->in.vectorize); @@ -372,7 +373,7 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - // add client to global list of MQTT clients + // Add client to global list of MQTT clients // so that thread can call mosquitto loop for this client vlist_push(&clients, n); @@ -389,7 +390,7 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - // unregister client from global MQTT client list + // Unregister client from global MQTT client list // so that mosquitto loop is no longer invoked for this client // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect vlist_remove(&clients, vlist_index(&clients, n)); @@ -423,7 +424,7 @@ int mqtt_type_start(villas::node::SuperNode *sn) if (ret) goto mosquitto_error; - // start thread here to run mosquitto loop for registered clients + // Start thread here to run mosquitto loop for registered clients ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); if (ret) { return ret; @@ -441,7 +442,7 @@ int mqtt_type_stop() { int ret; - // stop thread here that executes mosquitto loop + // Stop thread here that executes mosquitto loop ret = pthread_cancel(thread); if (ret) return ret; @@ -456,7 +457,7 @@ int mqtt_type_stop() if (ret) goto mosquitto_error; - // when this is called the list of clients should be empty + // When this is called the list of clients should be empty if (vlist_length(&clients) > 0) { error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); } From c72fd1718dfa952e2a292c4033c2d7c024b67261 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 3 Oct 2019 11:34:59 +0200 Subject: [PATCH 14/17] fix invalid variable name --- lib/formats/msg.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/formats/msg.cpp b/lib/formats/msg.cpp index 4568440af..9fdcfc456 100644 --- a/lib/formats/msg.cpp +++ b/lib/formats/msg.cpp @@ -110,8 +110,8 @@ int msg_from_sample(struct msg *msg_in, struct sample *smp, struct vlist *signal { msg_in->type = MSG_TYPE_DATA; msg_in->version = MSG_VERSION; - msg_in->rsvd1 = 0; - msg_in->resv2 = 0; + msg_in->reserved1 = 0; + msg_in->reserved2 = 0; msg_in->length = (uint16_t) smp->length; msg_in->sequence = (uint32_t) smp->sequence; msg_in->ts.sec = smp->ts.origin.tv_sec; From df29094a14de8c42e597889256571acacc7b77e3 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 3 Oct 2019 11:44:29 +0200 Subject: [PATCH 15/17] fix merge of pps_ts hook --- lib/hooks/pps_ts.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/lib/hooks/pps_ts.cpp b/lib/hooks/pps_ts.cpp index fa5ea693c..0cbc04b4c 100644 --- a/lib/hooks/pps_ts.cpp +++ b/lib/hooks/pps_ts.cpp @@ -63,7 +63,7 @@ public: int ret; json_error_t err; - assert(state != STATE_STARTED); + assert(state != State::STARTED); ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s?: f }", "signal_index", &idx, @@ -74,12 +74,12 @@ public: info("parsed config thresh=%f signal_index=%d", thresh, idx); - state = STATE_PARSED; + state = State::PARSED; } - virtual int process(sample *smp) + virtual villas::node::Hook::Reason process(sample *smp) { - assert(state == STATE_STARTED); + assert(state == State::STARTED); /* Get value of PPS signal */ float value = smp->data[idx].f; // TODO check if it is really float @@ -109,7 +109,7 @@ public: lastValue = value; if (edgeCounter < 2) - return HOOK_SKIP_SAMPLE; + return Hook::Reason::SKIP_SAMPLE; else if (edgeCounter == 2 && isEdge) realTime.tv_nsec = 0; else @@ -122,9 +122,9 @@ public: /* Update timestamp */ smp->ts.origin = realTime; - smp->flags |= SAMPLE_HAS_TS_ORIGIN; + smp->flags |= (int) SampleFlags::HAS_TS_ORIGIN; - return HOOK_OK; + return Hook::Reason::OK; } }; @@ -132,7 +132,7 @@ public: static HookPlugin p( "pps_ts", "Timestamp samples based GPS PPS signal", - HOOK_NODE_READ | HOOK_NODE_WRITE | HOOK_PATH, + (int) Hook::Flags::NODE_READ | (int) Hook::Flags::NODE_WRITE | (int) Hook::Flags::PATH, 99 ); From ed10e3d47facb23a57fb9474e6594621f1165760 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Thu, 17 Oct 2019 10:41:50 +0200 Subject: [PATCH 16/17] comment init of node's "stats" shared pointer with nullptr since this causes node_init to crash arbitrarily --- lib/node.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/node.cpp b/lib/node.cpp index 9250f2cd5..186533bc8 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -54,7 +54,7 @@ int node_init(struct node *n, struct node_type *vt) n->_vt = vt; n->_vd = alloc(vt->size); - n->stats = nullptr; + //n->stats = nullptr; n->name = nullptr; n->_name = nullptr; n->_name_long = nullptr; From fdd2a3b266fcb57d3366fa57bef6085a8c62db47 Mon Sep 17 00:00:00 2001 From: Sonja Happ Date: Thu, 17 Oct 2019 10:42:53 +0200 Subject: [PATCH 17/17] only build unit tests of common if project is toplevel project --- common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common b/common index 43fd33c9c..793427ac7 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 43fd33c9c77db22b93f65c92a1acb25c141d42a3 +Subproject commit 793427ac7f06e8a70509264e9688a024fca9a7c9