diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 8e1ddbed1..dac739a6d 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -20,8 +20,9 @@ * along with this program. If not, see . *********************************************************************************/ -#include #include +#include +#include #include #include @@ -33,50 +34,6 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; -// Each process has a list of clients for which a thread invokes the mosquitto loop -static std::list clients; -static std::mutex clients_lock; - -static pthread_t thread; -static Logger logger; - -static -void * mosquitto_loop_thread(void *ctx) -{ - int ret; - - auto logger = logging.get("mqtt"); - - // Set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) - throw RuntimeError("Unable to set cancel type of MQTT communication thread to asynchronous."); - - while (true) { - std::lock_guard guard(clients_lock); - - for (auto *client : clients) { - // Execute mosquitto loop for this client - ret = mosquitto_loop(client, 0, 1); - if (ret) { - logger->warn("Connection error: {}, attempting reconnect", mosquitto_strerror(ret)); - - ret = mosquitto_reconnect(client); - if (ret != MOSQ_ERR_SUCCESS) - logger->warn("Reconnection to broker failed: {}", mosquitto_strerror(ret)); - else - logger->warn("Successfully reconnected to broker: {}", mosquitto_strerror(ret)); - - ret = mosquitto_loop(client, 0, 1); - if (ret != MOSQ_ERR_SUCCESS) - logger->warn("Persisting connection error: {}", mosquitto_strerror(ret)); - } - } - } - - return nullptr; -} - static void mqtt_log_cb(struct mosquitto *mosq, void *ctx, int level, const char *str) { @@ -185,17 +142,12 @@ int villas::node::mqtt_reverse(NodeCompat *n) int villas::node::mqtt_init(NodeCompat *n) { - int ret; auto *m = n->getData(); m->client = mosquitto_new(nullptr, true, (void *) n); if (!m->client) return -1; - ret = mosquitto_threaded_set(m->client, true); - if (ret) - goto mosquitto_error; - mosquitto_log_callback_set(m->client, mqtt_log_cb); mosquitto_connect_callback_set(m->client, mqtt_connect_cb); mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb); @@ -227,11 +179,6 @@ int villas::node::mqtt_init(NodeCompat *n) m->ssl.ciphers = nullptr; return 0; - -mosquitto_error: - n->logger->warn("{}", mosquitto_strerror(ret)); - - return ret; } int villas::node::mqtt_parse(NodeCompat *n, json_t *json) @@ -447,12 +394,9 @@ int villas::node::mqtt_start(NodeCompat *n) if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; - // Add client to global list of MQTT clients - // so that thread can call mosquitto loop for this client - { - std::lock_guard guard(clients_lock); - clients.push_back(m->client); - } + ret = mosquitto_loop_start(m->client); + if (ret != MOSQ_ERR_SUCCESS) + goto mosquitto_error; return 0; @@ -467,18 +411,14 @@ int villas::node::mqtt_stop(NodeCompat *n) int ret; auto *m = n->getData(); - // Unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect - { - std::lock_guard guard(clients_lock); - clients.remove(m->client); - } - ret = mosquitto_disconnect(m->client); if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; + ret = mosquitto_loop_stop(m->client, false); + if (ret != MOSQ_ERR_SUCCESS) + goto mosquitto_error; + ret = queue_signalled_close(&m->queue); if (ret) return ret; @@ -495,20 +435,14 @@ int villas::node::mqtt_type_start(villas::node::SuperNode *sn) { int ret; - logger = logging.get("node:mqtt"); - ret = mosquitto_lib_init(); if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; - // Start thread here to run mosquitto loop for registered clients - ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); - if (ret) - return ret; - return 0; mosquitto_error: + auto logger = logging.get("node:mqtt"); logger->warn("{}", mosquitto_strerror(ret)); return ret; @@ -518,28 +452,14 @@ int villas::node::mqtt_type_stop() { int ret; - // Stop thread here that executes mosquitto loop - ret = pthread_cancel(thread); - if (ret) - return ret; - - logger->debug("Called pthread_cancel() on MQTT communication management thread."); - - ret = pthread_join(thread, nullptr); - if (ret) - return ret; - ret = mosquitto_lib_cleanup(); if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; - // When this is called the list of clients should be empty - if (clients.size() > 0) - throw RuntimeError("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); - return 0; mosquitto_error: + auto logger = logging.get("node:mqtt"); logger->warn("{}", mosquitto_strerror(ret)); return ret;