mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
kafka: more code-cleanups
This commit is contained in:
parent
8f8d35792b
commit
6417b3a6f2
1 changed files with 27 additions and 18 deletions
|
@ -85,10 +85,10 @@ static void * kafka_loop_thread(void *ctx)
|
|||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
// Execute kafka loop for this client
|
||||
if(k->consumer.client) {
|
||||
if (k->consumer.client) {
|
||||
rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout);
|
||||
|
||||
if(msg) {
|
||||
if (msg) {
|
||||
kafka_message((void *) n, msg);
|
||||
rd_kafka_message_destroy(msg);
|
||||
}
|
||||
|
@ -291,10 +291,13 @@ int kafka_destroy(struct vnode *n)
|
|||
|
||||
if (k->produce)
|
||||
free(k->produce);
|
||||
|
||||
if (k->consume)
|
||||
free(k->consume);
|
||||
|
||||
if (k->protocol)
|
||||
free(k->protocol);
|
||||
|
||||
if (k->client_id)
|
||||
free(k->client_id);
|
||||
|
||||
|
@ -310,12 +313,16 @@ int kafka_start(struct vnode *n)
|
|||
|
||||
char *errstr;
|
||||
rd_kafka_conf_t *rdkconf = rd_kafka_conf_new();
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
|
||||
if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
|
||||
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
|
@ -333,7 +340,7 @@ int kafka_start(struct vnode *n)
|
|||
if (ret)
|
||||
goto kafka_config_error;
|
||||
|
||||
if(k->produce) {
|
||||
if (k->produce) {
|
||||
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr));
|
||||
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
|
||||
if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
|
@ -342,8 +349,8 @@ int kafka_start(struct vnode *n)
|
|||
n->logger->info("Connected producer to bootstrap server {}", k->server);
|
||||
}
|
||||
|
||||
// if(k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV)
|
||||
else if(k->consume){
|
||||
// if (k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV)
|
||||
else if (k->consume) {
|
||||
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0);
|
||||
rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
|
||||
|
||||
|
@ -378,24 +385,26 @@ kafka_server_error:
|
|||
|
||||
int kafka_stop(struct vnode *n)
|
||||
{
|
||||
int ret;
|
||||
// struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
// Unregister client from global kafka client list
|
||||
// so that kafka loop is no longer invoked for this client
|
||||
// important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect
|
||||
vlist_remove(&clients, vlist_index(&clients, n));
|
||||
vlist_remove_all(&clients, n);
|
||||
|
||||
#if 0
|
||||
int ret;
|
||||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
ret = kafka_disconnect(k->client);
|
||||
if (ret != RD_KAFKA_ERR_NO_ERROR)
|
||||
goto kafka_error;
|
||||
|
||||
kafka_error:
|
||||
n->logger->warn("{}", kafka_strerror(ret));
|
||||
|
||||
// ret = kafka_disconnect(k->client);
|
||||
// if (ret != RD_KAFKA_ERR_NO_ERROR)
|
||||
// goto kafka_error;
|
||||
ret = 0;
|
||||
return ret;
|
||||
|
||||
//kafka_error:
|
||||
//n->logger->warn("{}", kafka_strerror(ret));
|
||||
|
||||
//return ret;
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
}
|
||||
|
||||
int kafka_type_start(villas::node::SuperNode *sn)
|
||||
|
|
Loading…
Add table
Reference in a new issue