1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

kafka: some of the configuration settings are required

This commit is contained in:
Steffen Vogel 2021-05-09 17:46:27 +02:00
parent 5a9af6553c
commit 18c736b184

View file

@ -145,7 +145,7 @@ int kafka_parse(struct vnode *n, json_t *json)
const char *format = "villas.binary";
const char *produce = nullptr;
const char *consume = nullptr;
const char *protocol = nullptr;
const char *protocol;
const char *client_id = nullptr;
const char *group_id = nullptr;
@ -153,7 +153,7 @@ int kafka_parse(struct vnode *n, json_t *json)
json_t *json_ssl = nullptr;
json_t *json_sasl = nullptr;
ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: s, s: s, s?: i, s?: s, s?: s, s?: o, s?: o }",
ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: s, s: s, s?: i, s: s, s?: s, s?: o, s?: o }",
"out",
"produce", &produce,
"in",
@ -173,35 +173,36 @@ int kafka_parse(struct vnode *n, json_t *json)
k->server = strdup(server);
k->produce = produce ? strdup(produce) : nullptr;
k->consume = consume ? strdup(consume) : nullptr;
k->protocol = protocol ? strdup(protocol) : nullptr;
k->protocol = strdup(protocol);
k->client_id = client_id ? strdup(client_id) : nullptr;
k->consumer.group_id = group_id ? strdup(group_id) : nullptr;
if (strcmp(protocol), "SSL") &&
strcmp(protocol), "SASL_SSL") &&
strcmp(protocol), "SASL_PLAINTEXT"))
throw ConfigError(json, "node-config-node-kafka-protocol", "Invalid security protocol: {}", protocol);
if (!k->produce && !k->consume)
throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n));
if (json_ssl) {
const char *ca;
const char *ca = nullptr;
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }",
ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }",
"ca", &ca
);
if (ret)
throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", node_name(n));
if (!ca)
throw ConfigError(json_ssl, "node-config-node-kafka-ssl", "'ca' settings must be set for node {}.", node_name(n));
k->ssl.ca = ca ? strdup(ca) : nullptr;
k->ssl.ca = strdup(ca);
}
if (json_sasl) {
const char *mechanism = nullptr;
const char *username = nullptr;
const char *password = nullptr;
const char *mechanism;
const char *username;
const char *password;
ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }",
ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s, s: s, s: s }",
"mechanism", &mechanism,
"username", &username,
"password", &password
@ -209,12 +210,9 @@ int kafka_parse(struct vnode *n, json_t *json)
if (ret)
throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n));
if (!username && !password && !mechanism)
throw ConfigError(json_sasl, "node-config-node-kafka-sasl", "Either 'sasl.mechanism', 'sasl.username' or 'sasl.password' settings must be set for node {}.", node_name(n));
k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr;
k->sasl.username = username ? strdup(username) : nullptr;
k->sasl.password = password ? strdup(password) : nullptr;
k->sasl.mechanism = strdup(mechanism);
k->sasl.username = strdup(username);
k->sasl.password = strdup(password);
}
k->format = format_type_lookup(format);