diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index c22b1b6c3..152243188 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -34,39 +34,39 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { - int ret; - // set the cancel type of this thread to async - ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - if (ret != 0) { - error("Unable to set cancel type of MQTT communication thread to asynchronous."); - return nullptr; - } + int ret; + // set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } - while(true){ - for (unsigned i = 0; i < vlist_length(&clients); i++) { - struct node *node = (struct node *) vlist_at(&clients, i); - struct mqtt *m = (struct mqtt *) node->_vd; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; - // execute mosquitto loop for this client - ret = mosquitto_loop(m->client, 0, 1); - if(ret){ - warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); - ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - else{ - warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ - error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - } - } // for loop - } // while(1) + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) - return nullptr; + return nullptr; } static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) @@ -372,7 +372,7 @@ int mqtt_start(struct node *n) // add client to global list of MQTT clients // so that thread can call mosquitto loop for this client - vlist_push(&clients, n); + vlist_push(&clients, n); return 0; @@ -387,10 +387,10 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - // unregister client from global MQTT client list - // so that mosquitto loop is no longer invoked for this client - // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect - vlist_remove(&clients, vlist_index(&clients, n)); + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); ret = mosquitto_disconnect(m->client); if (ret) @@ -412,20 +412,20 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; - ret = vlist_init(&clients); - if (ret) { - return ret; - } + ret = vlist_init(&clients); + if (ret) { + return ret; + } ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; // start thread here to run mosquitto loop for registered clients - ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } return 0; @@ -440,15 +440,15 @@ int mqtt_type_stop() int ret; // stop thread here that executes mosquitto loop - ret = pthread_cancel(thread); - if (ret) - return ret; - debug( 3, "Called pthread_cancel() on MQTT communication management thread."); + ret = pthread_cancel(thread); + if (ret) + return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); - ret = pthread_join(thread, nullptr); - if (ret) { - return ret; - } + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } ret = mosquitto_lib_cleanup(); if (ret) @@ -458,7 +458,7 @@ int mqtt_type_stop() if (vlist_length(&clients) > 0) { error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); } - vlist_destroy(&clients, nullptr, false); + vlist_destroy(&clients, nullptr, false); return 0;