diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 115ee2035..5dee9d3f5 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -84,29 +84,29 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct int ret; struct node *n = (struct node *) userdata; struct mqtt *m = (struct mqtt *) n->_vd; - struct sample *smp; + struct sample *smps[n->in.vectorize]; debug(5, "MQTT: Node %s received a message of %d bytes from broker %s", node_name(n), msg->payloadlen, m->host); - smp = sample_alloc(&m->pool); - if (!smp) { + ret = sample_alloc_many(&m->pool, smps, n->in.vectorize); + if (ret<0) { warn("Pool underrun in subscriber of %s", node_name(n)); return; } - ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, &smp, 1); + ret = io_sscan(&m->io, msg->payload, msg->payloadlen, NULL, smps, n->in.vectorize); if (ret < 0) { warn("MQTT: Node %s received an invalid message", node_name(n)); warn(" Payload: %s", (char *) msg->payload); return; } - if (ret != 1) { + if (ret == 0) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); - sample_put(smp); + sample_put_many(smps, n->in.vectorize); return; } - queue_signalled_push(&m->queue, (void *) smp); + queue_signalled_push_many(&m->queue, (void *) smps, n->in.vectorize); } static void mqtt_subscribe_cb(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)