diff --git a/lib/nodes/mqtt.cpp b/lib/nodes/mqtt.cpp index 85906aa73..8c1bac69b 100644 --- a/lib/nodes/mqtt.cpp +++ b/lib/nodes/mqtt.cpp @@ -176,7 +176,7 @@ int mqtt_init(struct vnode *n) int ret; struct mqtt *m = (struct mqtt *) n->_vd; - m->client = mosquitto_new(n->name, 0, (void *) n); + m->client = mosquitto_new(nullptr, true, (void *) n); if (!m->client) return -1; @@ -314,6 +314,26 @@ int mqtt_check(struct vnode *n) return 0; } +int mqtt_prepare(struct vnode *n) +{ + int ret; + struct mqtt *m = (struct mqtt *) n->_vd; + + ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); + if (ret) + return ret; + + ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); + if (ret) + return ret; + + ret = queue_signalled_init(&m->queue, 1024); + if (ret) + return ret; + + return 0; +} + char * mqtt_print(struct vnode *n) { struct mqtt *m = (struct mqtt *) n->_vd; @@ -394,18 +414,6 @@ int mqtt_start(struct vnode *n) goto mosquitto_error; } - ret = io_init(&m->io, m->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); - if (ret) - return ret; - - ret = pool_init(&m->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); - if (ret) - return ret; - - ret = queue_signalled_init(&m->queue, 1024); - if (ret) - return ret; - ret = mosquitto_connect(m->client, m->host, m->port, m->keepalive); if (ret != MOSQ_ERR_SUCCESS) goto mosquitto_error; @@ -567,6 +575,7 @@ static void register_plugin() { p.node.destroy = mqtt_destroy; p.node.parse = mqtt_parse; p.node.check = mqtt_check; + p.node.prepare = mqtt_prepare; p.node.print = mqtt_print; p.node.init = mqtt_init; p.node.destroy = mqtt_destroy;