mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
kafka: improve shutdown behaviour
This commit is contained in:
parent
3f459d3539
commit
025f030ae3
1 changed files with 37 additions and 30 deletions
|
@ -336,12 +336,14 @@ int kafka_destroy(struct vnode *n)
|
|||
int kafka_start(struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
char errstr[1024];
|
||||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
char *errstr;
|
||||
rd_kafka_conf_t *rdkconf = rd_kafka_conf_new();
|
||||
if (!rdkconf)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
ret = rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb);
|
||||
rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb);
|
||||
|
||||
ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr))
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
|
@ -379,6 +381,8 @@ int kafka_start(struct vnode *n)
|
|||
// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
|
||||
// so we will need to create a copy first
|
||||
rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf);
|
||||
if (!rdkconf_prod)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr));
|
||||
if (!k->consumer.client)
|
||||
|
@ -386,7 +390,7 @@ int kafka_start(struct vnode *n)
|
|||
|
||||
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
|
||||
if (!topic_conf)
|
||||
goto kafka_config_error;
|
||||
throw MemoryAllocationError();
|
||||
|
||||
ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
|
@ -394,7 +398,7 @@ int kafka_start(struct vnode *n)
|
|||
|
||||
k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf);
|
||||
if (!k->producer.topic)
|
||||
goto kafka_config_error;
|
||||
throw MemoryAllocationError();
|
||||
|
||||
n->logger->info("Connected producer to bootstrap server {}", k->server);
|
||||
}
|
||||
|
@ -403,9 +407,16 @@ int kafka_start(struct vnode *n)
|
|||
// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
|
||||
// so we will need to create a copy first
|
||||
rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf);
|
||||
if (!rdkconf_cons)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0);
|
||||
rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
|
||||
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(1);
|
||||
if (!partitions)
|
||||
throw MemoryAllocationError();
|
||||
|
||||
rd_kafka_topic_partition_t *partition = rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
|
||||
if (!partition)
|
||||
throw RuntimeError("Failed to add new partition");
|
||||
|
||||
ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr));
|
||||
if (ret != RD_KAFKA_CONF_OK)
|
||||
|
@ -413,55 +424,51 @@ int kafka_start(struct vnode *n)
|
|||
|
||||
k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr));
|
||||
if (!k->consumer.client)
|
||||
goto kafka_config_error;
|
||||
throw MemoryAllocationError();
|
||||
|
||||
rd_kafka_subscribe(k->consumer.client, partitions);
|
||||
ret = rd_kafka_subscribe(k->consumer.client, partitions);
|
||||
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR)
|
||||
throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str(ret));
|
||||
|
||||
n->logger->info("Subscribed consumer from bootstrap server {}", k->server);
|
||||
}
|
||||
|
||||
if (!k->consumer.client && !k->producer.client)
|
||||
goto kafka_server_error;
|
||||
|
||||
// Add client to global list of kafka clients
|
||||
// so that thread can call kafka loop for this client
|
||||
vlist_push(&clients, n);
|
||||
|
||||
rd_kafka_conf_destroy(rdkconf);
|
||||
|
||||
return 0;
|
||||
|
||||
kafka_config_error:
|
||||
rd_kafka_conf_destroy(rdkconf);
|
||||
n->logger->error("{}", errstr);
|
||||
return ret;
|
||||
|
||||
kafka_server_error:
|
||||
n->logger->error("Error subscribing to {} at {} ", k->consume, k->server);
|
||||
return ret;
|
||||
throw RuntimeError(errstr);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int kafka_stop(struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
if (k->producer.client) {
|
||||
rd_kafka_flush(k->producer.client, k->timeout * 1000);
|
||||
|
||||
/* If the output queue is still not empty there is an issue
|
||||
* with producing messages to the clusters. */
|
||||
if (rd_kafka_outq_len(k->producer.client) > 0)
|
||||
n->logger->warn("{} message(s) were not delivered", rd_kafka_outq_len(k->producer.client));
|
||||
}
|
||||
|
||||
// Unregister client from global kafka client list
|
||||
// so that kafka loop is no longer invoked for this client
|
||||
// important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect
|
||||
vlist_remove_all(&clients, n);
|
||||
|
||||
#if 0
|
||||
int ret;
|
||||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
ret = kafka_disconnect(k->client);
|
||||
if (ret != RD_KAFKA_ERR_NO_ERROR)
|
||||
goto kafka_error;
|
||||
|
||||
kafka_error:
|
||||
n->logger->warn("{}", kafka_strerror(ret));
|
||||
|
||||
return ret;
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
int kafka_type_start(villas::node::SuperNode *sn)
|
||||
|
|
Loading…
Add table
Reference in a new issue