1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: refactor setting "calocation" to "ca"

This commit is contained in:
Steffen Vogel 2021-05-09 15:23:02 +02:00
parent 6417b3a6f2
commit 0881b38224
3 changed files with 56 additions and 56 deletions

View file

@ -1,47 +1,47 @@
nodes = {
kafka_consumer_node = {
type = "kafka",
format = "villas.human",
server = "localhost:9094",
protocol = "SASL_SSL",
client_id = "villas-node",
in = {
consume = "test-topic",
group_id = "villas-node"
},
ssl = {
calocation = "/etc/ssl/certs/ca.pem",
},
sals = {
mechanisms = "SCRAM-SHA-512",
username = "scram-sha-512-usr",
password = "scram-sha-512-pwd"
}
}
kafka_producer_node = {
type = "kafka",
format = "villas.human",
server = "localhost:9094",
protocol = "SASL_SSL",
client_id = "villas-node",
out = {
produce = "test-topic"
},
ssl = {
calocation = "/etc/ssl/certs/ca.pem",
},
sals = {
mechanisms = "SCRAM-SHA-512",
username = "scram-sha-512-usr",
password = "scram-sha-512-pwd"
}
}
}
nodes = {
kafka_consumer_node = {
type = "kafka",
format = "villas.human",
server = "localhost:9094",
protocol = "SASL_SSL",
client_id = "villas-node",
in = {
consume = "test-topic",
group_id = "villas-node"
},
ssl = {
ca = "/etc/ssl/certs/ca.pem",
},
sals = {
mechanisms = "SCRAM-SHA-512",
username = "scram-sha-512-usr",
password = "scram-sha-512-pwd"
}
}
kafka_producer_node = {
type = "kafka",
format = "villas.human",
server = "localhost:9094",
protocol = "SASL_SSL",
client_id = "villas-node",
out = {
produce = "test-topic"
},
ssl = {
ca = "/etc/ssl/certs/ca.pem",
},
sals = {
mechanisms = "SCRAM-SHA-512",
username = "scram-sha-512-usr",
password = "scram-sha-512-pwd"
}
}
}

View file

@ -49,7 +49,7 @@ struct kafka {
char *protocol; /**< Security protocol. */
char *produce; /**< Producer topic. */
char *consume; /**< Consumer topic. */
char *client_id; /**< Client id. */
char *client_id; /**< Client ID. */
struct {
rd_kafka_t *client;
@ -62,7 +62,7 @@ struct kafka {
} consumer;
struct {
char *calocation; /**< SSL CA file. */
char *ca; /**< SSL CA file. */
} ssl;
struct {

View file

@ -129,7 +129,7 @@ int kafka_init(struct vnode *n)
k->sasl.username = nullptr;
k->sasl.password = nullptr;
k->ssl.calocation = nullptr;
k->ssl.ca = nullptr;
ret = 0;
@ -182,18 +182,18 @@ int kafka_parse(struct vnode *n, json_t *json)
if (json_ssl) {
const char *calocation = nullptr;
const char *ca = nullptr;
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }",
"calocation", &calocation
"ca", &ca
);
if (ret)
throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", node_name(n));
if (!calocation)
throw ConfigError(json_ssl, "node-config-node-kafka-ssl", "'calocation' settings must be set for node {}.", node_name(n));
if (!ca)
throw ConfigError(json_ssl, "node-config-node-kafka-ssl", "'ca' settings must be set for node {}.", node_name(n));
k->ssl.calocation = calocation ? strdup(calocation) : nullptr;
k->ssl.ca = ca ? strdup(ca) : nullptr;
}
if (json_sasl) {
@ -324,7 +324,7 @@ int kafka_start(struct vnode *n)
ret = 1;
if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
ret = 1;
}