2021-05-09 14:57:44 +02:00
|
|
|
/** Node type: kafka
|
|
|
|
*
|
|
|
|
* @author Juan Pablo Noreña <jpnorenam@unal.edu.co>
|
|
|
|
* @copyright 2021, Universidad Nacional de Colombia
|
2022-07-04 18:20:03 +02:00
|
|
|
* @license Apache 2.0
|
2021-05-09 14:57:44 +02:00
|
|
|
*********************************************************************************/
|
|
|
|
|
|
|
|
#include <cstring>
|
2021-05-09 18:10:34 +02:00
|
|
|
#include <sys/syslog.h>
|
2021-05-09 14:57:44 +02:00
|
|
|
#include <librdkafka/rdkafkacpp.h>
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
#include <villas/node_compat.hpp>
|
2021-05-09 14:57:44 +02:00
|
|
|
#include <villas/nodes/kafka.hpp>
|
|
|
|
#include <villas/utils.hpp>
|
|
|
|
#include <villas/exceptions.hpp>
|
|
|
|
|
|
|
|
using namespace villas;
|
2021-05-10 00:12:30 +02:00
|
|
|
using namespace villas::node;
|
2021-05-09 14:57:44 +02:00
|
|
|
using namespace villas::utils;
|
|
|
|
|
|
|
|
// Each process has a list of clients for which a thread invokes the kafka loop
static struct List clients;

// Handle of the single polling thread started in kafka_type_start();
// it runs kafka_loop_thread() over all registered clients.
static pthread_t thread;

