diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 455e0d419..17dca2585 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -28,10 +28,13 @@ #include #include +//#define MQTT_THREAD 1 + #ifdef MQTT_THREAD #include static struct { + pthread_t thread; pthread_mutex_t mutex; int length; struct pollfd *fds; @@ -46,7 +49,7 @@ static pthread_t thread; static int mqtt_register_client(struct mosquitto *mosq) { -// pthread_mutex_lock(&polling.mutex); + pthread_mutex_lock(&polling.mutex); /* Add this client to the pollfd list */ int i = polling.length++; @@ -55,7 +58,7 @@ static int mqtt_register_client(struct mosquitto *mosq) polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *)); if (!polling.fds || !polling.clients) { -// pthread_mutex_unlock(&polling.mutex); + pthread_mutex_unlock(&polling.mutex); return -1; } @@ -63,14 +66,16 @@ static int mqtt_register_client(struct mosquitto *mosq) polling.fds[i].events = POLLIN; polling.fds[i].fd = mosquitto_socket(mosq); -// pthread_mutex_unlock(&polling.mutex); + pthread_mutex_unlock(&polling.mutex); + + info("MQTT: registered new client"); return 0; } static int mqtt_unregister_client(struct mosquitto *mosq) { -// pthread_mutex_lock(&polling.mutex); + pthread_mutex_lock(&polling.mutex); /* Find client */ int i; @@ -80,7 +85,7 @@ static int mqtt_unregister_client(struct mosquitto *mosq) } if (i >= polling.length) { -// pthread_mutex_unlock(&polling.mutex); + pthread_mutex_unlock(&polling.mutex); return -1; /* Otherwise something wrong happened! */ } @@ -90,7 +95,9 @@ static int mqtt_unregister_client(struct mosquitto *mosq) polling.length--; -// pthread_mutex_unlock(&polling.mutex); + pthread_mutex_unlock(&polling.mutex); + + info("MQTT: deregistered client"); return 0; } @@ -155,7 +162,9 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host); -// mqtt_register_client(mosq); +#ifdef MQTT_THREAD + mqtt_register_client(mosq); +#endif } static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) @@ -165,7 +174,9 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host); -// mqtt_unregister_client(mosq) +#ifdef MQTT_THREAD + mqtt_unregister_client(mosq); +#endif } static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) @@ -390,10 +401,6 @@ int mqtt_start(struct node *n) if (ret) return ret; - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); - if (ret) - return ret; - #ifdef MQTT_THREAD mqtt_register_client(m->client); #else @@ -402,6 +409,10 @@ int mqtt_start(struct node *n) return ret; #endif + ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); + if (ret) + return ret; + ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); if (ret) return ret;