mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
kafka: harmonize error checking
This commit is contained in:
parent
db2d730d28
commit
5a9af6553c
1 changed files with 42 additions and 21 deletions
|
@ -314,38 +314,55 @@ int kafka_start(struct vnode *n)
|
|||
char *errstr;
|
||||
rd_kafka_conf_t *rdkconf = rd_kafka_conf_new();
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr))
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
ret rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
|
||||
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
}
|
||||
|
||||
if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) {
|
||||
if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
if (rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
if (rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
}
|
||||
ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
if (ret)
|
||||
goto kafka_config_error;
|
||||
ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
}
|
||||
|
||||
if (k->produce) {
|
||||
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr));
|
||||
if (!k->consumer.client)
|
||||
goto kafka_config_error;
|
||||
|
||||
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
|
||||
if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
if (!topic_conf)
|
||||
goto kafka_config_error;
|
||||
|
||||
ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf);
|
||||
if (!k->producer.topic)
|
||||
goto kafka_config_error;
|
||||
|
||||
n->logger->info("Connected producer to bootstrap server {}", k->server);
|
||||
}
|
||||
|
||||
|
@ -354,10 +371,14 @@ int kafka_start(struct vnode *n)
|
|||
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0);
|
||||
rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
ret = rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
goto kafka_config_error;
|
||||
|
||||
k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr));
|
||||
if (!k->consumer.client)
|
||||
goto kafka_config_error;
|
||||
|
||||
rd_kafka_subscribe(k->consumer.client, partitions);
|
||||
|
||||
n->logger->info("Subscribed consumer from bootstrap server {}", k->server);
|
||||
|
|
Loading…
Add table
Reference in a new issue