mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
mqtt: use threaded libmosquitto interface
This commit is contained in:
parent
c6fe0201da
commit
04f3a77ec0
1 changed files with 11 additions and 91 deletions
|
@ -20,8 +20,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <cstring>
|
||||
#include <mutex>
|
||||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <mosquitto.h>
|
||||
|
||||
#include <villas/node_compat.hpp>
|
||||
|
@ -33,50 +34,6 @@ using namespace villas;
|
|||
using namespace villas::node;
|
||||
using namespace villas::utils;
|
||||
|
||||
// Each process has a list of clients for which a thread invokes the mosquitto loop
|
||||
static std::list<mosquitto *> clients;
|
||||
static std::mutex clients_lock;
|
||||
|
||||
static pthread_t thread;
|
||||
static Logger logger;
|
||||
|
||||
static
|
||||
void * mosquitto_loop_thread(void *ctx)
|
||||
{
|
||||
int ret;
|
||||
|
||||
auto logger = logging.get("mqtt");
|
||||
|
||||
// Set the cancel type of this thread to async
|
||||
ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
|
||||
if (ret != 0)
|
||||
throw RuntimeError("Unable to set cancel type of MQTT communication thread to asynchronous.");
|
||||
|
||||
while (true) {
|
||||
std::lock_guard<std::mutex> guard(clients_lock);
|
||||
|
||||
for (auto *client : clients) {
|
||||
// Execute mosquitto loop for this client
|
||||
ret = mosquitto_loop(client, 0, 1);
|
||||
if (ret) {
|
||||
logger->warn("Connection error: {}, attempting reconnect", mosquitto_strerror(ret));
|
||||
|
||||
ret = mosquitto_reconnect(client);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
logger->warn("Reconnection to broker failed: {}", mosquitto_strerror(ret));
|
||||
else
|
||||
logger->warn("Successfully reconnected to broker: {}", mosquitto_strerror(ret));
|
||||
|
||||
ret = mosquitto_loop(client, 0, 1);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
logger->warn("Persisting connection error: {}", mosquitto_strerror(ret));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
static
|
||||
void mqtt_log_cb(struct mosquitto *mosq, void *ctx, int level, const char *str)
|
||||
{
|
||||
|
@ -185,17 +142,12 @@ int villas::node::mqtt_reverse(NodeCompat *n)
|
|||
|
||||
int villas::node::mqtt_init(NodeCompat *n)
|
||||
{
|
||||
int ret;
|
||||
auto *m = n->getData<struct mqtt>();
|
||||
|
||||
m->client = mosquitto_new(nullptr, true, (void *) n);
|
||||
if (!m->client)
|
||||
return -1;
|
||||
|
||||
ret = mosquitto_threaded_set(m->client, true);
|
||||
if (ret)
|
||||
goto mosquitto_error;
|
||||
|
||||
mosquitto_log_callback_set(m->client, mqtt_log_cb);
|
||||
mosquitto_connect_callback_set(m->client, mqtt_connect_cb);
|
||||
mosquitto_disconnect_callback_set(m->client, mqtt_disconnect_cb);
|
||||
|
@ -227,11 +179,6 @@ int villas::node::mqtt_init(NodeCompat *n)
|
|||
m->ssl.ciphers = nullptr;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
n->logger->warn("{}", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int villas::node::mqtt_parse(NodeCompat *n, json_t *json)
|
||||
|
@ -447,12 +394,9 @@ int villas::node::mqtt_start(NodeCompat *n)
|
|||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
// Add client to global list of MQTT clients
|
||||
// so that thread can call mosquitto loop for this client
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(clients_lock);
|
||||
clients.push_back(m->client);
|
||||
}
|
||||
ret = mosquitto_loop_start(m->client);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
return 0;
|
||||
|
||||
|
@ -467,18 +411,14 @@ int villas::node::mqtt_stop(NodeCompat *n)
|
|||
int ret;
|
||||
auto *m = n->getData<struct mqtt>();
|
||||
|
||||
// Unregister client from global MQTT client list
|
||||
// so that mosquitto loop is no longer invoked for this client
|
||||
// important to do that before disconnecting from broker, otherwise, mosquitto thread will attempt to reconnect
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(clients_lock);
|
||||
clients.remove(m->client);
|
||||
}
|
||||
|
||||
ret = mosquitto_disconnect(m->client);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = mosquitto_loop_stop(m->client, false);
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
ret = queue_signalled_close(&m->queue);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
@ -495,20 +435,14 @@ int villas::node::mqtt_type_start(villas::node::SuperNode *sn)
|
|||
{
|
||||
int ret;
|
||||
|
||||
logger = logging.get("node:mqtt");
|
||||
|
||||
ret = mosquitto_lib_init();
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
// Start thread here to run mosquitto loop for registered clients
|
||||
ret = pthread_create(&thread, nullptr, mosquitto_loop_thread, nullptr);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
auto logger = logging.get("node:mqtt");
|
||||
logger->warn("{}", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
|
@ -518,28 +452,14 @@ int villas::node::mqtt_type_stop()
|
|||
{
|
||||
int ret;
|
||||
|
||||
// Stop thread here that executes mosquitto loop
|
||||
ret = pthread_cancel(thread);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
logger->debug("Called pthread_cancel() on MQTT communication management thread.");
|
||||
|
||||
ret = pthread_join(thread, nullptr);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
ret = mosquitto_lib_cleanup();
|
||||
if (ret != MOSQ_ERR_SUCCESS)
|
||||
goto mosquitto_error;
|
||||
|
||||
// When this is called the list of clients should be empty
|
||||
if (clients.size() > 0)
|
||||
throw RuntimeError("List of MQTT clients contains elements at time of destruction. Call node_stop for each MQTT node before stopping node type!");
|
||||
|
||||
return 0;
|
||||
|
||||
mosquitto_error:
|
||||
auto logger = logging.get("node:mqtt");
|
||||
logger->warn("{}", mosquitto_strerror(ret));
|
||||
|
||||
return ret;
|
||||
|
|
Loading…
Add table
Reference in a new issue