diff --git a/etc/example.conf b/etc/example.conf index 86cba27e7..e4fa53aeb 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -162,6 +162,13 @@ nodes = { "ipc:///tmp/test.ipc", "inproc://test" ] + }, + zeromq_node = { + type = "zeromq", + + pattern = "pubsub", # The ZeroMQ pattern. One of: 'pubsub', 'radiodish' + subscribe = "tcp://*:1234" # The subscribe endpoint. See http://api.zeromq.org/2-1:zmq-bind for details. + publish = [ "tcp://localhost:1235", "tcp://localhost:12444" ] # The publish endpoints. See http://api.zeromq.org/2-1:zmq-connect for details. } }; diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h new file mode 100644 index 000000000..c274fb6c4 --- /dev/null +++ b/include/villas/nodes/zeromq.h @@ -0,0 +1,76 @@ +/** Node type: ZeroMQ + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup zeromq ZeroMQ node type + * @ingroup node + * @{ + */ + +#pragma once + +#include "node.h" +#include "list.h" + +struct zeromq { + enum { + ZEROMQ_PATTERN_PUBSUB, + ZEROMQ_PATTERN_RADIODISH + } pattern; + + struct { + void *socket; /**< ZeroMQ socket. */ + char *endpoint; + } subscriber; + + struct { + void *socket; /**< ZeroMQ socket. */ + struct list endpoints; + } publisher; +}; + +/** @see node_type::print */ +char * zeromq_print(struct node *n); + +/** @see node_type::parse */ +int zeromq_parse(struct node *n, config_setting_t *cfg); + +/** @see node_type::init */ +int zeromq_init(); + +/** @see node_type::deinit */ +int zeromq_deinit(); + +/** @see node_type::open */ +int zeromq_start(struct node *n); + +/** @see node_type::close */ +int zeromq_stop(struct node *n); + +/** @see node_type::read */ +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt); + +/** @see node_type::write */ +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); + +/** @} */ diff --git a/lib/Makefile.villas.inc b/lib/Makefile.villas.inc index 1901cb23d..c0033e851 100644 --- a/lib/Makefile.villas.inc +++ b/lib/Makefile.villas.inc @@ -59,6 +59,14 @@ ifeq ($(shell $(PKGCONFIG) nanomsg; echo $$?),0) endif endif +# Enable ZeroMQ node type when libzmq is available +ifndef WITHOUT_ZMQ +ifeq ($(shell $(PKGCONFIG) libzmq; echo $$?),0) + LIB_SRCS += $(addprefix lib/nodes/, zeromq.c) + LIB_PKGS += libzmq +endif +endif + # Enable VILLASfpga support when libxil is available ifndef WITHOUT_FPGA ifeq ($(shell $(PKGCONFIG) libxil; echo $$?),0) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c new file mode 100644 index 000000000..af68a49be --- /dev/null +++ b/lib/nodes/zeromq.c @@ -0,0 +1,250 @@ +/** Node type: ZeroMQ + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include "nodes/zeromq.h" +#include "utils.h" +#include "queue.h" +#include "plugin.h" + +static void *context; + +/* Release our samples. */ +static void free_msg(void *data, void *hint) +{ + struct sample *s = data; + + sample_put(s); +} + +int zeromq_reverse(struct node *n) +{ +// struct zeromq *z = n->_vd; + + return 0; +} + +int zeromq_parse(struct node *n, config_setting_t *cfg) +{ + struct zeromq *z = n->_vd; + + const char *ep, *type; + + config_setting_t *cfg_pub; + + list_init(&z->publisher.endpoints); + + if (config_setting_lookup_string(cfg, "subscribe", &ep)) + z->subscriber.endpoint = strdup(ep); + else + z->subscriber.endpoint = NULL; + + cfg_pub = config_setting_lookup(cfg, "publish"); + if (cfg_pub) { + switch (config_setting_type(cfg_pub)) { + case CONFIG_TYPE_LIST: + case CONFIG_TYPE_ARRAY: + for (int j = 0; j < config_setting_length(cfg_pub); j++) { + const char *ep = config_setting_get_string_elem(cfg_pub, j); + + list_push(&z->publisher.endpoints, strdup(ep)); + } + break; + + case CONFIG_TYPE_STRING: + ep = config_setting_get_string(cfg_pub); + + list_push(&z->publisher.endpoints, strdup(ep)); + + break; + + default: + cerror(cfg_pub, "Invalid type for ZeroMQ publisher setting"); + } + } + + if (config_setting_lookup_string(cfg, "pattern", &type)) { + if (!strcmp(type, "pubsub")) + z->pattern = ZEROMQ_PATTERN_PUBSUB; + else if (!strcmp(type, "radiodish")) + z->pattern = ZEROMQ_PATTERN_RADIODISH; + else + cerror(cfg, "Invalid type for ZeroMQ node: %s", node_name_short(n)); + } + + return 0; +} + +char * zeromq_print(struct node *n) +{ + struct zeromq *z = n->_vd; + + char *buf = NULL; + char *pattern = NULL; + + switch (z->pattern) { + case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break; + case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break; + } + + strcatf(&buf, "pattern=%s, subscribe=%s, publish=[ ", pattern, z->subscriber.endpoint); + + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + strcatf(&buf, "%s ", ep); + } + + strcatf(&buf, " ]"); + + return buf; +} + +int zeromq_init(struct super_node *sn) +{ + context = zmq_ctx_new(); + + info("context is %p", context); + + return context == NULL; +} + +int zeromq_deinit() +{ + return zmq_ctx_term(context); +} + +int zeromq_start(struct node *n) +{ + int ret; + struct zeromq *z = n->_vd; + + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + z->subscriber.socket = zmq_socket(context, ZMQ_RADIO); + z->publisher.socket = zmq_socket(context, ZMQ_DISH); + break; + + case ZEROMQ_PATTERN_PUBSUB: + z->subscriber.socket = zmq_socket(context, ZMQ_SUB); + z->publisher.socket = zmq_socket(context, ZMQ_PUB); + break; + } + + /* Bind subscriber socket */ + if (z->subscriber.endpoint) { + ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + if (ret) + return ret; + } + + /* Subscribe to all pubsub messages. */ + zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0); + + /* Connect publisher socket */ + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + ret = zmq_connect(z->publisher.socket, ep); + if (ret) + return ret; + } + + return 0; +} + +int zeromq_stop(struct node *n) +{ + int ret; + struct zeromq *z = n->_vd; + + ret = zmq_close(z->subscriber.socket); + if (ret) + return ret; + + return zmq_close(z->publisher.socket); +} + +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) +{ + int i, ret; + struct zeromq *z = n->_vd; + + for (i = 0; i < cnt; i++) { + zmq_msg_t m; + + ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->capacity), free_msg, NULL); + if (ret < 0) + break; + + ret = zmq_msg_recv(&m, z->subscriber.socket, 0); + if (ret < 0) + break; + } + + return i; +} + +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) +{ + int i, ret; + struct zeromq *z = n->_vd; + + for (i = 0; i < cnt; i++) { + zmq_msg_t m; + + sample_get(smps[i]); + + ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->length), free_msg, NULL); + if (ret < 0) + break; + + ret = zmq_msg_send(&m, z->publisher.socket, 0); + if (ret < 0) + break; + } + + return i; +} + +static struct plugin p = { + .name = "zeromq", + .description = "ZeroMQ Distributed Messaging", + .type = PLUGIN_TYPE_NODE, + .node = { + .vectorize = 0, + .size = sizeof(struct zeromq), + .reverse = zeromq_reverse, + .parse = zeromq_parse, + .print = zeromq_print, + .start = zeromq_start, + .stop = zeromq_stop, + .init = zeromq_init, + .deinit = zeromq_deinit, + .read = zeromq_read, + .write = zeromq_write, + .instances = LIST_INIT() + } +}; + +REGISTER_PLUGIN(&p) \ No newline at end of file