diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 2b4819477..2c0e28264 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -314,38 +314,55 @@ int kafka_start(struct vnode *n) char *errstr; rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); - if (rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; - if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; - if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + ret rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { - if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; } if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) { - if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; - if (rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; - if (rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; - } + ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; - if (ret) - goto kafka_config_error; + ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + + ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + } if (k->produce) { k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); + if (!k->consumer.client) + goto kafka_config_error; + rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); - if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + if (!topic_conf) + goto kafka_config_error; + + ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; + k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); + if (!k->producer.topic) + goto kafka_config_error; + n->logger->info("Connected producer to bootstrap server {}", k->server); } @@ -354,10 +371,14 @@ int kafka_start(struct vnode *n) rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - if (rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; + ret = rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); + if (ret != RD_KAFKA_CONF_OK) + goto kafka_config_error; k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); + if (!k->consumer.client) + goto kafka_config_error; + rd_kafka_subscribe(k->consumer.client, partitions); n->logger->info("Subscribed consumer from bootstrap server {}", k->server);