diff --git a/include/villas/nodes/mqtt_unthreaded.hpp b/include/villas/nodes/mqtt_unthreaded.hpp deleted file mode 100644 index 4b9a37af1..000000000 --- a/include/villas/nodes/mqtt_unthreaded.hpp +++ /dev/null @@ -1,49 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @file - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -/** - * @addtogroup mqtt_unthreaded mqtt unthreaded node type - * @ingroup node - * @{ - */ - -#pragma once - -#include - -/** @see node_type::open */ -int mqtt_unthreaded_start(struct node *n); - -/** @see node_type::close */ -int mqtt_unthreaded_stop(struct node *n); - -/** @see node_type::read */ -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** @see node_type::write */ -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); - -/** perform MQTT network operations */ -int mqtt_unthreaded_loop(struct node *n); - -/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index db8759a8d..516743f66 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -126,7 +126,7 @@ endif() # Enable MQTT support if(WITH_NODE_MQTT) - list(APPEND NODE_SRC mqtt.cpp mqtt_unthreaded.cpp) + list(APPEND NODE_SRC mqtt.cpp) list(APPEND INCLUDE_DIRS ${MOSQUITTO_INCLUDE_DIRS}) list(APPEND LIBRARIES ${MOSQUITTO_LIBRARIES}) endif() diff --git a/lib/nodes/mqtt_unthreaded.cpp b/lib/nodes/mqtt_unthreaded.cpp deleted file mode 100644 index dafaefabb..000000000 --- a/lib/nodes/mqtt_unthreaded.cpp +++ /dev/null @@ -1,339 +0,0 @@ -/** Node type: mqtt_unthreaded - * - * @author Steffen Vogel - * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -#include -#include - -#include -#include -#include -#include - -static void mqtt_unthreaded_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) -{ - switch (level) { - case MOSQ_LOG_NONE: - case MOSQ_LOG_INFO: - case MOSQ_LOG_NOTICE: - info("MQTT: %s", str); - break; - - case MOSQ_LOG_WARNING: - warning("MQTT: %s", str); - break; - - case MOSQ_LOG_ERR: - error("MQTT: %s", str); - break; - - case MOSQ_LOG_DEBUG: - debug(5, "MQTT: %s", str); - break; - } -} - - -static void mqtt_unthreaded_connect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - int ret; - - info("MQTT: Node %s connected to broker %s", node_name(n), m->host); - - if (m->subscribe) { - ret = mosquitto_subscribe(m->client, nullptr, m->subscribe, m->qos); - if (ret) - warning("MQTT: failed to subscribe to topic '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - } - else - warning("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) -{ - int ret; - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smps[n->in.vectorize]; - - debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); - - ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); - if (ret <= 0) { - warning("Pool underrun in subscriber of %s", node_name(n)); - return; - } - - ret = io_sscan(&m->io, (char *) msg->payload, msg->payloadlen, nullptr, smps, n->in.vectorize); - if (ret < 0) { - warning("MQTT: Node %s received an invalid message", node_name(n)); - warning(" Payload: %s", (char *) msg->payload); - return; - } - if (ret == 0) { - debug(4, "MQTT: skip empty message for node %s", node_name(n)); - sample_decref_many(smps, n->in.vectorize); - return; - } - - queue_signalled_push_many(&m->queue, (void **) smps, n->in.vectorize); - mqtt_unthreaded_loop(n); -} - -static void mqtt_unthreaded_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s disconnected from broker %s", node_name(n), m->host); -} - - -static void mqtt_unthreaded_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) -{ - struct node *n = (struct node *) userdata; - struct mqtt *m = (struct mqtt *) n->_vd; - - info("MQTT: Node %s subscribed to broker %s", node_name(n), m->host); - n->state=STATE_STARTED; -} - -int mqtt_unthreaded_check(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_sub_topic_check(m->subscribe); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid subscribe topic: '%s' for node %s: %s", m->subscribe, node_name(n), mosquitto_strerror(ret)); - - ret = mosquitto_pub_topic_check(m->publish); - if (ret != MOSQ_ERR_SUCCESS) - error("Invalid publish topic: '%s' for node %s: %s", m->publish, node_name(n), mosquitto_strerror(ret)); - - return 0; -} - - - -int mqtt_unthreaded_start(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - m->client = mosquitto_new(n->name, 0, (void *) n); - if (!m->client) - return -1; - - if (m->username && m->password) { - ret = mosquitto_username_pw_set(m->client, m->username, m->password); - if (ret) - goto mosquitto_error; - } - - if (m->ssl.enabled) { - ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); - if (ret) - goto mosquitto_error; - - ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); - if (ret) - goto mosquitto_error; - } - - mosquitto_log_callback_set(m->client, mqtt_unthreaded_log_cb); - mosquitto_connect_callback_set(m->client, mqtt_unthreaded_connect_cb); - mosquitto_disconnect_callback_set(m->client, mqtt_unthreaded_disconnect_cb); - mosquitto_message_callback_set(m->client, mqtt_unthreaded_message_cb); - mosquitto_subscribe_callback_set(m->client, mqtt_unthreaded_subscribe_cb); - - ret = io_init(&m->io, m->format, &n->in.signals, SAMPLE_HAS_ALL & ~SAMPLE_HAS_OFFSET); - if (ret) - return ret; - - ret = io_check(&m->io); - if (ret) - return ret; - - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals)), &memory_hugepage); - if (ret) - return ret; - - ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0); - if (ret) - return ret; - - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); - if (ret) - goto mosquitto_error; - - //The following while loop exists upon completion of subscibe procedure - while(n->state == STATE_PREPARED){ - mqtt_unthreaded_loop(n); - } - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_stop(struct node *n) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - ret = mosquitto_disconnect(m->client); - if (ret) - goto mosquitto_error; - - ret = io_destroy(&m->io); - if (ret) - return ret; - - return 0; - -mosquitto_error: - warning("MQTT: %s", mosquitto_strerror(ret)); - - return ret; -} - -int mqtt_unthreaded_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int pulled=0; - mqtt_unthreaded_loop(n); - - if(cnt > 0){ - struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smpt[cnt]; - pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); - - sample_copy_many(smps, smpt, pulled); - sample_decref_many(smpt, pulled); - } - return pulled; -} - -int mqtt_unthreaded_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) -{ - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - size_t wbytes; - - char data[1500]; - - ret = io_sprint(&m->io, data, sizeof(data), &wbytes, smps, cnt); - if (ret < 0) - return ret; - - if (m->publish) { - ret = mosquitto_publish(m->client, nullptr /* mid */, m->publish, wbytes, data, m->qos, m->retain); - if (ret != MOSQ_ERR_SUCCESS) { - warning("MQTT: publish failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - return -abs(ret); - } - } - else - warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); - - mqtt_unthreaded_loop(n); - - return cnt; -} - -int mqtt_unthreaded_loop(struct node *n){ - - int ret; - struct mqtt *m = (struct mqtt *) n->_vd; - - //Carry out network operations in a synchronous way. - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(n), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(n), mosquitto_strerror(ret)); - } - } - - return 0; -} - -int mqtt_unthreaded_poll_fds(struct node *n, int fds[]) -{ - struct mqtt *m = (struct mqtt *) n->_vd; - - fds[0] = queue_signalled_fd(&m->queue); - - return 1; -} - -static struct plugin p; - -__attribute__((constructor(110))) -static void register_plugin() { - if (plugins.state == STATE_DESTROYED) - vlist_init(&plugins); - - p.name = "mqtt_unthreaded"; - p.description = "Message Queuing Telemetry Transport (libmosquitto), unthreaded variant"; - p.type = PLUGIN_TYPE_NODE; - p.node.instances.state = STATE_DESTROYED; - p.node.vectorize = 0; - p.node.size = sizeof(struct mqtt); - p.node.type.start = mqtt_type_start; - p.node.type.stop = mqtt_type_stop; - p.node.destroy = mqtt_destroy; - p.node.parse = mqtt_parse; - p.node.check = mqtt_unthreaded_check; - p.node.print = mqtt_print; - p.node.start = mqtt_unthreaded_start; - p.node.stop = mqtt_unthreaded_stop; - p.node.read = mqtt_unthreaded_read; - p.node.write = mqtt_unthreaded_write; - p.node.reverse = mqtt_reverse; - p.node.poll_fds = mqtt_unthreaded_poll_fds; - - vlist_init(&p.node.instances); - vlist_push(&plugins, &p); -} - -__attribute__((destructor(110))) -static void deregister_plugin() { - if (plugins.state != STATE_DESTROYED) - vlist_remove_all(&plugins, &p); -}