diff --git a/etc/examples/nodes/kafka.conf b/etc/examples/nodes/kafka.conf index a7c34d7d9..7726a0708 100644 --- a/etc/examples/nodes/kafka.conf +++ b/etc/examples/nodes/kafka.conf @@ -1,5 +1,5 @@ nodes = { - kafka_node = { + kafka_consumer_node = { type = "kafka", format = "villas.human", @@ -8,14 +8,33 @@ nodes = { protocol = "SASL_SSL", client_id = "villas-node", - out = { - produce = "test-topic" - }, in = { consume = "test-topic", group_id = "villas-node" }, + ssl = { + calocation = "/etc/ssl/certs/ca.pem", + }, + sals = { + mechanisms = "SCRAM-SHA-512", + username = "scram-sha-512-usr", + password = "scram-sha-512-pwd" + } + } + kafka_producer_node = { + type = "kafka", + + format = "villas.human", + + server = "localhost:9094", + protocol = "SASL_SSL", + client_id = "villas-node", + + out = { + produce = "test-topic" + }, + ssl = { calocation = "/etc/ssl/certs/ca.pem", }, diff --git a/include/villas/nodes/kafka.hpp b/include/villas/nodes/kafka.hpp index c8edec721..5a88fd0ab 100644 --- a/include/villas/nodes/kafka.hpp +++ b/include/villas/nodes/kafka.hpp @@ -41,27 +41,34 @@ struct format_type; struct kafka { - rd_kafka_t *client; - rd_kafka_topic_t *topic; struct queue_signalled queue; struct pool pool; - int timeout; /**< Timeout in ms. */ - char *server; /**< Hostname/IP:Port address of the bootstrap server. */ - char *protocol; /**< Security protocol. */ - char *produce; /**< Producer topic. */ - char *consume; /**< Consumer topic. */ - char *client_id; /**< Client id. */ - char *group_id; /**< Group id. */ + int timeout; /**< Timeout in ms. */ + char *server; /**< Hostname/IP:Port address of the bootstrap server. */ + char *protocol; /**< Security protocol. */ + char *produce; /**< Producer topic. */ + char *consume; /**< Consumer topic. */ + char *client_id; /**< Client id. */ struct { - char *calocation; /**< SSL CA file. */ + rd_kafka_t *client; + rd_kafka_topic_t *topic; + } producer; + + struct { + rd_kafka_t *client; + char *group_id; /**< Group id. */ + } consumer; + + struct { + char *calocation; /**< SSL CA file. */ } ssl; struct { - char *mechanism; /**< SASL mechanism. */ - char *username; /**< SSL CA path. */ - char *password; /**< SSL certificate. */ + char *mechanism; /**< SASL mechanism. */ + char *username; /**< SSL CA path. */ + char *password; /**< SSL certificate. */ } sasl; struct format_type *format; diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index b78c396ca..83c4e45cb 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -85,11 +85,13 @@ static void * kafka_loop_thread(void *ctx) struct kafka *k = (struct kafka *) n->_vd; // Execute kafka loop for this client - rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->client, k->timeout); + if(k->consumer.client) { + rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout); - if(msg) { - kafka_message((void *) n, msg); - rd_kafka_message_destroy(msg); + if(msg) { + kafka_message((void *) n, msg); + rd_kafka_message_destroy(msg); + } } } } @@ -117,7 +119,11 @@ int kafka_init(struct vnode *n) k->produce = nullptr; k->consume = nullptr; k->client_id = nullptr; - k->group_id = nullptr; + + k->consumer.client = nullptr; + k->consumer.group_id = nullptr; + k->producer.client = nullptr; + k->producer.topic = nullptr; k->sasl.mechanism = nullptr; k->sasl.username = nullptr; @@ -169,7 +175,7 @@ int kafka_parse(struct vnode *n, json_t *json) k->consume = consume ? strdup(consume) : nullptr; k->protocol = protocol ? strdup(protocol) : nullptr; k->client_id = client_id ? strdup(client_id) : nullptr; - k->group_id = group_id ? strdup(group_id) : nullptr; + k->consumer.group_id = group_id ? strdup(group_id) : nullptr; if (!k->produce && !k->consume) throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n)); @@ -190,15 +196,16 @@ int kafka_parse(struct vnode *n, json_t *json) k->ssl.calocation = calocation ? strdup(calocation) : nullptr; } - if (json_sasl) { + if (json_sasl) { + const char *mechanism = nullptr; const char *username = nullptr; - const char *password = nullptr; + const char *password = nullptr; ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }", "mechanism", &mechanism, "username", &username, - "password", &password + "password", &password ); if (ret) throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n)); @@ -208,7 +215,7 @@ int kafka_parse(struct vnode *n, json_t *json) k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr; k->sasl.username = username ? strdup(username) : nullptr; - k->sasl.password = password ? strdup(password) : nullptr; + k->sasl.password = password ? strdup(password) : nullptr; } k->format = format_type_lookup(format); @@ -264,8 +271,12 @@ int kafka_destroy(struct vnode *n) { int ret; struct kafka *k = (struct kafka *) n->_vd; - - rd_kafka_destroy(k->client); + + if (k->producer.client) + rd_kafka_destroy(k->producer.client); + + if (k->consumer.client) + rd_kafka_destroy(k->consumer.client); ret = io_destroy(&k->io); if (ret) @@ -287,8 +298,6 @@ int kafka_destroy(struct vnode *n) free(k->protocol); if (k->client_id) free(k->client_id); - if (k->group_id) - free(k->group_id); free(k->server); @@ -309,9 +318,9 @@ int kafka_start(struct vnode *n) if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { - if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) - ret = 1; - } + if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + } if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) { if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) @@ -326,28 +335,29 @@ int kafka_start(struct vnode *n) goto kafka_config_error; if(k->produce) { - k->client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); + k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; - k->topic = rd_kafka_topic_new(k->client, k->produce, topic_conf); + k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); n->logger->info("Connected producer to bootstrap server {}", k->server); } + + // if(k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV) else if(k->consume){ rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(partitions, k->consume, 0); - if (rd_kafka_conf_set(rdkconf, "group.id", k->group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + if (rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; - k->client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); - rd_kafka_subscribe(k->client, partitions); + k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); + rd_kafka_subscribe(k->consumer.client, partitions); n->logger->info("Subscribed consumer from bootstrap server {}", k->server); } - - if (!k->client) + if (!k->consumer.client && !k->producer.client) goto kafka_server_error; // Add client to global list of kafka clients @@ -471,7 +481,7 @@ int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned * return ret; if (k->produce) { - ret = rd_kafka_produce(k->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, data, wbytes, NULL, 0, NULL); if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {