1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

mqtt: fix some bugs in threading mode

This commit is contained in:
Steffen Vogel 2018-05-24 09:08:20 +02:00
parent 46c4713767
commit 9e7bde5efa

View file

@ -28,10 +28,13 @@
#include <villas/utils.h>
#include <villas/format_type.h>
//#define MQTT_THREAD 1
#ifdef MQTT_THREAD
#include <pthread.h>
static struct {
pthread_t thread;
pthread_mutex_t mutex;
int length;
struct pollfd *fds;
@ -46,7 +49,7 @@ static pthread_t thread;
static int mqtt_register_client(struct mosquitto *mosq)
{
// pthread_mutex_lock(&polling.mutex);
pthread_mutex_lock(&polling.mutex);
/* Add this client to the pollfd list */
int i = polling.length++;
@ -55,7 +58,7 @@ static int mqtt_register_client(struct mosquitto *mosq)
polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *));
if (!polling.fds || !polling.clients) {
// pthread_mutex_unlock(&polling.mutex);
pthread_mutex_unlock(&polling.mutex);
return -1;
}
@ -63,14 +66,16 @@ static int mqtt_register_client(struct mosquitto *mosq)
polling.fds[i].events = POLLIN;
polling.fds[i].fd = mosquitto_socket(mosq);
// pthread_mutex_unlock(&polling.mutex);
pthread_mutex_unlock(&polling.mutex);
info("MQTT: registered new client");
return 0;
}
static int mqtt_unregister_client(struct mosquitto *mosq)
{
// pthread_mutex_lock(&polling.mutex);
pthread_mutex_lock(&polling.mutex);
/* Find client */
int i;
@ -80,7 +85,7 @@ static int mqtt_unregister_client(struct mosquitto *mosq)
}
if (i >= polling.length) {
// pthread_mutex_unlock(&polling.mutex);
pthread_mutex_unlock(&polling.mutex);
return -1; /* Otherwise something wrong happened! */
}
@ -90,7 +95,9 @@ static int mqtt_unregister_client(struct mosquitto *mosq)
polling.length--;
// pthread_mutex_unlock(&polling.mutex);
pthread_mutex_unlock(&polling.mutex);
info("MQTT: deregistered client");
return 0;
}
@ -155,7 +162,9 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result)
debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host);
// mqtt_register_client(mosq);
#ifdef MQTT_THREAD
mqtt_register_client(mosq);
#endif
}
static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result)
@ -165,7 +174,9 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul
debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host);
// mqtt_unregister_client(mosq)
#ifdef MQTT_THREAD
mqtt_unregister_client(mosq);
#endif
}
static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg)
@ -390,10 +401,6 @@ int mqtt_start(struct node *n)
if (ret)
return ret;
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
if (ret)
return ret;
#ifdef MQTT_THREAD
mqtt_register_client(m->client);
#else
@ -402,6 +409,10 @@ int mqtt_start(struct node *n)
return ret;
#endif
ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive);
if (ret)
return ret;
ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos);
if (ret)
return ret;