From 3f459d353989471c91ea6b254e9de84ce2637e18 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 9 May 2021 18:23:37 +0200 Subject: [PATCH] kafka: fix segfault if a single node is used for both publishing and consuming --- lib/nodes/kafka.cpp | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index ec4191345..02b1226b2 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -376,7 +376,11 @@ int kafka_start(struct vnode *n) } if (k->produce) { - k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); + + k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); if (!k->consumer.client) goto kafka_config_error; @@ -395,16 +399,19 @@ int kafka_start(struct vnode *n) n->logger->info("Connected producer to bootstrap server {}", k->server); } - // if (k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV) - else if (k->consume) { + if (k->consume) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); + rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - ret = rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); + ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); if (ret != RD_KAFKA_CONF_OK) goto kafka_config_error; - k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); + k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); if (!k->consumer.client) goto kafka_config_error;