1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: fix compiler errors

This commit is contained in:
Steffen Vogel 2021-05-09 18:51:00 +02:00
parent 025f030ae3
commit 34cdb2819e

View file

@ -345,7 +345,7 @@ int kafka_start(struct vnode *n)
rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb);
ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr))
ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr));
if (ret != RD_KAFKA_CONF_OK)
goto kafka_config_error;
@ -428,7 +428,7 @@ int kafka_start(struct vnode *n)
ret = rd_kafka_subscribe(k->consumer.client, partitions);
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR)
throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str(ret));
throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str((rd_kafka_resp_err_t) ret));
n->logger->info("Subscribed consumer from bootstrap server {}", k->server);
}
@ -455,7 +455,9 @@ int kafka_stop(struct vnode *n)
struct kafka *k = (struct kafka *) n->_vd;
if (k->producer.client) {
rd_kafka_flush(k->producer.client, k->timeout * 1000);
ret = rd_kafka_flush(k->producer.client, k->timeout * 1000);
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR)
n->logger->error("Failed to flush messages: {}", rd_kafka_err2str((rd_kafka_resp_err_t) ret));
/* If the output queue is still not empty there is an issue
* with producing messages to the clusters. */