/* Node type: kafka. * * Author: Juan Pablo NoreƱa * SPDX-FileCopyrightText: 2021 Universidad Nacional de Colombia * SPDX-License-Identifier: Apache-2.0 */ #pragma once #include #include #include #include namespace villas { namespace node { // Forward declarations class NodeCompat; struct kafka { struct CQueueSignalled queue; struct Pool pool; double timeout; // Timeout in seconds. char *server; // Hostname/IP:Port address of the bootstrap server. char *protocol; // Security protocol. char *produce; // Producer topic. char *consume; // Consumer topic. char *client_id; // Client ID. struct { rd_kafka_t *client; rd_kafka_topic_t *topic; } producer; struct { rd_kafka_t *client; char *group_id; // Group id. } consumer; struct { char *ca; // SSL CA file. } ssl; struct { char *mechanisms; // SASL mechanisms. char *username; // SSL CA path. char *password; // SSL certificate. } sasl; Format *formatter; }; int kafka_reverse(NodeCompat *n); char *kafka_print(NodeCompat *n); int kafka_init(NodeCompat *n); int kafka_prepare(NodeCompat *n); int kafka_parse(NodeCompat *n, json_t *json); int kafka_start(NodeCompat *n); int kafka_destroy(NodeCompat *n); int kafka_stop(NodeCompat *n); int kafka_type_start(SuperNode *sn); int kafka_type_stop(); int kafka_poll_fds(NodeCompat *n, int fds[]); int kafka_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt); int kafka_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt); } // namespace node } // namespace villas