// Node-type wide logger, initialized in kafka_type_start()
static Logger logger;
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
static
|
|
|
|
void kafka_logger_cb(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
|
2021-05-09 18:10:34 +02:00
|
|
|
{
|
|
|
|
|
|
|
|
switch (level) {
|
|
|
|
case LOG_EMERG:
|
|
|
|
case LOG_CRIT:
|
|
|
|
case LOG_ERR:
|
|
|
|
logger->error("{}: {}", fac, buf);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case LOG_ALERT:
|
|
|
|
case LOG_WARNING:
|
|
|
|
logger->warn("{}: {}", fac, buf);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case LOG_DEBUG:
|
|
|
|
logger->debug("{}: {}", fac, buf);
|
|
|
|
break;
|
|
|
|
|
|
|
|
case LOG_NOTICE:
|
|
|
|
case LOG_INFO:
|
|
|
|
default:
|
|
|
|
logger->info("{}: {}", fac, buf);
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
static
|
|
|
|
void kafka_message_cb(void *ctx, const rd_kafka_message_t *msg)
|
2021-05-09 14:57:44 +02:00
|
|
|
{
|
|
|
|
int ret;
|
2021-08-10 10:12:48 -04:00
|
|
|
auto *n = (NodeCompat *) ctx;
|
|
|
|
auto *k = n->getData<struct kafka>();
|
|
|
|
struct Sample *smps[n->in.vectorize];
|
2021-05-09 14:57:44 +02:00
|
|
|
|
|
|
|
n->logger->debug("Received a message of {} bytes from broker {}", msg->len, k->server);
|
|
|
|
|
|
|
|
ret = sample_alloc_many(&k->pool, smps, n->in.vectorize);
|
|
|
|
if (ret <= 0) {
|
|
|
|
n->logger->warn("Pool underrun in consumer");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2021-05-10 00:12:30 +02:00
|
|
|
ret = k->formatter->sscan((char *) msg->payload, msg->len, nullptr, smps, n->in.vectorize);
|
2021-05-09 14:57:44 +02:00
|
|
|
if (ret < 0) {
|
|
|
|
n->logger->warn("Received an invalid message");
|
|
|
|
n->logger->warn(" Payload: {}", (char *) msg->payload);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
if (ret == 0) {
|
|
|
|
n->logger->debug("Skip empty message");
|
|
|
|
sample_decref_many(smps, n->in.vectorize);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
ret = queue_signalled_push_many(&k->queue, (void **) smps, n->in.vectorize);
|
|
|
|
if (ret < (int) n->in.vectorize)
|
|
|
|
n->logger->warn("Failed to enqueue samples");
|
|
|
|
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
static
|
|
|
|
void * kafka_loop_thread(void *ctx)
|
2021-05-09 14:57:44 +02:00
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
|
|
|
// Set the cancel type of this thread to async
|
|
|
|
ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
|
|
|
|
if (ret != 0)
|
|
|
|
throw RuntimeError("Unable to set cancel type of Kafka communication thread to asynchronous.");
|
|
|
|
|
|
|
|
while (true) {
|
2021-08-10 10:12:48 -04:00
|
|
|
for (unsigned i = 0; i < list_length(&clients); i++) {
|
|
|
|
auto *n = (NodeCompat *) list_at(&clients, i);
|
|
|
|
auto *k = n->getData<struct kafka>();
|
2021-05-09 14:57:44 +02:00
|
|
|
|
|
|
|
// Execute kafka loop for this client
|
2021-05-09 15:21:59 +02:00
|
|
|
if (k->consumer.client) {
|
2021-05-09 18:09:24 +02:00
|
|
|
rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->consumer.client, k->timeout * 1000);
|
2021-05-09 15:21:59 +02:00
|
|
|
if (msg) {
|
2021-05-09 18:10:34 +02:00
|
|
|
kafka_message_cb((void *) n, msg);
|
2021-05-09 14:57:44 +02:00
|
|
|
rd_kafka_message_destroy(msg);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nullptr;
|
|
|
|
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
int villas::node::kafka_reverse(NodeCompat *n)
{
	auto *k = n->getData<struct kafka>();

	// Exchange produce and consume topics so the node's direction flips
	auto *tmp = k->produce;
	k->produce = k->consume;
	k->consume = tmp;

	return 0;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
int villas::node::kafka_init(NodeCompat *n)
{
	auto *k = n->getData<struct kafka>();

	// Initialize every member to a safe default so that kafka_destroy()
	// can run even if kafka_parse() was never called.

	// Connection settings
	k->server = nullptr;
	k->protocol = nullptr;
	k->client_id = nullptr;
	k->timeout = 1.0;

	// Topics
	k->produce = nullptr;
	k->consume = nullptr;

	// librdkafka handles
	k->producer.client = nullptr;
	k->producer.topic = nullptr;
	k->consumer.client = nullptr;
	k->consumer.group_id = nullptr;

	// Authentication / encryption
	k->sasl.mechanisms = nullptr;
	k->sasl.username = nullptr;
	k->sasl.password = nullptr;
	k->ssl.ca = nullptr;

	// Payload format
	k->formatter = nullptr;

	return 0;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Parse the JSON configuration of a kafka node.
 *
 * Recognized settings: server (required), protocol (required; one of
 * PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL), timeout, client_id,
 * out.produce / in.consume topics (at least one required), in.group_id,
 * ssl.ca, sasl.{mechanisms,username,password} and a format object.
 *
 * All strings are duplicated into the node state; they are released
 * again in kafka_destroy(). Throws ConfigError on invalid input.
 */
int villas::node::kafka_parse(NodeCompat *n, json_t *json)
{
	int ret;
	auto *k = n->getData<struct kafka>();

	const char *server;
	const char *produce = nullptr;
	const char *consume = nullptr;
	const char *protocol;
	const char *client_id = "villas-node"; // default client identifier
	const char *group_id = nullptr;

	json_error_t err;
	json_t *json_ssl = nullptr;
	json_t *json_sasl = nullptr;
	json_t *json_format = nullptr;

	ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: o, s: s, s?: F, s: s, s?: s, s?: o, s?: o }",
		"out",
			"produce", &produce,
		"in",
			"consume", &consume,
			"group_id", &group_id,
		"format", &json_format,
		"server", &server,
		"timeout", &k->timeout,
		"protocol", &protocol,
		"client_id", &client_id,
		"ssl", &json_ssl,
		"sasl", &json_sasl
	);
	if (ret)
		throw ConfigError(json, err, "node-config-node-kafka");

	k->server = strdup(server);
	k->produce = produce ? strdup(produce) : nullptr;
	k->consume = consume ? strdup(consume) : nullptr;
	k->protocol = strdup(protocol);
	k->client_id = strdup(client_id);
	k->consumer.group_id = group_id ? strdup(group_id) : nullptr;

	// Only the four librdkafka security protocols are supported
	if (strcmp(protocol, "SSL") &&
	    strcmp(protocol, "PLAINTEXT") &&
	    strcmp(protocol, "SASL_SSL") &&
	    strcmp(protocol, "SASL_PLAINTEXT"))
		throw ConfigError(json, "node-config-node-kafka-protocol", "Invalid security protocol: {}", protocol);

	if (!k->produce && !k->consume)
		throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", n->getName());

	if (json_ssl) {
		const char *ca;

		ret = json_unpack_ex(json_ssl, &err, 0, "{ s: s }",
			"ca", &ca
		);
		if (ret)
			throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", n->getName());

		k->ssl.ca = strdup(ca);
	}

	if (json_sasl) {
		const char *mechanisms;
		const char *username;
		const char *password;

		ret = json_unpack_ex(json_sasl, &err, 0, "{ s: s, s: s, s: s }",
			"mechanisms", &mechanisms,
			"username", &username,
			"password", &password
		);
		if (ret)
			// fix: include the node name for consistency with the SSL error above
			throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", n->getName());

		k->sasl.mechanisms = strdup(mechanisms);
		k->sasl.username = strdup(username);
		k->sasl.password = strdup(password);
	}

	/* Format: defaults to the compact villas.binary wire format */
	if (k->formatter)
		delete k->formatter;
	k->formatter = json_format
			? FormatFactory::make(json_format)
			: FormatFactory::make("villas.binary");
	if (!k->formatter)
		throw ConfigError(json_format, "node-config-node-kafka-format", "Invalid format configuration");

	return 0;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Prepare the node for operation: start the payload formatter and
 * allocate the sample pool and receive queue. */
int villas::node::kafka_prepare(NodeCompat *n)
{
	auto *k = n->getData<struct kafka>();

	k->formatter->start(n->getInputSignals(false), ~(int) SampleFlags::HAS_OFFSET);

	int ret = pool_init(&k->pool, 1024, SAMPLE_LENGTH(n->getInputSignals(false)->size()));
	if (ret)
		return ret;

	// Queue feeding kafka_read() from the polling thread
	return queue_signalled_init(&k->queue, 1024);
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Render a human-readable, heap-allocated description of the node.
 * The caller owns and frees the returned string. */
char * villas::node::kafka_print(NodeCompat *n)
{
	auto *k = n->getData<struct kafka>();
	char *desc = nullptr;

	strcatf(&desc, "bootstrap.server=%s, client.id=%s, security.protocol=%s",
		k->server,
		k->client_id,
		k->protocol
	);

	/* Only show if not default */
	if (k->produce)
		strcatf(&desc, ", out.produce=%s", k->produce);
	if (k->consume)
		strcatf(&desc, ", in.consume=%s", k->consume);

	return desc;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Release all resources owned by the node instance.
 *
 * Safe to call after kafka_init() alone: every member starts out as
 * nullptr and free(nullptr)/delete nullptr are no-ops.
 */
int villas::node::kafka_destroy(NodeCompat *n)
{
	int ret;
	auto *k = n->getData<struct kafka>();

	// fix: the producer topic handle was leaked; destroy it before its client
	if (k->producer.topic)
		rd_kafka_topic_destroy(k->producer.topic);

	if (k->producer.client)
		rd_kafka_destroy(k->producer.client);

	if (k->consumer.client)
		rd_kafka_destroy(k->consumer.client);

	if (k->formatter)
		delete k->formatter;

	ret = pool_destroy(&k->pool);
	if (ret)
		return ret;

	ret = queue_signalled_destroy(&k->queue);
	if (ret)
		return ret;

	// free(nullptr) is a no-op, so no guards are needed
	free(k->produce);
	free(k->consume);
	free(k->protocol);
	free(k->client_id);
	free(k->server);

	// fix: these strdup()'ed strings were previously leaked
	free(k->consumer.group_id);
	free(k->sasl.mechanisms);
	free(k->sasl.username);
	free(k->sasl.password);
	free(k->ssl.ca);

	return 0;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Connect the node to the Kafka cluster.
 *
 * Builds a shared librdkafka configuration, then creates a producer
 * (with topic handle) and/or a consumer (with subscription), depending
 * on which topics were configured. Finally registers the node in the
 * global client list so the polling thread services it.
 *
 * Throws RuntimeError with librdkafka's error string on configuration
 * failures and MemoryAllocationError on allocation failures.
 */
int villas::node::kafka_start(NodeCompat *n)
{
	int ret;
	char errstr[1024];
	auto *k = n->getData<struct kafka>();

	rd_kafka_conf_t *rdkconf = rd_kafka_conf_new();
	if (!rdkconf)
		throw MemoryAllocationError();

	rd_kafka_conf_set_log_cb(rdkconf, kafka_logger_cb);

	ret = rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr));
	if (ret != RD_KAFKA_CONF_OK)
		goto kafka_config_error;

	ret = rd_kafka_conf_set(rdkconf, "bootstrap.servers", k->server, errstr, sizeof(errstr));
	if (ret != RD_KAFKA_CONF_OK)
		goto kafka_config_error;

	ret = rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr));
	if (ret != RD_KAFKA_CONF_OK)
		goto kafka_config_error;

	if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) {
		ret = rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.ca, errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;
	}

	if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) {
		ret = rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanisms, errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;

		ret = rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;

		ret = rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;
	}

	if (k->produce) {
		// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
		// so we will need to create a copy first
		rd_kafka_conf_t *rdkconf_prod = rd_kafka_conf_dup(rdkconf);
		if (!rdkconf_prod)
			throw MemoryAllocationError();

		k->producer.client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf_prod, errstr, sizeof(errstr));
		if (!k->producer.client)
			goto kafka_config_error;

		rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();
		if (!topic_conf)
			throw MemoryAllocationError();

		ret = rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;

		// rd_kafka_topic_new() takes ownership of topic_conf
		k->producer.topic = rd_kafka_topic_new(k->producer.client, k->produce, topic_conf);
		if (!k->producer.topic)
			throw MemoryAllocationError();

		n->logger->info("Connected producer to bootstrap server {}", k->server);
	}

	if (k->consume) {
		// rd_kafka_new() will take ownership and free the passed rd_kafka_conf_t object,
		// so we will need to create a copy first
		rd_kafka_conf_t *rdkconf_cons = rd_kafka_conf_dup(rdkconf);
		if (!rdkconf_cons)
			throw MemoryAllocationError();

		rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(1);
		if (!partitions)
			throw MemoryAllocationError();

		rd_kafka_topic_partition_t *partition = rd_kafka_topic_partition_list_add(partitions, k->consume, 0);
		if (!partition)
			throw RuntimeError("Failed to add new partition");

		// NOTE(review): k->consumer.group_id may be nullptr when no group_id
		// was configured — confirm rd_kafka_conf_set() tolerates this.
		ret = rd_kafka_conf_set(rdkconf_cons, "group.id", k->consumer.group_id, errstr, sizeof(errstr));
		if (ret != RD_KAFKA_CONF_OK)
			goto kafka_config_error;

		k->consumer.client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf_cons, errstr, sizeof(errstr));
		if (!k->consumer.client)
			goto kafka_config_error; // fix: report errstr instead of a misleading MemoryAllocationError

		ret = rd_kafka_subscribe(k->consumer.client, partitions);

		// fix: rd_kafka_subscribe() copies the list; it was previously leaked
		rd_kafka_topic_partition_list_destroy(partitions);

		if (ret != RD_KAFKA_RESP_ERR_NO_ERROR)
			throw RuntimeError("Error subscribing to {} at {}: {}", k->consume, k->server, rd_kafka_err2str((rd_kafka_resp_err_t) ret));

		n->logger->info("Subscribed consumer from bootstrap server {}", k->server);
	}

	// Add client to global list of kafka clients
	// so that thread can call kafka loop for this client
	list_push(&clients, n);

	rd_kafka_conf_destroy(rdkconf);

	return 0;

kafka_config_error:
	rd_kafka_conf_destroy(rdkconf);

	throw RuntimeError(errstr); // fix: an unreachable 'return -1' followed this throw
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Disconnect the node: flush pending producer messages, unregister the
 * node from the polling thread and close the receive queue. */
int villas::node::kafka_stop(NodeCompat *n)
{
	auto *k = n->getData<struct kafka>();

	if (k->producer.client) {
		int err = rd_kafka_flush(k->producer.client, k->timeout * 1000);
		if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
			n->logger->error("Failed to flush messages: {}", rd_kafka_err2str((rd_kafka_resp_err_t) err));

		/* If the output queue is still not empty there is an issue
		 * with producing messages to the clusters. */
		if (rd_kafka_outq_len(k->producer.client) > 0)
			n->logger->warn("{} message(s) were not delivered", rd_kafka_outq_len(k->producer.client));
	}

	// Unregister client from global kafka client list
	// so that kafka loop is no longer invoked for this client
	// important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect
	list_remove_all(&clients, n);

	int ret = queue_signalled_close(&k->queue);
	if (ret)
		return ret;

	return 0;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Node-type startup: initialize the global client list and launch the
 * shared polling thread which services all kafka nodes. */
int villas::node::kafka_type_start(villas::node::SuperNode *sn)
{
	int ret;

	logger = logging.get("node:kafka");

	ret = list_init(&clients);
	if (ret)
		goto kafka_error;

	// Start thread here to run kafka loop for registered clients
	ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr);
	if (ret)
		goto kafka_error;

	return 0;

kafka_error:
	logger->warn("Error initializing node type kafka"); // fix: typo "initialazing"

	return ret;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
int villas::node::kafka_type_stop()
|
2021-05-09 14:57:44 +02:00
|
|
|
{
|
|
|
|
int ret;
|
|
|
|
|
|
|
|
// Stop thread here that executes kafka loop
|
|
|
|
ret = pthread_cancel(thread);
|
|
|
|
if (ret)
|
|
|
|
return ret;
|
|
|
|
|
|
|
|
logger->debug("Called pthread_cancel() on kafka communication management thread.");
|
|
|
|
|
|
|
|
ret = pthread_join(thread, nullptr);
|
|
|
|
if (ret)
|
|
|
|
goto kafka_error;
|
|
|
|
|
|
|
|
// When this is called the list of clients should be empty
|
2021-08-10 10:12:48 -04:00
|
|
|
if (list_length(&clients) > 0)
|
2021-05-09 14:57:44 +02:00
|
|
|
throw RuntimeError("List of kafka clients contains elements at time of destruction. Call node_stop for each kafka node before stopping node type!");
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
ret = list_destroy(&clients, nullptr, false);
|
2021-05-09 14:57:44 +02:00
|
|
|
if (ret)
|
|
|
|
goto kafka_error;
|
|
|
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
kafka_error:
|
|
|
|
logger->warn("Error stoping node type kafka");
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Read up to @p cnt samples from the node's receive queue.
 *
 * Blocks on the signalled queue filled by kafka_message_cb(), copies the
 * pulled samples into @p smps and returns their references to the pool.
 *
 * @return number of samples read, or a negative error code
 */
int villas::node::kafka_read(NodeCompat *n, struct Sample * const smps[], unsigned cnt)
{
	int pulled;
	auto *k = n->getData<struct kafka>();
	struct Sample *smpt[cnt];

	pulled = queue_signalled_pull_many(&k->queue, (void **) smpt, cnt);
	if (pulled < 0)
		return pulled; // fix: a negative count was passed on to sample_copy_many()/sample_decref_many()

	sample_copy_many(smps, smpt, pulled);
	sample_decref_many(smpt, pulled);

	return pulled;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Serialize @p cnt samples with the node's formatter and publish them
 * to the configured produce topic.
 *
 * @return cnt on success (also when no produce topic is configured),
 *         or a negative error code
 */
int villas::node::kafka_write(NodeCompat *n, struct Sample * const smps[], unsigned cnt)
{
	auto *k = n->getData<struct kafka>();

	size_t wbytes;
	char buf[DEFAULT_FORMAT_BUFFER_LENGTH];

	int ret = k->formatter->sprint(buf, sizeof(buf), &wbytes, smps, cnt);
	if (ret < 0)
		return ret;

	if (!k->produce) {
		n->logger->warn("No produce possible because no produce topic is configured");
		return cnt;
	}

	ret = rd_kafka_produce(k->producer.topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
		buf, wbytes, NULL, 0, NULL);
	if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) {
		n->logger->warn("Publish failed");
		return -abs(ret);
	}

	return cnt;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
/* Expose the receive queue's file descriptor so that callers can
 * poll()/select() for sample availability.
 *
 * @return number of file descriptors written into @p fds (always 1)
 */
int villas::node::kafka_poll_fds(NodeCompat *n, int fds[])
{
	auto *k = n->getData<struct kafka>();

	fds[0] = queue_signalled_fd(&k->queue);
	return 1;
}
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
static NodeCompatType p;
|
2021-05-09 14:57:44 +02:00
|
|
|
|
|
|
|
__attribute__((constructor(110)))
|
|
|
|
static void register_plugin() {
|
2021-06-21 16:53:28 -04:00
|
|
|
p.name = "kafka";
|
|
|
|
p.description = "Kafka event message streaming (rdkafka)";
|
2021-06-21 16:11:42 -04:00
|
|
|
p.vectorize = 0;
|
|
|
|
p.size = sizeof(struct kafka);
|
2021-06-21 16:53:28 -04:00
|
|
|
p.type.start = kafka_type_start;
|
|
|
|
p.type.stop = kafka_type_stop;
|
2021-06-21 16:11:42 -04:00
|
|
|
p.destroy = kafka_destroy;
|
|
|
|
p.prepare = kafka_prepare;
|
|
|
|
p.parse = kafka_parse;
|
|
|
|
p.prepare = kafka_prepare;
|
|
|
|
p.print = kafka_print;
|
|
|
|
p.init = kafka_init;
|
|
|
|
p.destroy = kafka_destroy;
|
|
|
|
p.start = kafka_start;
|
|
|
|
p.stop = kafka_stop;
|
|
|
|
p.read = kafka_read;
|
|
|
|
p.write = kafka_write;
|
|
|
|
p.reverse = kafka_reverse;
|
|
|
|
p.poll_fds = kafka_poll_fds;
|
|
|
|
|
2021-08-10 10:12:48 -04:00
|
|
|
static NodeCompatFactory ncp(&p);
|
2021-05-09 14:57:44 +02:00
|
|
|
}
|