1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: fix segfault if a single node is used for both publishing and consuming

This commit is contained in:
Steffen Vogel 2021-05-09 18:23:37 +02:00
parent 6c66b7cc75
commit 3f459d3539

View file

@ -376,7 +376,11 @@ int kafka_start(struct vnode *n)
}
if (k->produce) {
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr));
// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
// so we will need to create a copy first
rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf);
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr));
if (!k->consumer.client)
goto kafka_config_error;
@ -395,16 +399,19 @@ int kafka_start(struct vnode *n)
n->logger->info("Connected producer to bootstrap server {}", k->server);
}
// if (k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV)
else if (k->consume) {
if (k->consume) {
// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
// so we will need to create a copy first
rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf);
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
ret = rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr));
ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr));
if (ret != RD_KAFKA_CONF_OK)
goto kafka_config_error;
k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr));
k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr));
if (!k->consumer.client)
goto kafka_config_error;