diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 982d945a1..75ad0f2b8 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -21,6 +21,7 @@ *********************************************************************************/ #include +#include #include #include @@ -37,7 +38,35 @@ static struct vlist clients; static pthread_t thread; static Logger logger; -static void kafka_message(void *ctx, const rd_kafka_message_t *msg) +static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf) +{ + + switch (level) { + case LOG_EMERG: + case LOG_CRIT: + case LOG_ERR: + logger->error("{}: {}", fac, buf); + break; + + case LOG_ALERT: + case LOG_WARNING: + logger->warn("{}: {}", fac, buf); + break; + + case LOG_DEBUG: + logger->debug("{}: {}", fac, buf); + break; + + case LOG_NOTICE: + case LOG_INFO: + default: + logger->info("{}: {}", fac, buf); + break; + + } +} + +static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg) { int ret; struct vnode *n = (struct vnode *) ctx; @@ -88,7 +117,7 @@ static void * kafka_loop_thread(void *ctx) if (k->consumer.client) { rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000); if (msg) { - kafka_message((void *) n, msg); + kafka_message_cb((void *) n, msg); rd_kafka_message_destroy(msg); } } @@ -312,6 +341,8 @@ int kafka_start(struct vnode *n) char *errstr; rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + ret = rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) if (ret != RD_KAFKA_CONF_OK) goto kafka_config_error; @@ -538,7 +569,7 @@ static void register_plugin() { p.name = "kafka"; p.description = "Kafka event message streaming (rdkafka)"; p.type = PluginType::NODE; - p.node.instances.state = State::DESTROYED; + p.node.instances.state = State::DESTROYED; p.node.vectorize = 0; p.node.size = sizeof(struct kafka); p.node.type.start = kafka_type_start;