diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index b43314ff8..6628f16a0 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -85,10 +85,10 @@ static void * kafka_loop_thread(void *ctx) struct kafka *k = (struct kafka *) n->_vd; // Execute kafka loop for this client - if(k->consumer.client) { + if (k->consumer.client) { rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout); - if(msg) { + if (msg) { kafka_message((void *) n, msg); rd_kafka_message_destroy(msg); } @@ -291,10 +291,13 @@ int kafka_destroy(struct vnode *n) if (k->produce) free(k->produce); + if (k->consume) free(k->consume); + if (k->protocol) free(k->protocol); + if (k->client_id) free(k->client_id); @@ -310,12 +313,16 @@ int kafka_start(struct vnode *n) char *errstr; rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + if (rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; + if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; + if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; + if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) ret = 1; @@ -333,7 +340,7 @@ int kafka_start(struct vnode *n) if (ret) goto kafka_config_error; - if(k->produce) { + if (k->produce) { k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) @@ -342,8 +349,8 @@ int kafka_start(struct vnode *n) n->logger->info("Connected producer to bootstrap server {}", k->server); } - // if(k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV) - else if(k->consume){ + // if (k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV) + else if (k->consume) { rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); rd_kafka_topic_partition_list_add(partitions, k->consume, 0); @@ -378,24 +385,26 @@ kafka_server_error: int kafka_stop(struct vnode *n) { - int ret; - // struct kafka *k = (struct kafka *) n->_vd; - // Unregister client from global kafka client list // so that kafka loop is no longer invoked for this client // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect - vlist_remove(&clients, vlist_index(&clients, n)); + vlist_remove_all(&clients, n); + +#if 0 + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + ret = kafka_disconnect(k->client); + if (ret != RD_KAFKA_ERR_NO_ERROR) + goto kafka_error; + +kafka_error: + n->logger->warn("{}", kafka_strerror(ret)); - // ret = kafka_disconnect(k->client); - // if (ret != RD_KAFKA_ERR_NO_ERROR) - // goto kafka_error; - ret = 0; return ret; - -//kafka_error: - //n->logger->warn("{}", kafka_strerror(ret)); - - //return ret; +#else + return 0; +#endif } int kafka_type_start(villas::node::SuperNode *sn)