diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp index 2c0e28264..6d0b23855 100644 --- a/lib/nodes/kafka.cpp +++ b/lib/nodes/kafka.cpp @@ -145,7 +145,7 @@ int kafka_parse(struct vnode *n, json_t *json) const char *format = "villas.binary"; const char *produce = nullptr; const char *consume = nullptr; - const char *protocol = nullptr; + const char *protocol; const char *client_id = nullptr; const char *group_id = nullptr; @@ -153,7 +153,7 @@ int kafka_parse(struct vnode *n, json_t *json) json_t *json_ssl = nullptr; json_t *json_sasl = nullptr; - ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: s, s: s, s?: i, s?: s, s?: s, s?: o, s?: o }", + ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: s, s: s, s?: i, s: s, s?: s, s?: o, s?: o }", "out", "produce", &produce, "in", @@ -173,35 +173,36 @@ int kafka_parse(struct vnode *n, json_t *json) k->server = strdup(server); k->produce = produce ? strdup(produce) : nullptr; k->consume = consume ? strdup(consume) : nullptr; - k->protocol = protocol ? strdup(protocol) : nullptr; + k->protocol = strdup(protocol); k->client_id = client_id ? strdup(client_id) : nullptr; k->consumer.group_id = group_id ? strdup(group_id) : nullptr; + if (strcmp(protocol), "SSL") && + strcmp(protocol), "SASL_SSL") && + strcmp(protocol), "SASL_PLAINTEXT")) + throw ConfigError(json, "node-config-node-kafka-protocol", "Invalid security protocol: {}", protocol); + if (!k->produce && !k->consume) throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n)); if (json_ssl) { + const char *ca; - const char *ca = nullptr; - - ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }", + ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }", "ca", &ca ); if (ret) throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", node_name(n)); - if (!ca) - throw ConfigError(json_ssl, "node-config-node-kafka-ssl", "'ca' settings must be set for node {}.", node_name(n)); - - k->ssl.ca = ca ? strdup(ca) : nullptr; + k->ssl.ca = strdup(ca); } if (json_sasl) { - const char *mechanism = nullptr; - const char *username = nullptr; - const char *password = nullptr; + const char *mechanism; + const char *username; + const char *password; - ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s, s?: s, s?: s }", + ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s, s: s, s: s }", "mechanism", &mechanism, "username", &username, "password", &password @@ -209,12 +210,9 @@ int kafka_parse(struct vnode *n, json_t *json) if (ret) throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n)); - if (!username && !password && !mechanism) - throw ConfigError(json_sasl, "node-config-node-kafka-sasl", "Either 'sasl.mechanism', 'sasl.username' or 'sasl.password' settings must be set for node {}.", node_name(n)); - - k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr; - k->sasl.username = username ? strdup(username) : nullptr; - k->sasl.password = password ? strdup(password) : nullptr; + k->sasl.mechanism = strdup(mechanism); + k->sasl.username = strdup(username); + k->sasl.password = strdup(password); } k->format = format_type_lookup(format);