From ca10fd076f2c3879bc2b4a51a186bc9e11db1355 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Pablo=20Nore=C3=B1a?= Date: Wed, 5 May 2021 12:02:18 -0500 Subject: [PATCH] Adds node-type-kafka, example and packaging deps --- CMakeLists.txt | 3 + etc/examples/nodes/kafka.conf | 28 + include/villas/nodes/kafka.hpp | 104 ++++ lib/nodes/CMakeLists.txt | 6 + lib/nodes/kafka.cpp | 531 +++++++++++++++++++ packaging/deps.sh | 11 + packaging/docker/Dockerfile.centos | 1 + packaging/docker/Dockerfile.debian | 1 + packaging/docker/Dockerfile.debian-multiarch | 1 + packaging/docker/Dockerfile.fedora | 1 + packaging/docker/Dockerfile.ubuntu | 1 + 11 files changed, 688 insertions(+) create mode 100644 etc/examples/nodes/kafka.conf create mode 100644 include/villas/nodes/kafka.hpp create mode 100644 lib/nodes/kafka.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 217904376..ff1eb8e53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -109,6 +109,7 @@ pkg_check_modules(LIBNL3_ROUTE IMPORTED_TARGET libnl-route-3.0>=3.2.27) pkg_check_modules(LIBIEC61850 IMPORTED_TARGET libiec61850>=1.2) pkg_check_modules(LIBCONFIG IMPORTED_TARGET libconfig>=1.4.9) pkg_check_modules(MOSQUITTO IMPORTED_TARGET libmosquitto>=1.6.9) +pkg_check_modules(RDKAFKA IMPORTED_TARGET rdkafka>=1.5.0) pkg_check_modules(RABBITMQ_C IMPORTED_TARGET librabbitmq>=0.8.0) pkg_check_modules(COMEDILIB IMPORTED_TARGET comedilib>=0.11.0) pkg_check_modules(LIBZMQ IMPORTED_TARGET libzmq>=2.2.0) @@ -156,6 +157,7 @@ cmake_dependent_option(WITH_NODE_INFINIBAND "Build with infiniband node-type" cmake_dependent_option(WITH_NODE_INFLUXDB "Build with influxdb node-type" ON "" OFF) cmake_dependent_option(WITH_NODE_LOOPBACK "Build with loopback node-type" ON "" OFF) cmake_dependent_option(WITH_NODE_MQTT "Build with mqtt node-type" ON "MOSQUITTO_FOUND" OFF) +cmake_dependent_option(WITH_NODE_KAFKA "Build with kafka node-type" ON "RDKAFKA_FOUND" OFF) cmake_dependent_option(WITH_NODE_NANOMSG "Build with nanomsg node-type" ON 
"NANOMSG_FOUND" OFF) cmake_dependent_option(WITH_NODE_NGSI "Build with ngsi node-type" ON "" OFF) cmake_dependent_option(WITH_NODE_OPAL "Build with opal node-type" ON "Opal_FOUND" OFF) @@ -245,6 +247,7 @@ add_feature_info(NODE_INFINIBAND WITH_NODE_INFINIBAND "Build with add_feature_info(NODE_INFLUXDB WITH_NODE_INFLUXDB "Build with influxdb node-type") add_feature_info(NODE_LOOPBACK WITH_NODE_LOOPBACK "Build with loopback node-type") add_feature_info(NODE_MQTT WITH_NODE_MQTT "Build with mqtt node-type") +add_feature_info(NODE_KAFKA WITH_NODE_KAFKA "Build with kafka node-type") add_feature_info(NODE_NANOMSG WITH_NODE_NANOMSG "Build with nanomsg node-type") add_feature_info(NODE_NGSI WITH_NODE_NGSI "Build with ngsi node-type") add_feature_info(NODE_OPAL WITH_NODE_OPAL "Build with opal node-type") diff --git a/etc/examples/nodes/kafka.conf b/etc/examples/nodes/kafka.conf new file mode 100644 index 000000000..a7c34d7d9 --- /dev/null +++ b/etc/examples/nodes/kafka.conf @@ -0,0 +1,28 @@ +nodes = { + kafka_node = { + type = "kafka", + + format = "villas.human", + + server = "localhost:9094", + protocol = "SASL_SSL", + client_id = "villas-node", + + out = { + produce = "test-topic" + }, + in = { + consume = "test-topic", + group_id = "villas-node" + }, + + ssl = { + calocation = "/etc/ssl/certs/ca.pem", + }, + sals = { + mechanisms = "SCRAM-SHA-512", + username = "scram-sha-512-usr", + password = "scram-sha-512-pwd" + } + } +} diff --git a/include/villas/nodes/kafka.hpp b/include/villas/nodes/kafka.hpp new file mode 100644 index 000000000..c8edec721 --- /dev/null +++ b/include/villas/nodes/kafka.hpp @@ -0,0 +1,104 @@ +/** Node type: kafka + * + * @file + * @author Steffen Vogel + * @copyright 2014-2020, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published 
by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup kafka kafka node type + * @ingroup node + * @{ + */ + +#pragma once + +#include + +#include +#include +#include +#include + + +/* Forward declarations */ +struct format_type; + +struct kafka { + rd_kafka_t *client; + rd_kafka_topic_t *topic; + struct queue_signalled queue; + struct pool pool; + + int timeout; /**< Timeout in ms. */ + char *server; /**< Hostname/IP:Port address of the bootstrap server. */ + char *protocol; /**< Security protocol. */ + char *produce; /**< Producer topic. */ + char *consume; /**< Consumer topic. */ + char *client_id; /**< Client id. */ + char *group_id; /**< Group id. */ + + struct { + char *calocation; /**< SSL CA file. */ + } ssl; + + struct { + char *mechanism; /**< SASL mechanism. */ + char *username; /**< SSL CA path. */ + char *password; /**< SSL certificate. 
*/ + } sasl; + + struct format_type *format; + struct io io; +}; + +/** @see node_type::reverse */ +int kafka_reverse(struct vnode *n); + +/** @see node_type::print */ +char * kafka_print(struct vnode *n); + +/** @see node_type::prepare */ +int kafka_prepare(struct vnode *n); + +/** @see node_type::parse */ +int kafka_parse(struct vnode *n, json_t *json); + +/** @see node_type::start */ +int kafka_start(struct vnode *n); + +/** @see node_type::destroy */ +int kafka_destroy(struct vnode *n); + +/** @see node_type::stop */ +int kafka_stop(struct vnode *n); + +/** @see node_type::type_start */ +int kafka_type_start(villas::node::SuperNode *sn); + +/** @see node_type::type_stop */ +int kafka_type_stop(); + +/** @see node_type::read */ +int kafka_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @see node_type::write */ +int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release); + +/** @} */ diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 0d2856a4d..ab224e5a8 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -121,6 +121,12 @@ if(WITH_NODE_MQTT) list(APPEND LIBRARIES PkgConfig::MOSQUITTO) endif() +# Enable KAFKA support +if(WITH_NODE_KAFKA) + list(APPEND NODE_SRC kafka.cpp) + list(APPEND LIBRARIES PkgConfig::RDKAFKA) +endif() + # Enable Comedi support if(WITH_NODE_COMEDI) list(APPEND NODE_SRC comedi.cpp) diff --git a/lib/nodes/kafka.cpp b/lib/nodes/kafka.cpp new file mode 100644 index 000000000..b78c396ca --- /dev/null +++ b/lib/nodes/kafka.cpp @@ -0,0 +1,531 @@ +/** Node type: kafka + * + * @author Steffen Vogel + * @copyright 2014-2020, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 
3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include +#include + +#include +#include +#include +#include +#include + +using namespace villas; +using namespace villas::utils; + +// Each process has a list of clients for which a thread invokes the kafka loop +static struct vlist clients; +static pthread_t thread; +static Logger logger; + +static void kafka_message(void *ctx, const rd_kafka_message_t *msg) +{ + int ret; + struct vnode *n = (struct vnode *) ctx; + struct kafka *k = (struct kafka *) n->_vd; + struct sample *smps[n->in.vectorize]; + + n->logger->debug("Received a message of {} bytes from broker {}", msg->len, k->server); + + ret = sample_alloc_many(&k->pool, smps, n->in.vectorize); + if (ret <= 0) { + n->logger->warn("Pool underrun in consumer"); + return; + } + + ret = io_sscan(&k->io, (char *) msg->payload, msg->len, nullptr, smps, n->in.vectorize); + if (ret < 0) { + n->logger->warn("Received an invalid message"); + n->logger->warn(" Payload: {}", (char *) msg->payload); + return; + } + + if (ret == 0) { + n->logger->debug("Skip empty message"); + sample_decref_many(smps, n->in.vectorize); + return; + } + + ret = queue_signalled_push_many(&k->queue, (void **) smps, n->in.vectorize); + if (ret < (int) n->in.vectorize) + n->logger->warn("Failed to enqueue samples"); +} + +static void * kafka_loop_thread(void *ctx) +{ + int ret; + + // Set the cancel type of this thread to async + ret = pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + if (ret != 0) + throw RuntimeError("Unable to set cancel 
type of Kafka communication thread to asynchronous."); + + while (true) { + for (unsigned i = 0; i < vlist_length(&clients); i++) { + struct vnode *n = (struct vnode *) vlist_at(&clients, i); + struct kafka *k = (struct kafka *) n->_vd; + + // Execute kafka loop for this client + rd_kafka_message_t *msg = rd_kafka_consumer_poll(k->client, k->timeout); + + if(msg) { + kafka_message((void *) n, msg); + rd_kafka_message_destroy(msg); + } + } + } + + return nullptr; +} + +int kafka_reverse(struct vnode *n) +{ + struct kafka *k = (struct kafka *) n->_vd; + + SWAP(k->produce, k->consume); + + return 0; +} + +int kafka_init(struct vnode *n) +{ + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + /* Default values */ + k->server = nullptr; + k->protocol = nullptr; + k->produce = nullptr; + k->consume = nullptr; + k->client_id = nullptr; + k->group_id = nullptr; + + k->sasl.mechanism = nullptr; + k->sasl.username = nullptr; + k->sasl.password = nullptr; + + k->ssl.calocation = nullptr; + + ret = 0; + + return ret; +} + +int kafka_parse(struct vnode *n, json_t *json) +{ + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + const char *server; + const char *format = "villas.binary"; + const char *produce = nullptr; + const char *consume = nullptr; + const char *protocol = nullptr; + const char *client_id = nullptr; + const char *group_id = nullptr; + + json_error_t err; + json_t *json_ssl = nullptr; + json_t *json_sasl = nullptr; + + ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: s }, s?: { s?: s, s?: s }, s?: s, s: s, s?: i, s?: s, s?: s, s?: o, s?: o }", + "out", + "produce", &produce, + "in", + "consume", &consume, + "group_id", &group_id, + "format", &format, + "server", &server, + "timeout", &k->timeout, + "protocol", &protocol, + "client_id", &client_id, + "ssl", &json_ssl, + "sasl", &json_sasl + ); + if (ret) + throw ConfigError(json, err, "node-config-node-kafka"); + + k->server = strdup(server); + k->produce = produce ? 
strdup(produce) : nullptr; + k->consume = consume ? strdup(consume) : nullptr; + k->protocol = protocol ? strdup(protocol) : nullptr; + k->client_id = client_id ? strdup(client_id) : nullptr; + k->group_id = group_id ? strdup(group_id) : nullptr; + + if (!k->produce && !k->consume) + throw ConfigError(json, "node-config-node-kafka", "At least one topic has to be specified for node {}", node_name(n)); + + if (json_ssl) { + + const char *calocation = nullptr; + + ret = json_unpack_ex(json_ssl, &err, 0, "{ s?: s }", + "calocation", &calocation + ); + if (ret) + throw ConfigError(json_ssl, err, "node-config-node-kafka-ssl", "Failed to parse SSL configuration of node {}", node_name(n)); + + if (!calocation) + throw ConfigError(json_ssl, "node-config-node-kafka-ssl", "'calocation' settings must be set for node {}.", node_name(n)); + + k->ssl.calocation = calocation ? strdup(calocation) : nullptr; + } + + if (json_sasl) { + const char *mechanism = nullptr; + const char *username = nullptr; + const char *password = nullptr; + + ret = json_unpack_ex(json_sasl, &err, 0, "{ s?: s, s?: s, s?: s }", + "mechanism", &mechanism, + "username", &username, + "password", &password + ); + if (ret) + throw ConfigError(json_sasl, err, "node-config-node-kafka-sasl", "Failed to parse SASL configuration of node {}", node_name(n)); + + if (!username && !password && !mechanism) + throw ConfigError(json_sasl, "node-config-node-kafka-sasl", "Either 'sasl.mechanism', 'sasl.username' or 'sasl.password' settings must be set for node {}.", node_name(n)); + + k->sasl.mechanism = mechanism ? strdup(mechanism) : nullptr; + k->sasl.username = username ? strdup(username) : nullptr; + k->sasl.password = password ?
strdup(password) : nullptr; + } + + k->format = format_type_lookup(format); + if (!k->format) + throw ConfigError(json, "node-config-node-kafka-format", "Invalid format '{}' for node {}", format, node_name(n)); + + return 0; +} + +int kafka_prepare(struct vnode *n) +{ + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + ret = io_init(&k->io, k->format, &n->in.signals, (int) SampleFlags::HAS_ALL & ~(int) SampleFlags::HAS_OFFSET); + if (ret) + return ret; + + ret = pool_init(&k->pool, 1024, SAMPLE_LENGTH(vlist_length(&n->in.signals))); + if (ret) + return ret; + + ret = queue_signalled_init(&k->queue, 1024); + if (ret) + return ret; + + return 0; +} + +char * kafka_print(struct vnode *n) +{ + struct kafka *k = (struct kafka *) n->_vd; + + char *buf = nullptr; + + strcatf(&buf, "format=%s, bootstrap.server=%s, client.id=%s, security.protocol=%s", format_type_name(k->format), + k->server, + k->client_id, + k->protocol + ); + + /* Only show if not default */ + if (k->produce) + strcatf(&buf, ", out.produce=%s", k->produce); + + if (k->consume) + strcatf(&buf, ", in.consume=%s", k->consume); + + return buf; +} + +int kafka_destroy(struct vnode *n) +{ + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + rd_kafka_destroy(k->client); + + ret = io_destroy(&k->io); + if (ret) + return ret; + + ret = pool_destroy(&k->pool); + if (ret) + return ret; + + ret = queue_signalled_destroy(&k->queue); + if (ret) + return ret; + + if (k->produce) + free(k->produce); + if (k->consume) + free(k->consume); + if (k->protocol) + free(k->protocol); + if (k->client_id) + free(k->client_id); + if (k->group_id) + free(k->group_id); + + free(k->server); + + return 0; +} + +int kafka_start(struct vnode *n) +{ + int ret = 0; + struct kafka *k = (struct kafka *) n->_vd; + + char errstr[1024]; + rd_kafka_conf_t *rdkconf = rd_kafka_conf_new(); + if (rd_kafka_conf_set(rdkconf, "client.id", k->client_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + if (rd_kafka_conf_set(rdkconf,
"bootstrap.servers", k->server, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + if (rd_kafka_conf_set(rdkconf, "security.protocol", k->protocol, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + if (!strcmp(k->protocol, "SASL_SSL") || !strcmp(k->protocol, "SSL")) { + if (rd_kafka_conf_set(rdkconf, "ssl.ca.location", k->ssl.calocation, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + } + + if (!strcmp(k->protocol, "SASL_PLAINTEXT") || !strcmp(k->protocol, "SASL_SSL")) { + if (rd_kafka_conf_set(rdkconf, "sasl.mechanisms", k->sasl.mechanism, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + if (rd_kafka_conf_set(rdkconf, "sasl.username", k->sasl.username, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + if (rd_kafka_conf_set(rdkconf, "sasl.password", k->sasl.password, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + } + + if (ret) + goto kafka_config_error; + + if(k->produce) { + k->client = rd_kafka_new(RD_KAFKA_PRODUCER, rdkconf, errstr, sizeof(errstr)); + rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new(); + if (rd_kafka_topic_conf_set(topic_conf, "acks", "all", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + k->topic = rd_kafka_topic_new(k->client, k->produce, topic_conf); + n->logger->info("Connected producer to bootstrap server {}", k->server); + } + else if(k->consume){ + rd_kafka_topic_partition_list_t *partitions = rd_kafka_topic_partition_list_new(0); + rd_kafka_topic_partition_list_add(partitions, k->consume, 0); + + if (rd_kafka_conf_set(rdkconf, "group.id", k->group_id, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) + ret = 1; + + k->client = rd_kafka_new(RD_KAFKA_CONSUMER, rdkconf, errstr, sizeof(errstr)); + rd_kafka_subscribe(k->client, partitions); + + n->logger->info("Subscribed consumer from bootstrap server {}", k->server); + } + + + if (!k->client) + goto kafka_server_error; + + // Add client to global list of kafka clients + // so that thread can call kafka loop for this 
client + vlist_push(&clients, n); + + return 0; + +kafka_config_error: + rd_kafka_conf_destroy(rdkconf); + n->logger->warn("{}", errstr); + return ret; + +kafka_server_error: + n->logger->warn("Error subscribing to {} at {} ", k->consume, k->server); + return ret; + +} + +int kafka_stop(struct vnode *n) +{ + int ret; + // struct kafka *k = (struct kafka *) n->_vd; + + // Unregister client from global kafka client list + // so that kafka loop is no longer invoked for this client + // important to do that before disconnecting from broker, otherwise, kafka thread will attempt to reconnect + vlist_remove(&clients, vlist_index(&clients, n)); + + // ret = kafka_disconnect(k->client); + // if (ret != RD_KAFKA_ERR_NO_ERROR) + // goto kafka_error; + ret = 0; + return ret; + +//kafka_error: + //n->logger->warn("{}", kafka_strerror(ret)); + + //return ret; +} + +int kafka_type_start(villas::node::SuperNode *sn) +{ + int ret; + + logger = logging.get("node:kafka"); + + ret = vlist_init(&clients); + if (ret) + goto kafka_error; + + // Start thread here to run kafka loop for registered clients + ret = pthread_create(&thread, nullptr, kafka_loop_thread, nullptr); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error initializing node type kafka"); + + return ret; +} + +int kafka_type_stop() +{ + int ret; + + // Stop thread here that executes kafka loop + ret = pthread_cancel(thread); + if (ret) + return ret; + + logger->debug("Called pthread_cancel() on kafka communication management thread."); + + ret = pthread_join(thread, nullptr); + if (ret) + goto kafka_error; + + // When this is called the list of clients should be empty + if (vlist_length(&clients) > 0) + throw RuntimeError("List of kafka clients contains elements at time of destruction.
Call node_stop for each kafka node before stopping node type!"); + + ret = vlist_destroy(&clients, nullptr, false); + if (ret) + goto kafka_error; + + return 0; + +kafka_error: + logger->warn("Error stopping node type kafka"); + + return ret; +} + +int kafka_read(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int pulled; + struct kafka *k = (struct kafka *) n->_vd; + struct sample *smpt[cnt]; + + pulled = queue_signalled_pull_many(&k->queue, (void **) smpt, cnt); + + sample_copy_many(smps, smpt, pulled); + sample_decref_many(smpt, pulled); + + return pulled; +} + +int kafka_write(struct vnode *n, struct sample *smps[], unsigned cnt, unsigned *release) +{ + int ret; + struct kafka *k = (struct kafka *) n->_vd; + + size_t wbytes; + + char data[1500]; + + ret = io_sprint(&k->io, data, sizeof(data), &wbytes, smps, cnt); + if (ret < 0) + return ret; + + if (k->produce) { + ret = rd_kafka_produce(k->topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, + data, wbytes, NULL, 0, NULL); + + if (ret != RD_KAFKA_RESP_ERR_NO_ERROR) { + n->logger->warn("Publish failed"); + return -abs(ret); + } + } + else + n->logger->warn("No produce possible because no produce topic is configured"); + + return cnt; +} + +int kafka_poll_fds(struct vnode *n, int fds[]) +{ + struct kafka *k = (struct kafka *) n->_vd; + + fds[0] = queue_signalled_fd(&k->queue); + + return 1; +} + +static struct plugin p; + +__attribute__((constructor(110))) +static void register_plugin() { + p.name = "kafka"; + p.description = "Kafka event message streaming (rdkafka)"; + p.type = PluginType::NODE; + p.node.instances.state = State::DESTROYED; + p.node.vectorize = 0; + p.node.size = sizeof(struct kafka); + p.node.type.start = kafka_type_start; + p.node.type.stop = kafka_type_stop; + p.node.prepare = kafka_prepare; + p.node.parse = kafka_parse; + p.node.print = kafka_print; + p.node.init = kafka_init; + p.node.destroy =
kafka_destroy; + p.node.start = kafka_start; + p.node.stop = kafka_stop; + p.node.read = kafka_read; + p.node.write = kafka_write; + p.node.reverse = kafka_reverse; + p.node.poll_fds = kafka_poll_fds; + + int ret = vlist_init(&p.node.instances); + if (!ret) + vlist_init_and_push(&plugins, &p); +} + +__attribute__((destructor(110))) +static void deregister_plugin() { + vlist_remove_all(&plugins, &p); +} diff --git a/packaging/deps.sh b/packaging/deps.sh index 9eefa5847..7d1bb1451 100644 --- a/packaging/deps.sh +++ b/packaging/deps.sh @@ -120,6 +120,17 @@ if ! pkg-config "libwebsockets >= 2.3.0" && \ popd fi +# Build & Install librdkafka +if ! pkg-config "rdkafka>=1.5.0" && \ + [ -z "${SKIP_RDKAFKA}" ]; then + git clone --branch v1.6.0 --depth 1 https://github.com/edenhill/librdkafka.git + mkdir -p librdkafka/build + pushd librdkafka/build + cmake .. + make -j$(nproc) ${TARGET} + popd +fi + # Build & Install uldaq if ! pkg-config "libuldaq >= 1.0.0" && \ [ "${DISTRO}" != "debian-multiarch" ] && \ diff --git a/packaging/docker/Dockerfile.centos b/packaging/docker/Dockerfile.centos index 3fd559d51..853441d5a 100644 --- a/packaging/docker/Dockerfile.centos +++ b/packaging/docker/Dockerfile.centos @@ -56,6 +56,7 @@ RUN dnf -y install \ zeromq-devel \ librabbitmq-devel \ mosquitto-devel \ + librdkafka-devel \ libibverbs-devel \ librdmacm-devel \ libusb1-devel \ diff --git a/packaging/docker/Dockerfile.debian b/packaging/docker/Dockerfile.debian index bd2d38f0b..1a0a933e6 100644 --- a/packaging/docker/Dockerfile.debian +++ b/packaging/docker/Dockerfile.debian @@ -56,6 +56,7 @@ RUN apt-get update && \ libnanomsg-dev \ librabbitmq-dev \ libmosquitto-dev \ + librdkafka-dev \ libcomedi-dev \ libibverbs-dev \ librdmacm-dev \ diff --git a/packaging/docker/Dockerfile.debian-multiarch b/packaging/docker/Dockerfile.debian-multiarch index 417db70f8..17310b596 100644 --- a/packaging/docker/Dockerfile.debian-multiarch +++ b/packaging/docker/Dockerfile.debian-multiarch @@ -62,6 +62,7 
@@ RUN apt-get update && \ libnanomsg-dev:${ARCH} \ librabbitmq-dev:${ARCH} \ libmosquitto-dev:${ARCH} \ + librdkafka-dev:${ARCH} \ libcomedi-dev:${ARCH} \ libibverbs-dev:${ARCH} \ librdmacm-dev:${ARCH} \ diff --git a/packaging/docker/Dockerfile.fedora b/packaging/docker/Dockerfile.fedora index 7c25bdc86..2cd08f510 100644 --- a/packaging/docker/Dockerfile.fedora +++ b/packaging/docker/Dockerfile.fedora @@ -74,6 +74,7 @@ RUN dnf -y install \ nanomsg-devel \ librabbitmq-devel \ mosquitto-devel \ + librdkafka-devel \ libibverbs-devel \ librdmacm-devel \ libusb-devel \ diff --git a/packaging/docker/Dockerfile.ubuntu b/packaging/docker/Dockerfile.ubuntu index 3aaf0d931..d6ad6b401 100644 --- a/packaging/docker/Dockerfile.ubuntu +++ b/packaging/docker/Dockerfile.ubuntu @@ -56,6 +56,7 @@ RUN apt-get update && \ libnanomsg-dev \ librabbitmq-dev \ libmosquitto-dev \ + librdkafka-dev \ libcomedi-dev \ libibverbs-dev \ librdmacm-dev \