diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index f48106661..6ff753b16 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -28,110 +28,6 @@ #include #include -//#define MQTT_THREAD 1 - -#ifdef MQTT_THREAD -#include - -static struct { - pthread_t thread; - pthread_mutex_t mutex; - int length; - struct pollfd *fds; - struct mosquitto **clients; -} polling = { - .length = 0, - .fds = NULL, - .clients = NULL -}; - -static pthread_t thread; - -static int mqtt_register_client(struct mosquitto *mosq) -{ - pthread_mutex_lock(&polling.mutex); - - /* Add this client to the pollfd list */ - int i = polling.length++; - - polling.fds = realloc(polling.fds, polling.length * sizeof(struct pollfd)); - polling.clients = realloc(polling.clients, polling.length * sizeof(struct mosquitto *)); - - if (!polling.fds || !polling.clients) { - pthread_mutex_unlock(&polling.mutex); - return -1; - } - - polling.clients[i] = mosq; - polling.fds[i].events = POLLIN; - polling.fds[i].fd = mosquitto_socket(mosq); - - pthread_mutex_unlock(&polling.mutex); - - info("MQTT: registered new client"); - - return 0; -} - -static int mqtt_unregister_client(struct mosquitto *mosq) -{ - pthread_mutex_lock(&polling.mutex); - - /* Find client */ - int i; - for (i = 0; i < polling.length; i++) { - if (polling.clients[i] == mosq) - break; - } - - if (i >= polling.length) { - pthread_mutex_unlock(&polling.mutex); - return -1; /* Otherwise something wrong happened! */ - } - - /* Remove this client to the pollfd list */ - memmove(polling.fds + i, polling.fds + i + 1, (polling.length - i - 1) * sizeof(struct pollfd)); - memmove(polling.clients + i, polling.clients + i + 1, (polling.length - i - 1) * sizeof(struct mosquitto *)); - - polling.length--; - - pthread_mutex_unlock(&polling.mutex); - - info("MQTT: deregistered client"); - - return 0; -} - -static void * mqtt_thread(void *ctx) -{ - int ret; - - debug(5, "MQTT: started thread"); - - while (1) { -// pthread_mutex_lock(&polling.mutex); - - debug(5, "MQTT: Polling on %d clients", polling.length); - - ret = poll(polling.fds, polling.length, -1); - if (ret < 0) - serror("Failed to poll"); - - for (int i = 0; i < polling.length; i++) { - if (polling.fds[i].revents & POLLIN) { - ret = mosquitto_loop(polling.clients[i], -1, 1); - if (ret) - warn("MQTT: Loop failed for clients %p!", polling.clients[i]); - } - } - -// pthread_mutex_unlock(&polling.mutex); - } - - return NULL; -} -#endif - static void mqtt_log_cb(struct mosquitto *mosq, void *userdata, int level, const char *str) { switch (level) { @@ -160,11 +56,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) struct node *n = (struct node *) userdata; struct mqtt *m = (struct mqtt *) n->_vd; + int ret; + debug(5, "MQTT: Node %s connected to broker %s", node_name(n), m->host); -#ifdef MQTT_THREAD - mqtt_register_client(mosq); -#endif + ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); + if (ret) + warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n)); } static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) @@ -173,10 +71,6 @@ static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int resul struct mqtt *m = (struct mqtt *) n->_vd; debug(5, "MQTT: Node %s disconnected from broker %s", node_name(n), m->host); - -#ifdef MQTT_THREAD - mqtt_unregister_client(mosq); -#endif } static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *msg) @@ -195,8 +89,15 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct } ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1); - if (ret != 1) + if (ret < 0) { + warn("MQTT: Node %s received an invalid message", node_name(n)); + warn(" Payload: %s", (char *) msg->payload); return; + } + if (ret != 1) { + debug(4, "MQTT: skip empty message for node %s", node_name(n)); + return; + } queue_signalled_push(&m->queue, (void *) smp); } @@ -293,6 +194,15 @@ int mqtt_parse(struct node *n, json_t *cfg) if (!m->format) error("Invalid format '%s' for node %s", format, node_name(n)); + // Some checks + ret = mosquitto_sub_topic_check(m->subscribe); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid subscribe topic: '%s' for node %s", m->subscribe, node_name(n)); + + ret = mosquitto_pub_topic_check(m->publish); + if (ret != MOSQ_ERR_SUCCESS) + error("Invalid publish topic: '%s' for node %s", m->publish, node_name(n)); + return 0; } @@ -364,12 +274,6 @@ int mqtt_start(struct node *n) if (!m->client) return -1; -#ifdef MQTT_THREAD - ret = mosquitto_threaded_set(m->client, 1); - if (ret) - return ret; -#endif - if (m->username && m->password) { ret = mosquitto_username_pw_set(m->client, m->username, m->password); if (ret) @@ -404,19 +308,11 @@ int mqtt_start(struct node *n) if (ret) return ret; -#ifdef MQTT_THREAD - mqtt_register_client(m->client); -#else - ret = mosquitto_loop_start(m->client); - if (ret) - return ret; -#endif - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); if (ret) return ret; - ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); + ret = mosquitto_loop_start(m->client); if (ret) return ret; @@ -432,13 +328,9 @@ int mqtt_stop(struct node *n) if (ret) return ret; -#ifdef MQTT_THREAD - mqtt_unregister_client(m->client); -#else ret = mosquitto_loop_stop(m->client, 0); if (ret) return ret; -#endif return 0; } @@ -451,16 +343,6 @@ int mqtt_init() if (ret) return ret; -#ifdef MQTT_THREAD - ret = pthread_mutex_init(&polling.mutex, NULL); - if (ret) - return ret; - - ret = pthread_create(&thread, NULL, mqtt_thread, NULL); - if (ret) - return ret; -#endif - return 0; } @@ -468,20 +350,6 @@ int mqtt_deinit() { int ret; -#ifdef MQTT_THREAD - ret = pthread_cancel(thread); - if (ret) - return ret; - - ret = pthread_join(thread, NULL); - if (ret) - return ret; - - ret = pthread_mutex_destroy(&polling.mutex); - if (ret) - return ret; -#endif - ret = mosquitto_lib_cleanup(); if (ret) return ret;