diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 02b1226b2..bf01eea27 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -336,12 +336,14 @@ int kafka_destroy(struct vnode *n) int kafka_start(struct vnode *n) { int ret; + char errstr[1024]; struct kafka *k = (struct kafka *) n->_vd; - char *errstr; rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + if (!rdkconf) + throw MemoryAllocationError(); - ret = rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); + rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) if (ret != RD_KAFKA_CONF_OK) @@ -379,6 +381,8 @@ int kafka_start(struct vnode *n) // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, // so we will need to create a copy first rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_prod) + throw MemoryAllocationError(); k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr)); if (!k->consumer.client) @@ -386,7 +390,7 @@ int kafka_start(struct vnode *n) rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); if (!topic_conf) - goto kafka_config_error; + throw MemoryAllocationError(); ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)); if (ret != RD_KAFKA_CONF_OK) @@ -394,7 +398,7 @@ int kafka_start(struct vnode *n) k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf); if (!k->producer.topic) - goto kafka_config_error; + throw MemoryAllocationError(); n->logger->info("Connected producer to bootstrap server {}", k->server); } @@ -403,9 +407,16 @@ int kafka_start(struct vnode *n) // rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object, // so we will need to create a copy first rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf); + if (!rdkconf_cons) + throw MemoryAllocationError(); - rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); - rd_kafka_topic_partition_list_add(partitions, k->consume, 0); + rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(1); + if (!partitions) + throw MemoryAllocationError(); + + rd_kafka_topic_partition_t *partition = rd_kafka_topic_partition_list_add(partitions, k->consume, 0); + if (!partition) + throw RuntimeError("Failed to add new partition"); ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr)); if (ret != RD_KAFKA_CONF_OK) @@ -413,55 +424,51 @@ int kafka_start(struct vnode *n) k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr)); if (!k->consumer.client) - goto kafka_config_error; + throw MemoryAllocationError(); - rd_kafka_subscribe(k->consumer.client, partitions); + ret = rd_kafka_subscribe(k->consumer.client, partitions); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str(ret)); n->logger->info("Subscribed consumer from bootstrap server {}", k->server); } - if (!k->consumer.client && !k->producer.client) - goto kafka_server_error; - // Add client to global list of kafka clients // so that thread can call kafka loop for this client vlist_push(&clients, n); + rd_kafka_conf_destroy(rdkconf); + return 0; kafka_config_error: rd_kafka_conf_destroy(rdkconf); - n->logger->error("{}", errstr); - return ret; -kafka_server_error: - n->logger->error("Error subscribing to {} at {} ", k->consume, k->server); - return ret; + throw RuntimeError(errstr); + return -1; } int kafka_stop(struct vnode *n) { + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + if (k->producer.client) { + rd_kafka_flush(k->producer.client, k->timeout * 1000); + + /* If the output queue is still not empty there is an issue + * with producing messages to the clusters. */ + if (rd_kafka_outq_len(k->producer.client) > 0) + n->logger->warn("{} message(s) were not delivered", rd_kafka_outq_len(k->producer.client)); + } + // Unregister client from global kafka client list // so that kafka loop is no longer invoked for this client // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect vlist_remove_all(&clients, n); -#if 0 - int ret; - struct kafka *k = (struct kafka *) n->_vd; - - ret = kafka_disconnect(k->client); - if (ret != RD_KAFKA_ERR_NO_ERROR) - goto kafka_error; - -kafka_error: - n->logger->warn("{}", kafka_strerror(ret)); - - return ret; -#else return 0; -#endif } int kafka_type_start(villas::node::SuperNode *sn)