1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: add custom logger

This commit is contained in:
Steffen Vogel 2021-05-09 18:10:34 +02:00
parent f1b484449e
commit 56f4009e57

View file

@ -21,6 +21,7 @@
*********************************************************************************/
#include <cstring>
#include <sys/syslog.h>
#include <librdkafka/rdkafkacpp.h>
#include <villas/nodes/kafka.hpp>
@ -37,7 +38,35 @@ static struct vlist clients;
static pthread_t thread;
static Logger logger;
static void kafka_message(void *ctx, const rd_kafka_message_t *msg)
static void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
{
switch (level) {
case LOG_EMERG:
case LOG_CRIT:
case LOG_ERR:
logger->error("{}: {}", fac, buf);
break;
case LOG_ALERT:
case LOG_WARNING:
logger->warn("{}: {}", fac, buf);
break;
case LOG_DEBUG:
logger->debug("{}: {}", fac, buf);
break;
case LOG_NOTICE:
case LOG_INFO:
default:
logger->info("{}: {}", fac, buf);
break;
}
}
static void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg)
{
int ret;
struct vnode *n = (struct vnode *) ctx;
@ -88,7 +117,7 @@ static void * kafka_loop_thread(void *ctx)
if (k->consumer.client) {
rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000);
if (msg) {
kafka_message((void *) n, msg);
kafka_message_cb((void *) n, msg);
rd_kafka_message_destroy(msg);
}
}
@ -312,6 +341,8 @@ int kafka_start(struct vnode *n)
char *errstr;
rd_kafka_conf_t *rdkconf = rd_kafka_conf_new();
ret = rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb);
ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr))
if (ret != RD_KAFKA_CONF_OK)
goto kafka_config_error;
@ -538,7 +569,7 @@ static void register_plugin() {
p.name = "kafka";
p.description = "Kafka event message streaming (rdkafka)";
p.type = PluginType::NODE;
p.node.instances.state = State::DESTROYED;
p.node.instances.state = State::DESTROYED;
p.node.vectorize = 0;
p.node.size = sizeof(struct kafka);
p.node.type.start = kafka_type_start;