diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index b88cab9d7..1e69d7ab4 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -37,36 +37,36 @@ static pthread_t thread; static void * mosquitto_loop_thread(void *ctx) { int ret; - // set the cancel type of this thread to async + + // Set the cancel type of this thread to async ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); if (ret != 0) { error("Unable to set cancel type of MQTT communication thread to asynchronous."); return nullptr; } - while(true){ + while (true) { for (unsigned i = 0; i < vlist_length(&clients); i++) { struct node *node = (struct node *) vlist_at(&clients, i); struct mqtt *m = (struct mqtt *) node->_vd; - // execute mosquitto loop for this client + // Execute mosquitto loop for this client ret = mosquitto_loop(m->client, 0, 1); - if(ret){ + if (ret) { warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); - if(ret != MOSQ_ERR_SUCCESS){ + if (ret != MOSQ_ERR_SUCCESS) error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } - else{ + else warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } + ret = mosquitto_loop(m->client, -1, 1); - if(ret != MOSQ_ERR_SUCCESS){ + if (ret != MOSQ_ERR_SUCCESS) error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); - } } - } // for loop - } // while(1) + } + } return nullptr; } @@ -141,6 +141,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct warning(" Payload: %s", (char *) msg->payload); return; } + if (ret == 0) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); sample_decref_many(smps, n->in.vectorize); @@ -372,7 +373,7 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - // add client to global list of MQTT clients + // Add client to global list of MQTT clients // so that thread can call mosquitto loop for this client vlist_push(&clients, n); @@ -389,7 +390,7 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - // unregister client from global MQTT client list + // Unregister client from global MQTT client list // so that mosquitto loop is no longer invoked for this client // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect vlist_remove(&clients, vlist_index(&clients, n)); @@ -423,7 +424,7 @@ int mqtt_type_start(villas::node::SuperNode *sn) if (ret) goto mosquitto_error; - // start thread here to run mosquitto loop for registered clients + // Start thread here to run mosquitto loop for registered clients ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); if (ret) { return ret; @@ -441,7 +442,7 @@ int mqtt_type_stop() { int ret; - // stop thread here that executes mosquitto loop + // Stop thread here that executes mosquitto loop ret = pthread_cancel(thread); if (ret) return ret; @@ -456,7 +457,7 @@ int mqtt_type_stop() if (ret) goto mosquitto_error; - // when this is called the list of clients should be empty + // When this is called the list of clients should be empty if (vlist_length(&clients) > 0) { error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); }