From 3f418073d49c7db13d34ab41a583226695d2d092 Mon Sep 17 00:00:00 2001 From: Felix Wege Date: Fri, 8 Jan 2021 15:16:15 +0100 Subject: [PATCH] mqtt: now uses mqtt_prepare() --- lib/nodes/mqtt.cpp | 70 +++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index a4ac99b4c..838e1f852 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -217,6 +217,47 @@ mosquitto_error: return ret; } +int mqtt_prepare(struct vnode *n){ + + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + if (m->username && m->password) { + ret = mosquitto_username_pw_set(m->client, m->username, m->password); + if (ret != MOSQ_ERR_SUCCESS) + goto mosquitto_error; + } + + if (m->ssl.enabled) { + ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); + if (ret != MOSQ_ERR_SUCCESS) + goto mosquitto_error; + + ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); + if (ret != MOSQ_ERR_SUCCESS) + goto mosquitto_error; + } + + ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); + if (ret) + return ret; + + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); + if (ret) + return ret; + + ret = queue_signalled_init(&m->queue, 1024); + if (ret) + return ret; + + return 0; + +mosquitto_error: + warning("MQTT: %s", mosquitto_strerror(ret)); + + return ret; +} + int mqtt_parse(struct vnode *n, json_t *cfg) { int ret; @@ -374,34 +415,6 @@ int mqtt_start(struct vnode *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - if (m->username && m->password) { - ret = mosquitto_username_pw_set(m->client, m->username, m->password); - if (ret != MOSQ_ERR_SUCCESS) - goto mosquitto_error; - } - - if (m->ssl.enabled) { - ret = mosquitto_tls_set(m->client, m->ssl.cafile, m->ssl.capath, m->ssl.certfile, m->ssl.keyfile, nullptr); - if (ret != MOSQ_ERR_SUCCESS) - goto mosquitto_error; - - ret = mosquitto_tls_insecure_set(m->client, m->ssl.insecure); - if (ret != MOSQ_ERR_SUCCESS) - goto mosquitto_error; - } - - ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); - if (ret) - return ret; - - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); - if (ret) - return ret; - - ret = queue_signalled_init(&m->queue, 1024); - if (ret) - return ret; - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; @@ -561,6 +574,7 @@ static void register_plugin() { p.node.type.start = mqtt_type_start; p.node.type.stop = mqtt_type_stop; p.node.destroy = mqtt_destroy; + p.node.prepare = mqtt_prepare; p.node.parse = mqtt_parse; p.node.check = mqtt_check; p.node.print = mqtt_print;