diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index a1cf96595..5ab9185f6 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -28,6 +28,40 @@ #include #include +// Each process has a list of clients for which a thread invokes the mosquitto loop +static struct vlist clients; +static pthread_t thread; + +static void * mosquitto_loop_thread(void *ctx) +{ + int ret; + while(true){ + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct node *c = (struct node *) vlist_at(&clients, i); + struct mqtt *m = (struct mqtt *) c->_vd; + + // execute mosquitto loop for this client + ret = mosquitto_loop(m->client, 0, 1); + if(ret){ + warning("MQTT: connection error for node %s: %s, attempting reconnect", node_name(c), mosquitto_strerror(ret)); + ret = mosquitto_reconnect(m->client); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: reconnection to broker failed for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + else{ + warning("MQTT: successfully reconnected to broker for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + ret = mosquitto_loop(m->client, -1, 1); + if(ret != MOSQ_ERR_SUCCESS){ + error("MQTT: persisting connection error for node %s: %s", node_name(c), mosquitto_strerror(ret)); + } + } + } // for loop + } // while(1) + + return nullptr; +} + static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -329,9 +363,9 @@ int mqtt_start(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_start(m->client); - if (ret) - goto mosquitto_error; + // add client to global list of MQTT clients + // so that thread can call mosquitto loop for this client + vlist_push(&clients, n); return 0; @@ -350,9 +384,9 @@ int mqtt_stop(struct node *n) if (ret) goto mosquitto_error; - ret = mosquitto_loop_stop(m->client, 0); - if (ret) - goto mosquitto_error; + // unregister client from global MQTT client list + // so that mosquitto loop is no longer invoked for this client + vlist_remove_all(&clients, n); ret = io_destroy(&m->io); if (ret) @@ -370,10 +404,21 @@ int mqtt_type_start(villas::node::SuperNode *sn) { int ret; + ret = vlist_init(&clients); + if (ret) { + return ret; + } + ret = mosquitto_lib_init(); if (ret) goto mosquitto_error; + // start thread here to run mosquitto loop for registered clients + ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr); + if (ret) { + return ret; + } + return 0; mosquitto_error: @@ -386,10 +431,26 @@ int mqtt_type_stop() { int ret; + // stop thread here that executes mosquitto loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + ret = pthread_join(thread, nullptr); + if (ret) { + return ret; + } + ret = mosquitto_lib_cleanup(); if (ret) goto mosquitto_error; + // when this is called the list of clients should be empty + if (vlist_length(&clients) > 0) { + error("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!"); + } + vlist_destroy(&clients, nullptr, false); + return 0; mosquitto_error: