1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

mqtt: remove broken threading mode and fixed a race condition

This commit is contained in:
Steffen Vogel 2018-05-26 02:23:32 +02:00
parent f054d92845
commit 851bfa78b4

View file

@ -28,110 +28,6 @@
#include <villas/utils.h>
#include <villas/format_type.h>
//#define MQTT_THREAD 1
#ifdef MQTT_THREAD
#include <pthread.h>
static struct {
pthread_t thread;
pthread_mutex_t mutex;
int length;
struct pollfd *fds;
struct mosquitto **clients;
} polling = {
.length = 0,
.fds = NULL,
.clients = NULL
};
static pthread_t thread;
static int mqtt_register_client(struct mosquitto *mosq)
{
pthread_mutex_lock(&polling.mutex);
/* Add this client to the pollfd list */
int i = polling.length++;
polling.fds = realloc(polling.fds, polling.length * sizeof(struct pollfd));
polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *));
if (!polling.fds || !polling.clients) {
pthread_mutex_unlock(&polling.mutex);
return -1;
}
polling.clients[i] = mosq;
polling.fds[i].events = POLLIN;
polling.fds[i].fd = mosquitto_socket(mosq);
pthread_mutex_unlock(&polling.mutex);
info("MQTT: registered new client");
return 0;
}
static int mqtt_unregister_client(struct mosquitto *mosq)
{
pthread_mutex_lock(&polling.mutex);
/* Find client */
int i;
for (i = 0; i < polling.length; i++) {
if (polling.clients[i] == mosq)
break;
}
if (i >= polling.length) {
pthread_mutex_unlock(&polling.mutex);
return -1; /* Otherwise something wrong happened! */
}
/* Remove this client to the pollfd list */
memmove(polling.fds + i, polling.fds + i + 1, (polling.length - i - 1) * sizeof(struct pollfd));
memmove(polling.clients + i, polling.clients + i + 1, (polling.length - i - 1) * sizeof(struct mosquitto *));
polling.length--;
pthread_mutex_unlock(&polling.mutex);
info("MQTT: deregistered client");
return 0;
}
static void * mqtt_thread(void *ctx)
{
int ret;
debug(5, "MQTT: started thread");
while (1) {
// pthread_mutex_lock(&polling.mutex);
debug(5, "MQTT: Polling on %d clients", polling.length);
ret = poll(polling.fds, polling.length, -1);
if (ret < 0)
serror("Failed to poll");
for (int i = 0; i < polling.length; i++) {
if (polling.fds[i].revents & POLLIN) {
ret = mosquitto_loop(polling.clients[i], -1, 1);
if (ret)
warn("MQTT: Loop failed for clients %p!", polling.clients[i]);
}
}
// pthread_mutex_unlock(&polling.mutex);
}
return NULL;
}
#endif
static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
switch (level) {
@ -160,11 +56,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result)
struct node *n = (struct node *) userdata;
struct mqtt *m = (struct mqtt *) n->_vd;
int ret;
debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host);
#ifdef MQTT_THREAD
mqtt_register_client(mosq);
#endif
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
if (ret)
warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n));
}
static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result)
@ -173,10 +71,6 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul
struct mqtt *m = (struct mqtt *) n->_vd;
debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host);
#ifdef MQTT_THREAD
mqtt_unregister_client(mosq);
#endif
}
static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg)
@ -195,8 +89,15 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
}
ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1);
if (ret != 1)
if (ret < 0) {
warn("MQTT: Node %s received an invalid message", node_name(n));
warn(" Payload: %s", (char *) msg->payload);
return;
}
if (ret != 1) {
debug(4, "MQTT: skip empty message for node %s", node_name(n));
return;
}
queue_signalled_push(&m->queue, (void *) smp);
}
@ -293,6 +194,15 @@ int mqtt_parse(struct node *n, json_t *cfg)
if (!m->format)
error("Invalid format '%s' for node %s", format, node_name(n));
// Some checks
ret = mosquitto_sub_topic_check(m->subscribe);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n));
ret = mosquitto_pub_topic_check(m->publish);
if (ret != MOSQ_ERR_SUCCESS)
error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n));
return 0;
}
@ -364,12 +274,6 @@ int mqtt_start(struct node *n)
if (!m->client)
return -1;
#ifdef MQTT_THREAD
ret = mosquitto_threaded_set(m->client, 1);
if (ret)
return ret;
#endif
if (m->username && m->password) {
ret = mosquitto_username_pw_set(m->client, m->username, m->password);
if (ret)
@ -404,19 +308,11 @@ int mqtt_start(struct node *n)
if (ret)
return ret;
#ifdef MQTT_THREAD
mqtt_register_client(m->client);
#else
ret = mosquitto_loop_start(m->client);
if (ret)
return ret;
#endif
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
if (ret)
return ret;
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
ret = mosquitto_loop_start(m->client);
if (ret)
return ret;
@ -432,13 +328,9 @@ int mqtt_stop(struct node *n)
if (ret)
return ret;
#ifdef MQTT_THREAD
mqtt_unregister_client(m->client);
#else
ret = mosquitto_loop_stop(m->client, 0);
if (ret)
return ret;
#endif
return 0;
}
@ -451,16 +343,6 @@ int mqtt_init()
if (ret)
return ret;
#ifdef MQTT_THREAD
ret = pthread_mutex_init(&polling.mutex, NULL);
if (ret)
return ret;
ret = pthread_create(&thread, NULL, mqtt_thread, NULL);
if (ret)
return ret;
#endif
return 0;
}
@ -468,20 +350,6 @@ int mqtt_deinit()
{
int ret;
#ifdef MQTT_THREAD
ret = pthread_cancel(thread);
if (ret)
return ret;
ret = pthread_join(thread, NULL);
if (ret)
return ret;
ret = pthread_mutex_destroy(&polling.mutex);
if (ret)
return ret;
#endif
ret = mosquitto_lib_cleanup();
if (ret)
return ret;