mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
node-type-kafka producer and consumer separation, not working at same time as in a single node as desirable
This commit is contained in:
parent
b32af984ac
commit
818047c6bd
3 changed files with 78 additions and 42 deletions
|
@ -1,5 +1,5 @@
|
|||
nodes = {
|
||||
kafka_node = {
|
||||
kafka_consumer_node = {
|
||||
type = "kafka",
|
||||
|
||||
format = "villas.human",
|
||||
|
@ -8,14 +8,33 @@ nodes = {
|
|||
protocol = "SASL_SSL",
|
||||
client_id = "villas-node",
|
||||
|
||||
out = {
|
||||
produce = "test-topic"
|
||||
},
|
||||
in = {
|
||||
consume = "test-topic",
|
||||
group_id = "villas-node"
|
||||
},
|
||||
|
||||
ssl = {
|
||||
calocation = "/etc/ssl/certs/ca.pem",
|
||||
},
|
||||
sals = {
|
||||
mechanisms = "SCRAM-SHA-512",
|
||||
username = "scram-sha-512-usr",
|
||||
password = "scram-sha-512-pwd"
|
||||
}
|
||||
}
|
||||
kafka_producer_node = {
|
||||
type = "kafka",
|
||||
|
||||
format = "villas.human",
|
||||
|
||||
server = "localhost:9094",
|
||||
protocol = "SASL_SSL",
|
||||
client_id = "villas-node",
|
||||
|
||||
out = {
|
||||
produce = "test-topic"
|
||||
},
|
||||
|
||||
ssl = {
|
||||
calocation = "/etc/ssl/certs/ca.pem",
|
||||
},
|
||||
|
|
|
@ -41,27 +41,34 @@
|
|||
struct format_type;
|
||||
|
||||
struct kafka {
|
||||
rd_kafka_t *client;
|
||||
rd_kafka_topic_t *topic;
|
||||
struct queue_signalled queue;
|
||||
struct pool pool;
|
||||
|
||||
int timeout; /**< Timeout in ms. */
|
||||
char *server; /**< Hostname/IP:Port address of the bootstrap server. */
|
||||
char *protocol; /**< Security protocol. */
|
||||
char *produce; /**< Producer topic. */
|
||||
char *consume; /**< Consumer topic. */
|
||||
char *client_id; /**< Client id. */
|
||||
char *group_id; /**< Group id. */
|
||||
int timeout; /**< Timeout in ms. */
|
||||
char *server; /**< Hostname/IP:Port address of the bootstrap server. */
|
||||
char *protocol; /**< Security protocol. */
|
||||
char *produce; /**< Producer topic. */
|
||||
char *consume; /**< Consumer topic. */
|
||||
char *client_id; /**< Client id. */
|
||||
|
||||
struct {
|
||||
char *calocation; /**< SSL CA file. */
|
||||
rd_kafka_t *client;
|
||||
rd_kafka_topic_t *topic;
|
||||
} producer;
|
||||
|
||||
struct {
|
||||
rd_kafka_t *client;
|
||||
char *group_id; /**< Group id. */
|
||||
} consumer;
|
||||
|
||||
struct {
|
||||
char *calocation; /**< SSL CA file. */
|
||||
} ssl;
|
||||
|
||||
struct {
|
||||
char *mechanism; /**< SASL mechanism. */
|
||||
char *username; /**< SSL CA path. */
|
||||
char *password; /**< SSL certificate. */
|
||||
char *mechanism; /**< SASL mechanism. */
|
||||
char *username; /**< SSL CA path. */
|
||||
char *password; /**< SSL certificate. */
|
||||
} sasl;
|
||||
|
||||
struct format_type *format;
|
||||
|
|
|
@ -85,11 +85,13 @@ static void * kafka_loop_thread(void *ctx)
|
|||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
// Execute kafka loop for this client
|
||||
rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->client, k->timeout);
|
||||
if(k->consumer.client) {
|
||||
rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout);
|
||||
|
||||
if(msg) {
|
||||
kafka_message((void *) n, msg);
|
||||
rd_kafka_message_destroy(msg);
|
||||
if(msg) {
|
||||
kafka_message((void *) n, msg);
|
||||
rd_kafka_message_destroy(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,7 +119,11 @@ int kafka_init(struct vnode *n)
|
|||
k->produce = nullptr;
|
||||
k->consume = nullptr;
|
||||
k->client_id = nullptr;
|
||||
k->group_id = nullptr;
|
||||
|
||||
k->consumer.client = nullptr;
|
||||
k->consumer.group_id = nullptr;
|
||||
k->producer.client = nullptr;
|
||||
k->producer.topic = nullptr;
|
||||
|
||||
k->sasl.mechanism = nullptr;
|
||||
k->sasl.username = nullptr;
|
||||
|
@ -169,7 +175,7 @@ int kafka_parse(struct vnode *n, json_t *json)
|
|||
k->consume = consume ? strdup(consume) : nullptr;
|
||||
k->protocol = protocol ? strdup(protocol) : nullptr;
|
||||
k->client_id = client_id ? strdup(client_id) : nullptr;
|
||||
k->group_id = group_id ? strdup(group_id) : nullptr;
|
||||
k->consumer.group_id = group_id ? strdup(group_id) : nullptr;
|
||||
|
||||
if (!k->produce && !k->consume)
|
||||
throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n));
|
||||
|
@ -190,15 +196,16 @@ int kafka_parse(struct vnode *n, json_t *json)
|
|||
k->ssl.calocation = calocation ? strdup(calocation) : nullptr;
|
||||
}
|
||||
|
||||
if (json_sasl) {
|
||||
if (json_sasl) {
|
||||
|
||||
const char *mechanism = nullptr;
|
||||
const char *username = nullptr;
|
||||
const char *password = nullptr;
|
||||
const char *password = nullptr;
|
||||
|
||||
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
|
||||
"mechanism", &mechanism,
|
||||
"username", &username,
|
||||
"password", &password
|
||||
"password", &password
|
||||
);
|
||||
if (ret)
|
||||
throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n));
|
||||
|
@ -208,7 +215,7 @@ int kafka_parse(struct vnode *n, json_t *json)
|
|||
|
||||
k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr;
|
||||
k->sasl.username = username ? strdup(username) : nullptr;
|
||||
k->sasl.password = password ? strdup(password) : nullptr;
|
||||
k->sasl.password = password ? strdup(password) : nullptr;
|
||||
}
|
||||
|
||||
k->format = format_type_lookup(format);
|
||||
|
@ -264,8 +271,12 @@ int kafka_destroy(struct vnode *n)
|
|||
{
|
||||
int ret;
|
||||
struct kafka *k = (struct kafka *) n->_vd;
|
||||
|
||||
rd_kafka_destroy(k->client);
|
||||
|
||||
if (k->producer.client)
|
||||
rd_kafka_destroy(k->producer.client);
|
||||
|
||||
if (k->consumer.client)
|
||||
rd_kafka_destroy(k->consumer.client);
|
||||
|
||||
ret = io_destroy(&k->io);
|
||||
if (ret)
|
||||
|
@ -287,8 +298,6 @@ int kafka_destroy(struct vnode *n)
|
|||
free(k->protocol);
|
||||
if (k->client_id)
|
||||
free(k->client_id);
|
||||
if (k->group_id)
|
||||
free(k->group_id);
|
||||
|
||||
free(k->server);
|
||||
|
||||
|
@ -309,9 +318,9 @@ int kafka_start(struct vnode *n)
|
|||
if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
|
||||
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
}
|
||||
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) {
|
||||
if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
|
@ -326,28 +335,29 @@ int kafka_start(struct vnode *n)
|
|||
goto kafka_config_error;
|
||||
|
||||
if(k->produce) {
|
||||
k->client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr));
|
||||
k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr));
|
||||
rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
|
||||
if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
k->topic = rd_kafka_topic_new(k->client, k->produce, topic_conf);
|
||||
k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf);
|
||||
n->logger->info("Connected producer to bootstrap server {}", k->server);
|
||||
}
|
||||
|
||||
// if(k->consume) { # rd_kafka_new() method not working for two simultaneously clients (SIGSEGV)
|
||||
else if(k->consume){
|
||||
rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0);
|
||||
rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
|
||||
|
||||
if (rd_kafka_conf_set(rdkconf, "group.id", k->group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
if (rd_kafka_conf_set(rdkconf, "group.id", k->consumer.group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
|
||||
ret = 1;
|
||||
|
||||
k->client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr));
|
||||
rd_kafka_subscribe(k->client, partitions);
|
||||
k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr));
|
||||
rd_kafka_subscribe(k->consumer.client, partitions);
|
||||
|
||||
n->logger->info("Subscribed consumer from bootstrap server {}", k->server);
|
||||
}
|
||||
|
||||
|
||||
if (!k->client)
|
||||
if (!k->consumer.client && !k->producer.client)
|
||||
goto kafka_server_error;
|
||||
|
||||
// Add client to global list of kafka clients
|
||||
|
@ -471,7 +481,7 @@ int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *
|
|||
return ret;
|
||||
|
||||
if (k->produce) {
|
||||
ret = rd_kafka_produce(k->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
|
||||
ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
|
||||
data, wbytes, NULL, 0, NULL);
|
||||
|
||||
if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
|
||||
|
|
Loading…
Add table
Reference in a new issue