From 15b815112124ee08a229e297350c607622750791 Mon Sep 17 00:00:00 2001
From: Sonja Happ <sonja.happ@eonerc.rwth-aachen.de>
Date: Thu, 12 Sep 2019 17:05:18 +0200
Subject: [PATCH] use a thread per process to execute mosquitto loop,
 contributes to #248

---
 lib/nodes/mqtt.cpp | 73 ++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 67 insertions(+), 6 deletions(-)

diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp
index a1cf96595..5ab9185f6 100644
--- a/lib/nodes/mqtt.cpp
+++ b/lib/nodes/mqtt.cpp
@@ -28,6 +28,40 @@
 #include <villas/utils.hpp>
 #include <villas/format_type.h>
 
+// Each process has a list of clients for which a thread invokes the mosquitto loop
+static struct vlist clients;
+static pthread_t thread;
+
+static void * mosquitto_loop_thread(void *ctx)
+{
+    int ret;
+    while(true){
+        for (unsigned i = 0; i < vlist_length(&clients); i++) {
+            struct node *c = (struct node *) vlist_at(&clients, i);
+            struct mqtt *m = (struct mqtt *) c->_vd;
+
+            // execute mosquitto loop for this client
+            ret = mosquitto_loop(m->client, 0, 1);
+            if(ret){
+                warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret));
+                ret = mosquitto_reconnect(m->client);
+                if(ret != MOSQ_ERR_SUCCESS){
+                    error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret));
+                }
+                else{
+                    warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret));
+                }
+                ret = mosquitto_loop(m->client, -1, 1);
+                if(ret != MOSQ_ERR_SUCCESS){
+                    error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret));
+                }
+            }
+        } // for loop
+    } // while(1)
+
+    return nullptr;
+}
+
 static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str)
 {
 	switch (level) {
@@ -329,9 +363,9 @@ int mqtt_start(struct node *n)
 	if (ret)
 		goto mosquitto_error;
 
-	ret = mosquitto_loop_start(m->client);
-	if (ret)
-		goto mosquitto_error;
+	// add client to global list of MQTT clients
+	// so that thread can call mosquitto loop for this client
+    vlist_push(&clients, n);
 
 	return 0;
 
@@ -350,9 +384,9 @@ int mqtt_stop(struct node *n)
 	if (ret)
 		goto mosquitto_error;
 
-	ret = mosquitto_loop_stop(m->client, 0);
-	if (ret)
-		goto mosquitto_error;
+	// unregister client from global MQTT client list
+	// so that mosquitto loop is no longer invoked  for this client
+	vlist_remove_all(&clients, n);
 
 	ret = io_destroy(&m->io);
 	if (ret)
@@ -370,10 +404,21 @@ int mqtt_type_start(villas::node::SuperNode *sn)
 {
 	int ret;
 
+    ret = vlist_init(&clients);
+    if (ret) {
+        return ret;
+    }
+
 	ret = mosquitto_lib_init();
 	if (ret)
 		goto mosquitto_error;
 
+	// start thread here to run mosquitto loop for registered clients
+    ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr);
+    if (ret) {
+        return ret;
+    }
+
 	return 0;
 
 mosquitto_error:
@@ -386,10 +431,26 @@ int mqtt_type_stop()
 {
 	int ret;
 
+	// stop thread here that executes mosquitto loop
+    ret = pthread_cancel(thread);
+    if (ret)
+        return ret;
+
+    ret = pthread_join(thread, nullptr);
+    if (ret) {
+        return ret;
+    }
+
 	ret = mosquitto_lib_cleanup();
 	if (ret)
 		goto mosquitto_error;
 
+	// when this is called the list of clients should be empty
+	if (vlist_length(&clients) > 0) {
+	    error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!");
+	}
+    vlist_destroy(&clients, nullptr, false);
+
 	return 0;
 
 mosquitto_error: