1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: fix whitespaces

This commit is contained in:
Steffen Vogel 2021-05-09 14:58:08 +02:00
parent d5d12dd38e
commit c594077534

View file

@ -162,8 +162,8 @@ int kafka_parse(struct vnode *n, json_t *json)
"format", &format,
"server", &server,
"timeout", &k->timeout,
"protocol", &protocol,
"client_id", &client_id,
"protocol", &protocol,
"client_id", &client_id,
"ssl", &json_ssl,
"sasl", &json_sasl
);
@ -176,7 +176,7 @@ int kafka_parse(struct vnode *n, json_t *json)
k->protocol = protocol ? strdup(protocol) : nullptr;
k->client_id = client_id ? strdup(client_id) : nullptr;
k->consumer.group_id = group_id ? strdup(group_id) : nullptr;
if (!k->produce && !k->consume)
throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n));
@ -196,16 +196,15 @@ int kafka_parse(struct vnode *n, json_t *json)
k->ssl.calocation = calocation ? strdup(calocation) : nullptr;
}
if (json_sasl) {
if (json_sasl) {
const char *mechanism = nullptr;
const char *username = nullptr;
const char *password = nullptr;
const char *password = nullptr;
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
"mechanism", &mechanism,
"username", &username,
"password", &password
"password", &password
);
if (ret)
throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n));
@ -215,7 +214,7 @@ int kafka_parse(struct vnode *n, json_t *json)
k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr;
k->sasl.username = username ? strdup(username) : nullptr;
k->sasl.password = password ? strdup(password) : nullptr;
k->sasl.password = password ? strdup(password) : nullptr;
}
k->format = format_type_lookup(format);
@ -253,7 +252,7 @@ char * kafka_print(struct vnode *n)
strcatf(&buf, "format=%s, bootstrap.server=%s, client.id=%s, security.protocol=%s", format_type_name(k->format),
k->server,
k->client_id,
k->client_id,
k->protocol
);
@ -271,10 +270,10 @@ int kafka_destroy(struct vnode *n)
{
int ret;
struct kafka *k = (struct kafka *) n->_vd;
if (k->producer.client)
rd_kafka_destroy(k->producer.client);
if (k->consumer.client)
rd_kafka_destroy(k->consumer.client);
@ -316,11 +315,11 @@ int kafka_start(struct vnode *n)
if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
ret = 1;
if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
ret = 1;
ret = 1;
if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
ret = 1;
}
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
ret = 1;
}
if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) {
if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
@ -374,7 +373,7 @@ kafka_config_error:
kafka_server_error:
n->logger->warn("Error subscribing to {} at {} ", k->consume, k->server);
return ret;
}
int kafka_stop(struct vnode *n)
@ -383,7 +382,7 @@ int kafka_stop(struct vnode *n)
// struct kafka *k = (struct kafka *) n->_vd;
// Unregister client from global kafka client list
// so that kafka loop is no longer invoked for this client
// so that kafka loop is no longer invoked for this client
// important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect
vlist_remove(&clients, vlist_index(&clients, n));
@ -481,8 +480,8 @@ int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *
return ret;
if (k->produce) {
ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
data, wbytes, NULL, 0, NULL);
ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
data, wbytes, NULL, 0, NULL);
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
n->logger->warn("Publish failed");
@ -508,12 +507,12 @@ static struct plugin p;
__attribute__((constructor(110)))
static void register_plugin() {
p.name = "kafka";
p.name = "kafka";
p.description = "Kafka event message streaming (rdkafka)";
p.type = PluginType::NODE;
p.type = PluginType::NODE;
p.node.instances.state = State::DESTROYED;
p.node.vectorize = 0;
p.node.size = sizeof(struct kafka);
p.node.size = sizeof(struct kafka);
p.node.type.start = kafka_type_start;
p.node.type.stop = kafka_type_stop;
p.node.destroy = kafka_destroy;
@ -521,11 +520,11 @@ static void register_plugin() {
p.node.parse = kafka_parse;
p.node.prepare = kafka_prepare;
p.node.print = kafka_print;
p.node.init = kafka_init;
p.node.init = kafka_init;
p.node.destroy = kafka_destroy;
p.node.start = kafka_start;
p.node.stop = kafka_stop;
p.node.read = kafka_read;
p.node.stop = kafka_stop;
p.node.read = kafka_read;
p.node.write = kafka_write;
p.node.reverse = kafka_reverse;
p.node.poll_fds = kafka_poll_fds;