diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp
index e7799115e..b43314ff8 100644
--- a/lib/nodes/kafka.cpp
+++ b/lib/nodes/kafka.cpp
@@ -162,8 +162,8 @@ int kafka_parse(struct vnode *n, json_t *json)
 		"format", &format,
 		"server", &server,
 		"timeout", &k->timeout,
-		"protocol", &protocol,
-		"client_id", &client_id,
+		"protocol", &protocol,
+		"client_id", &client_id,
 		"ssl", &json_ssl,
 		"sasl", &json_sasl
 	);
@@ -176,7 +176,7 @@ int kafka_parse(struct vnode *n, json_t *json)
 	k->protocol = protocol ? strdup(protocol) : nullptr;
 	k->client_id = client_id ? strdup(client_id) : nullptr;
 	k->consumer.group_id = group_id ? strdup(group_id) : nullptr;
-
+
 	if (!k->produce && !k->consume)
 		throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n));

@@ -196,16 +196,15 @@ int kafka_parse(struct vnode *n, json_t *json)
 		k->ssl.calocation = calocation ? strdup(calocation) : nullptr;
 	}

-	if (json_sasl) {
-
+	if (json_sasl) {
 		const char *mechanism = nullptr;
 		const char *username = nullptr;
-		const char *password = nullptr;
+		const char *password = nullptr;

-		ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
+		ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
 			"mechanism", &mechanism,
 			"username", &username,
-			"password", &password
+			"password", &password
 		);
 		if (ret)
 			throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n));
@@ -215,7 +214,7 @@ int kafka_parse(struct vnode *n, json_t *json)

 		k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr;
 		k->sasl.username = username ? strdup(username) : nullptr;
-		k->sasl.password = password ? strdup(password) : nullptr;
+		k->sasl.password = password ? strdup(password) : nullptr;
 	}

 	k->format = format_type_lookup(format);
@@ -253,7 +252,7 @@ char * kafka_print(struct vnode *n)
 	strcatf(&buf, "format=%s, bootstrap.server=%s, client.id=%s, security.protocol=%s",
 		format_type_name(k->format),
 		k->server,
-		k->client_id,
+		k->client_id,
 		k->protocol
 	);
@@ -271,10 +270,10 @@ int kafka_destroy(struct vnode *n)
 {
 	int ret;
 	struct kafka *k = (struct kafka *) n->_vd;
-
+
 	if (k->producer.client)
 		rd_kafka_destroy(k->producer.client);
-
+
 	if (k->consumer.client)
 		rd_kafka_destroy(k->consumer.client);

@@ -316,11 +315,11 @@ int kafka_start(struct vnode *n)
 	if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
 		ret = 1;

 	if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
-		ret = 1;
+		ret = 1;

 	if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
-		if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
-			ret = 1;
-	}
+		if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+			ret = 1;
+	}

 	if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) {
 		if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
@@ -374,7 +373,7 @@ kafka_config_error:
 kafka_server_error:
 	n->logger->warn("Error subscribing to {} at {} ", k->consume, k->server);
 	return ret;
-
+
 }

 int kafka_stop(struct vnode *n)
@@ -383,7 +382,7 @@ int kafka_stop(struct vnode *n)
 {
 	// struct kafka *k = (struct kafka *) n->_vd;

 	// Unregister client from global kafka client list
-	// so that kafka loop is no longer invoked for this client
+	// so that kafka loop is no longer invoked for this client
 	// important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect
 	vlist_remove(&clients, vlist_index(&clients, n));
@@ -481,8 +480,8 @@ int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *
 		return ret;

 	if (k->produce) {
-		ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
-			data, wbytes, NULL, 0, NULL);
+		ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
+			data, wbytes, NULL, 0, NULL);

 		if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
 			n->logger->warn("Publish failed");
@@ -508,12 +507,12 @@ static struct plugin p;

 __attribute__((constructor(110)))
 static void register_plugin() {
-	p.name = "kafka";
+	p.name = "kafka";
 	p.description = "Kafka event message streaming (rdkafka)";
-	p.type = PluginType::NODE;
+	p.type = PluginType::NODE;
 	p.node.instances.state = State::DESTROYED;
 	p.node.vectorize = 0;
-	p.node.size = sizeof(struct kafka);
+	p.node.size = sizeof(struct kafka);
 	p.node.type.start = kafka_type_start;
 	p.node.type.stop = kafka_type_stop;
 	p.node.destroy = kafka_destroy;
@@ -521,11 +520,11 @@ static void register_plugin() {
 	p.node.parse = kafka_parse;
 	p.node.prepare = kafka_prepare;
 	p.node.print = kafka_print;
-	p.node.init = kafka_init;
+	p.node.init = kafka_init;
 	p.node.destroy = kafka_destroy;
 	p.node.start = kafka_start;
-	p.node.stop = kafka_stop;
-	p.node.read = kafka_read;
+	p.node.stop = kafka_stop;
+	p.node.read = kafka_read;
 	p.node.write = kafka_write;
 	p.node.reverse = kafka_reverse;
 	p.node.poll_fds = kafka_poll_fds;