diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index ec4191345..02b1226b2 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -376,7 +376,11 @@ int kafka_start(struct vnode *n) } if (k->produce) { - k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); + + k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); if (!k->consumer.client) goto kafka_config_error; @@ -395,16 +399,19 @@ int kafka_start(struct vnode *n) n->logger->info("Connected producer to bootstrap server {}", k->server); } - // if (k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV) - else if (k->consume) { + if (k->consume) { + // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, + // so we will need to create a copy first + rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); + rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - ret = rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); + ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); if (ret != RD_KAFKA_CONF_OK) goto kafka_config_error; - k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); + k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); if (!k->consumer.client) goto kafka_config_error;