diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index bf01eea27..920651614 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -345,7 +345,7 @@ int kafka_start(struct vnode *n) rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb); - ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) + ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)); if (ret != RD_KAFKA_CONF_OK) goto kafka_config_error; @@ -428,7 +428,7 @@ int kafka_start(struct vnode *n) ret = rd_kafka_subscribe(k->consumer.client, partitions); if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) - throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str(ret)); + throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str((rd_kafka_resp_err_t) ret)); n->logger->info("Subscribed consumer from bootstrap server {}", k->server); } @@ -455,7 +455,9 @@ int kafka_stop(struct vnode *n) struct kafka *k = (struct kafka *) n->_vd; if (k->producer.client) { - rd_kafka_flush(k->producer.client, k->timeout * 1000); + ret = rd_kafka_flush(k->producer.client, k->timeout * 1000); + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) + n->logger->error("Failed to flush messages: {}", rd_kafka_err2str((rd_kafka_resp_err_t) ret)); /* If the output queue is still not empty there is an issue * with producing messages to the clusters. */