diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 7734e8d23..ec4ce77a3 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -67,20 +67,20 @@ static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, co static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { - int ret; + int ret, cnt = n->in.vectorize; struct vnode *n = (struct vnode *) ctx; struct kafka *k = (struct kafka *) n->_vd; - struct sample *smps[n->in.vectorize]; + struct sample *smps[cnt]; n->logger->debug("Received a message of {} bytes from broker {}", msg->len, k->server); - ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); + ret = sample_alloc_many(&k->pool, smps, cnt); if (ret <= 0) { n->logger->warn("Pool underrun in consumer"); return; } - ret = k->formatter->sscan((char *) msg->payload, msg->len, nullptr, smps, n->in.vectorize); + ret = k->formatter->sscan((char *) msg->payload, msg->len, nullptr, smps, cnt); if (ret < 0) { n->logger->warn("Received an invalid message"); n->logger->warn(" Payload: {}", (char *) msg->payload); @@ -89,11 +89,11 @@ static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) if (ret == 0) { n->logger->debug("Skip empty message"); - sample_decref_many(smps, n->in.vectorize); + sample_decref_many(smps, cnt); return; } - ret = queue_signalled_push_many(&k->queue, (void **) smps, n->in.vectorize); + ret = queue_signalled_push_many(&k->queue, (void **) smps, cnt); if (ret < (int) n->in.vectorize) n->logger->warn("Failed to enqueue samples"); }