diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 9cee077d8..dc169bf01 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -30,6 +30,40 @@ using namespace villas::utils; +// Each process has a list of clients for which a thread invokes the mosquitto loop +static struct vlist clients; +static pthread_t thread; + +static void * mosquitto_loop_thread(void *ctx) +{ + int ret; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *c = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) c->_vd; + + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) + + return nullptr; +} + static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -331,9 +365,9 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_start(m->client); - if (ret) - goto mosquitto_error; + // add client to global list of MQTT clients + // so that thread can call mosquitto loop for this client + vlist_push(&clients, n); return 0; @@ -352,9 +386,9 @@ int mqtt_stop(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_stop(m->client, 0); - if (ret) - goto mosquitto_error; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + vlist_remove_all(&clients, n); ret = io_destroy(&m->io); if (ret) @@ -372,10 +406,21 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; + ret = vlist_init(&clients); + if (ret) { + return ret; + } + ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; + // start thread here to run mosquitto loop for registered clients + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } + return 0; mosquitto_error: @@ -388,10 +433,26 @@ int mqtt_type_stop() { int ret; + // stop thread here that executes mosquitto loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } + ret = mosquitto_lib_cleanup(); if (ret) goto mosquitto_error; + // when this is called the list of clients should be empty + if (vlist_length(&clients) > 0) { + error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); + } + vlist_destroy(&clients, nullptr, false); + return 0; mosquitto_error: