diff --git a/common b/common index 43fd33c9c..793427ac7 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit 43fd33c9c77db22b93f65c92a1acb25c141d42a3 +Subproject commit 793427ac7f06e8a70509264e9688a024fca9a7c9 diff --git a/lib/node.cpp b/lib/node.cpp index 9250f2cd5..186533bc8 100644 --- a/lib/node.cpp +++ b/lib/node.cpp @@ -54,7 +54,7 @@ int node_init(struct node *n, struct node_type *vt) n->_vt = vt; n->_vd = alloc(vt->size); - n->stats = nullptr; + //n->stats = nullptr; n->name = nullptr; n->_name = nullptr; n->_name_long = nullptr; diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 9cee077d8..1e69d7ab4 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -30,6 +30,47 @@ using namespace villas::utils; +// Each process has a list of clients for which a thread invokes the mosquitto loop +static struct vlist clients; +static pthread_t thread; + +static void * mosquitto_loop_thread(void *ctx) +{ + int ret; + + // Set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) { + error("Unable to set cancel type of MQTT communication thread to asynchronous."); + return nullptr; + } + + while (true) { + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *node = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) node->_vd; + + // Execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if (ret) { + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(node), mosquitto_strerror(ret)); + + ret = mosquitto_reconnect(m->client); + if (ret != MOSQ_ERR_SUCCESS) + error("MQTT: reconnection to broker failed for node %s: %s", node_name(node), mosquitto_strerror(ret)); + else + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(node), mosquitto_strerror(ret)); + + ret = mosquitto_loop(m->client, -1, 1); + if (ret != MOSQ_ERR_SUCCESS) + error("MQTT: persisting connection error for node %s: %s", node_name(node), mosquitto_strerror(ret)); + } + } + } + + return nullptr; +} + static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -100,6 +141,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct warning(" Payload: %s", (char *) msg->payload); return; } + if (ret == 0) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); sample_decref_many(smps, n->in.vectorize); @@ -331,9 +373,9 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_start(m->client); - if (ret) - goto mosquitto_error; + // Add client to global list of MQTT clients + // so that thread can call mosquitto loop for this client + vlist_push(&clients, n); return 0; @@ -348,11 +390,12 @@ int mqtt_stop(struct node *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - ret = mosquitto_disconnect(m->client); - if (ret) - goto mosquitto_error; + // Unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); - ret = mosquitto_loop_stop(m->client, 0); + ret = mosquitto_disconnect(m->client); if (ret) goto mosquitto_error; @@ -372,10 +415,21 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; + ret = vlist_init(&clients); + if (ret) { + return ret; + } + ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; + // Start thread here to run mosquitto loop for registered clients + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } + return 0; mosquitto_error: @@ -388,10 +442,27 @@ int mqtt_type_stop() { int ret; + // Stop thread here that executes mosquitto loop + ret = pthread_cancel(thread); + if (ret) + return ret; + debug( 3, "Called pthread_cancel() on MQTT communication management thread."); + + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } + ret = mosquitto_lib_cleanup(); if (ret) goto mosquitto_error; + // When this is called the list of clients should be empty + if (vlist_length(&clients) > 0) { + error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); + } + vlist_destroy(&clients, nullptr, false); + return 0; mosquitto_error: