diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index f9da98694..18eaf3f96 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -60,15 +60,13 @@ static void mqtt_connect_cb(struct mosquitto *mosq, void *userdata, int result) info("MQTT: Node %s connected to broker %s", node_name(n), m->host); - if(m->subscribe){ + if (m->subscribe) { ret = mosquitto_subscribe(m->client, NULL, m->subscribe, m->qos); if (ret) warn("MQTT: failed to subscribe to topic '%s' for node %s", m->subscribe, node_name(n)); } - else{ - warn("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); - } - + else + warn("MQTT: no subscribe for node %s as no subscribe topic is given", node_name(n)); } static void mqtt_disconnect_cb(struct mosquitto *mosq, void *userdata, int result) @@ -149,9 +147,11 @@ int mqtt_parse(struct node *n, json_t *cfg) json_error_t err; json_t *json_ssl = NULL; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s: s, s?: i, s?: i, s?: i, s?: b, s?: s, s?: s, s?: o }", - "publish", &publish, - "subscribe", &subscribe, + ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: s }, s?: { s?: s }, s?: s, s: s, s?: i, s?: i, s?: i, s?: b, s?: s, s?: s, s?: o }", + "out", + "publish", &publish, + "in", + "subscribe", &subscribe, "format", &format, "host", &host, "port", &m->port, @@ -231,10 +231,10 @@ char * mqtt_print(struct node *n) strcatf(&buf, ", username=%s", m->username); if (m->publish) - strcatf(&buf, ", publish=%s", m->publish); + strcatf(&buf, ", out.publish=%s", m->publish); if (m->subscribe) - strcatf(&buf, ", subscribe=%s", m->subscribe); + strcatf(&buf, ", in.subscribe=%s", m->subscribe); return buf; } @@ -246,10 +246,6 @@ int mqtt_destroy(struct node *n) mosquitto_destroy(m->client); - ret = io_destroy(&m->io); - if (ret) - return ret; - ret = pool_destroy(&m->pool); if (ret) return ret; @@ -303,11 +299,15 @@ int mqtt_start(struct node *n) mosquitto_message_callback_set(m->client, mqtt_message_cb); mosquitto_subscribe_callback_set(m->client, mqtt_subscribe_cb); - ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL); + ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL); if (ret) return ret; - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(n->samplelen), &memory_hugepage); + ret = io_check(&m->io); + if (ret) + return ret; + + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(list_length(&n->signals)), &memory_hugepage); if (ret) return ret; @@ -339,6 +339,10 @@ int mqtt_stop(struct node *n) if (ret) return ret; + ret = io_destroy(&m->io); + if (ret) + return ret; + return 0; }