From 4316ecaf38b03eb45fe4f012bdc58906f6015271 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 22 May 2017 20:06:31 +0200 Subject: [PATCH 01/22] zeromq: add libzmq as thirdparty module --- .dockerignore | 3 ++- .gitmodules | 3 +++ Dockerfile.dev | 8 +++++++- thirdparty/Makefile.inc | 7 +++++-- thirdparty/libzmq | 1 + 5 files changed, 18 insertions(+), 4 deletions(-) create mode 160000 thirdparty/libzmq diff --git a/.dockerignore b/.dockerignore index 4e893e699..0f1a95af5 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,4 +3,5 @@ !thirdparty/libxil/ !thirdparty/criterion/ !thirdparty/libwebsockets/ -!thirdparty/nanomsg/ \ No newline at end of file +!thirdparty/nanomsg/ +!thirdparty/libzmq/ diff --git a/.gitmodules b/.gitmodules index 59504c262..02fef7637 100644 --- a/.gitmodules +++ b/.gitmodules @@ -28,3 +28,6 @@ [submodule "thirdparty/nanomsg"] path = thirdparty/nanomsg url = https://github.com/nanomsg/nanomsg.git +[submodule "thirdparty/libzmq"] + path = thirdparty/libzmq + url = https://github.com/zeromq/libzmq.git diff --git a/Dockerfile.dev b/Dockerfile.dev index 1e542147a..07709d3ee 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -45,7 +45,9 @@ RUN dnf -y install \ libconfig-devel \ libnl3-devel \ libcurl-devel \ - jansson-devel + jansson-devel \ + libsodium-devel \ + openpgm-devel # Several tools only needed for developement and testing RUN dnf -y install \ @@ -85,6 +87,10 @@ RUN mkdir -p /tmp/libwebsockets/build && cd /tmp/libwebsockets/build && cmake -D COPY thirdparty/nanomsg /tmp/nanomsg RUN mkdir -p /tmp/nanomsg/build && cd /tmp/nanomsg/build && cmake .. && make install +# Build & Install libzmq +COPY thirdparty/libzmq /tmp/libzmq +RUN cd /tmp/libzmq && autoreconf -fi && ./configure --with-libsodium --with-pgm --enable-drafts && make install + # Cleanup intermediate files from builds RUN rm -rf /tmp/* diff --git a/thirdparty/Makefile.inc b/thirdparty/Makefile.inc index e1eddf17d..757a8ac0c 100644 --- a/thirdparty/Makefile.inc +++ b/thirdparty/Makefile.inc @@ -21,7 +21,7 @@ ################################################################################### DEPS_CMAKE = libxil libwebsockets criterion jansson nanomsg -DEPS_AUTOCONF = libnl libconfig libcurl +DEPS_AUTOCONF = libnl libconfig libcurl libzmq DEPS = $(DEPS_CMAKE) $(DEPS_AUTOCONF) @@ -33,13 +33,15 @@ ifdef DEBUG AC_CXXFLAGS=-g -O0 endif +CONFIGURE_OPTS = --prefix=$(PREFIX) + thirdparty: # Install & compile autotools based projects $(DEPS_AUTOCONF): CPPFLAGS=$(AC_CPPFLAGS) CFLAGS=$(AC_CFLAGS) CXXFLAGS=$(AC_CXXFLAGS) $(DEPS_AUTOCONF): | $(BUILDDIR)/thirdparty/$$@/ autoreconf -fi $(SRCDIR)/thirdparty/$@ - cd $(BUILDDIR)/thirdparty/$@ && $(SRCDIR)/thirdparty/$@/configure --prefix=$(PREFIX) && make + cd $(BUILDDIR)/thirdparty/$@ && $(SRCDIR)/thirdparty/$@/configure $(CONFIGURE_OPTS) && make # Install & compile CMake based projects $(DEPS_CMAKE): | $(BUILDDIR)/thirdparty/$$@/ @@ -68,5 +70,6 @@ libconfig-fix: rm -f $(SRCDIR)/thirdparty/libconfig/lib/scanner.[hc] libwebsockets: CMAKE_OPTS += -DLWS_IPV6=1 -DLWS_WITH_STATIC=0 -DLWS_WITHOUT_TESTAPPS=1 -DLWS_WITH_HTTP2=1 +libzmq: CONFIGURE_OPTS += --with-libsodium --with-pgm --enable-drafts .PHONY: $(DEPS) thirdparty clean-thirdparty install-thirdparty \ No newline at end of file diff --git a/thirdparty/libzmq b/thirdparty/libzmq new file mode 160000 index 000000000..ec56eaaeb --- /dev/null +++ b/thirdparty/libzmq @@ -0,0 +1 @@ +Subproject commit ec56eaaeb666960548375cc3746ac0b6927977e4 From 5a267813d028e8b5c19ff2c451392571294e5f5f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 22 May 2017 20:09:26 +0200 Subject: [PATCH 02/22] zeromq: added initial version of ZeroMQ node-type --- etc/example.conf | 7 + include/villas/nodes/zeromq.h | 76 +++++++++++ lib/Makefile.villas.inc | 8 ++ lib/nodes/zeromq.c | 250 ++++++++++++++++++++++++++++++++++ 4 files changed, 341 insertions(+) create mode 100644 include/villas/nodes/zeromq.h create mode 100644 lib/nodes/zeromq.c diff --git a/etc/example.conf b/etc/example.conf index 86cba27e7..e4fa53aeb 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -162,6 +162,13 @@ nodes = { "ipc:///tmp/test.ipc", "inproc://test" ] + }, + zeromq_node = { + type = "zeromq", + + pattern = "pubsub", # The ZeroMQ pattern. One of: 'pubsub', 'radiodish' + subscribe = "tcp://*:1234" # The subscribe endpoint. See http://api.zeromq.org/2-1:zmq-bind for details. + publish = [ "tcp://localhost:1235", "tcp://localhost:12444" ] # The publish endpoints. See http://api.zeromq.org/2-1:zmq-connect for details. } }; diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h new file mode 100644 index 000000000..c274fb6c4 --- /dev/null +++ b/include/villas/nodes/zeromq.h @@ -0,0 +1,76 @@ +/** Node type: ZeroMQ + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup zeromq ZeroMQ node type + * @ingroup node + * @{ + */ + +#pragma once + +#include "node.h" +#include "list.h" + +struct zeromq { + enum { + ZEROMQ_PATTERN_PUBSUB, + ZEROMQ_PATTERN_RADIODISH + } pattern; + + struct { + void *socket; /**< ZeroMQ socket. */ + char *endpoint; + } subscriber; + + struct { + void *socket; /**< ZeroMQ socket. */ + struct list endpoints; + } publisher; +}; + +/** @see node_type::print */ +char * zeromq_print(struct node *n); + +/** @see node_type::parse */ +int zeromq_parse(struct node *n, config_setting_t *cfg); + +/** @see node_type::init */ +int zeromq_init(); + +/** @see node_type::deinit */ +int zeromq_deinit(); + +/** @see node_type::open */ +int zeromq_start(struct node *n); + +/** @see node_type::close */ +int zeromq_stop(struct node *n); + +/** @see node_type::read */ +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt); + +/** @see node_type::write */ +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt); + +/** @} */ diff --git a/lib/Makefile.villas.inc b/lib/Makefile.villas.inc index 1901cb23d..c0033e851 100644 --- a/lib/Makefile.villas.inc +++ b/lib/Makefile.villas.inc @@ -59,6 +59,14 @@ ifeq ($(shell $(PKGCONFIG) nanomsg; echo $$?),0) endif endif +# Enable ZeroMQ node type when libzmq is available +ifndef WITHOUT_ZMQ +ifeq ($(shell $(PKGCONFIG) libzmq; echo $$?),0) + LIB_SRCS += $(addprefix lib/nodes/, zeromq.c) + LIB_PKGS += libzmq +endif +endif + # Enable VILLASfpga support when libxil is available ifndef WITHOUT_FPGA ifeq ($(shell $(PKGCONFIG) libxil; echo $$?),0) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c new file mode 100644 index 000000000..af68a49be --- /dev/null +++ b/lib/nodes/zeromq.c @@ -0,0 +1,250 @@ +/** Node type: ZeroMQ + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include "nodes/zeromq.h" +#include "utils.h" +#include "queue.h" +#include "plugin.h" + +static void *context; + +/* Release our samples. */ +static void free_msg(void *data, void *hint) +{ + struct sample *s = data; + + sample_put(s); +} + +int zeromq_reverse(struct node *n) +{ +// struct zeromq *z = n->_vd; + + return 0; +} + +int zeromq_parse(struct node *n, config_setting_t *cfg) +{ + struct zeromq *z = n->_vd; + + const char *ep, *type; + + config_setting_t *cfg_pub; + + list_init(&z->publisher.endpoints); + + if (config_setting_lookup_string(cfg, "subscribe", &ep)) + z->subscriber.endpoint = strdup(ep); + else + z->subscriber.endpoint = NULL; + + cfg_pub = config_setting_lookup(cfg, "publish"); + if (cfg_pub) { + switch (config_setting_type(cfg_pub)) { + case CONFIG_TYPE_LIST: + case CONFIG_TYPE_ARRAY: + for (int j = 0; j < config_setting_length(cfg_pub); j++) { + const char *ep = config_setting_get_string_elem(cfg_pub, j); + + list_push(&z->publisher.endpoints, strdup(ep)); + } + break; + + case CONFIG_TYPE_STRING: + ep = config_setting_get_string(cfg_pub); + + list_push(&z->publisher.endpoints, strdup(ep)); + + break; + + default: + cerror(cfg_pub, "Invalid type for ZeroMQ publisher setting"); + } + } + + if (config_setting_lookup_string(cfg, "pattern", &type)) { + if (!strcmp(type, "pubsub")) + z->pattern = ZEROMQ_PATTERN_PUBSUB; + else if (!strcmp(type, "radiodish")) + z->pattern = ZEROMQ_PATTERN_RADIODISH; + else + cerror(cfg, "Invalid type for ZeroMQ node: %s", node_name_short(n)); + } + + return 0; +} + +char * zeromq_print(struct node *n) +{ + struct zeromq *z = n->_vd; + + char *buf = NULL; + char *pattern = NULL; + + switch (z->pattern) { + case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break; + case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break; + } + + strcatf(&buf, "pattern=%s, subscribe=%s, publish=[ ", pattern, z->subscriber.endpoint); + + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + strcatf(&buf, "%s ", ep); + } + + strcatf(&buf, " ]"); + + return buf; +} + +int zeromq_init(struct super_node *sn) +{ + context = zmq_ctx_new(); + + info("context is %p", context); + + return context == NULL; +} + +int zeromq_deinit() +{ + return zmq_ctx_term(context); +} + +int zeromq_start(struct node *n) +{ + int ret; + struct zeromq *z = n->_vd; + + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + z->subscriber.socket = zmq_socket(context, ZMQ_RADIO); + z->publisher.socket = zmq_socket(context, ZMQ_DISH); + break; + + case ZEROMQ_PATTERN_PUBSUB: + z->subscriber.socket = zmq_socket(context, ZMQ_SUB); + z->publisher.socket = zmq_socket(context, ZMQ_PUB); + break; + } + + /* Bind subscriber socket */ + if (z->subscriber.endpoint) { + ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + if (ret) + return ret; + } + + /* Subscribe to all pubsub messages. */ + zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0); + + /* Connect publisher socket */ + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + ret = zmq_connect(z->publisher.socket, ep); + if (ret) + return ret; + } + + return 0; +} + +int zeromq_stop(struct node *n) +{ + int ret; + struct zeromq *z = n->_vd; + + ret = zmq_close(z->subscriber.socket); + if (ret) + return ret; + + return zmq_close(z->publisher.socket); +} + +int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) +{ + int i, ret; + struct zeromq *z = n->_vd; + + for (i = 0; i < cnt; i++) { + zmq_msg_t m; + + ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->capacity), free_msg, NULL); + if (ret < 0) + break; + + ret = zmq_msg_recv(&m, z->subscriber.socket, 0); + if (ret < 0) + break; + } + + return i; +} + +int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) +{ + int i, ret; + struct zeromq *z = n->_vd; + + for (i = 0; i < cnt; i++) { + zmq_msg_t m; + + sample_get(smps[i]); + + ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->length), free_msg, NULL); + if (ret < 0) + break; + + ret = zmq_msg_send(&m, z->publisher.socket, 0); + if (ret < 0) + break; + } + + return i; +} + +static struct plugin p = { + .name = "zeromq", + .description = "ZeroMQ Distributed Messaging", + .type = PLUGIN_TYPE_NODE, + .node = { + .vectorize = 0, + .size = sizeof(struct zeromq), + .reverse = zeromq_reverse, + .parse = zeromq_parse, + .print = zeromq_print, + .start = zeromq_start, + .stop = zeromq_stop, + .init = zeromq_init, + .deinit = zeromq_deinit, + .read = zeromq_read, + .write = zeromq_write, + .instances = LIST_INIT() + } +}; + +REGISTER_PLUGIN(&p) \ No newline at end of file From 8f6b38cd63d6266e83f7d12425e085a57939368d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 22 May 2017 20:10:12 +0200 Subject: [PATCH 03/22] added new function node_type_name() --- include/villas/node_type.h | 3 +++ lib/node_type.c | 9 +++++++-- src/pipe.c | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/include/villas/node_type.h b/include/villas/node_type.h index d0dcdc8d1..2beef8e85 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -157,4 +157,7 @@ int node_type_start(struct node_type *vt, struct super_node *sn); */ int node_type_stop(struct node_type *vt); +/** Return a printable representation of the node-type. */ +char * node_type_name(struct node_type *vt); + /** @} */ \ No newline at end of file diff --git a/lib/node_type.c b/lib/node_type.c index b4fb095e4..92a26c851 100644 --- a/lib/node_type.c +++ b/lib/node_type.c @@ -36,7 +36,7 @@ int node_type_start(struct node_type *vt, struct super_node *sn) if (vt->state != STATE_DESTROYED) return 0; - info("Initializing " YEL("%s") " node type which is used by %zu nodes", plugin_name(vt), list_length(&vt->instances)); + info("Initializing " YEL("%s") " node type which is used by %zu nodes", node_type_name(vt), list_length(&vt->instances)); { INDENT ret = vt->init ? vt->init(sn) : 0; } @@ -54,7 +54,7 @@ int node_type_stop(struct node_type *vt) if (vt->state != STATE_STARTED) return 0; - info("De-initializing " YEL("%s") " node type", plugin_name(vt)); + info("De-initializing " YEL("%s") " node type", node_type_name(vt)); { INDENT ret = vt->deinit ? vt->deinit() : 0; } @@ -64,3 +64,8 @@ int node_type_stop(struct node_type *vt) return ret; } + +char * node_type_name(struct node_type *vt) +{ + return plugin_name(vt); +} \ No newline at end of file diff --git a/src/pipe.c b/src/pipe.c index 074fb5106..97557f527 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -253,7 +253,7 @@ int main(int argc, char *argv[]) ret = node_type_start(node->_vt, &sn); if (ret) - error("Failed to intialize node type: %s", node_name(node)); + error("Failed to intialize node type: %s", node_type_name(node->_vt)); ret = node_check(node); if (ret) From 246a4a98fb0966c75f4cb246741219126b70e3c2 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:12:24 +0200 Subject: [PATCH 04/22] zeromq: added zmq-keygen as a new tool --- packaging/rpm/villas-node.spec | 1 + tools/Makefile.inc | 6 ++++ tools/zmq-keygen.c | 52 ++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+) create mode 100644 tools/zmq-keygen.c diff --git a/packaging/rpm/villas-node.spec b/packaging/rpm/villas-node.spec index c8879e636..58ec70c3d 100644 --- a/packaging/rpm/villas-node.spec +++ b/packaging/rpm/villas-node.spec @@ -57,6 +57,7 @@ rm -rf %{?buildroot} /usr/bin/villas /usr/bin/villas-* /usr/bin/conf2json +/usr/bin/zmq-keygen /usr/lib/libvillas.so /usr/lib/libvillas.so.* diff --git a/tools/Makefile.inc b/tools/Makefile.inc index 534d34c6e..6f2392176 100644 --- a/tools/Makefile.inc +++ b/tools/Makefile.inc @@ -26,6 +26,12 @@ TOOLS_CFLAGS = $(CFLAGS) TOOLS_LDLIBS = -lconfig -ljansson -lvillas TOOLS_LDFLAGS = $(LDFLAGS) -Wl,-rpath,'$$ORIGIN' +ifeq ($(shell $(PKGCONFIG) libzmq; echo $$?),0) + TOOLS += $(BUILDDIR)/zmq-keygen + TOOLS_CFLAGS += $(shell $(PKGCONFIG) --cflags libzmq) + TOOLS_LDLIBS += $(shell $(PKGCONFIG) --libs libzmq) +endif + # Compile executable objects $(BUILDDIR)/tools/%.o: tools/%.c $(BUILDDIR)/defines | $$(dir $$@) $(CC) $(TOOLS_CFLAGS) -c $< -o $@ diff --git a/tools/zmq-keygen.c b/tools/zmq-keygen.c new file mode 100644 index 000000000..0984abb54 --- /dev/null +++ b/tools/zmq-keygen.c @@ -0,0 +1,52 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include + +int main (int argc, char *argv[]) +{ + char public_key [41]; + char secret_key [41]; + if (zmq_curve_keypair(public_key, secret_key)) { + if (zmq_errno() == ENOTSUP) + printf("To use %s, please install libsodium and then rebuild libzmq.", argv[0]); + + exit (EXIT_FAILURE); + } + + printf("# Copy these lines to your 'zeromq' node-configuration\n"); + printf("curve = {\n"); + printf("\tpublic_key = \"%s\";\n", public_key); + printf("\tsecret_key = \"%s\";\n", secret_key); + printf("}\n"); + + exit (0); +} \ No newline at end of file From 465999d608e6cfa5768b3f4fae9baa10de3e2d69 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 08:59:42 +0200 Subject: [PATCH 05/22] socket: save MSG_PEEK recv(2) call by using a fixed allocation. This should work for most data link layers. --- lib/nodes/socket.c | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 828375999..af7977b31 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -40,6 +40,8 @@ #include "queue.h" #include "plugin.h" +#define MAX_PACKETLEN 1500 + /* Forward declartions */ static struct plugin p; @@ -327,23 +329,14 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt) { + int ret; struct socket *s = n->_vd; - int ret; + char data[MAX_PACKETLEN]; ssize_t bytes; - /* Peak into message header of the first sample and to get total packet size. */ - bytes = recv(s->sd, NULL, 0, MSG_PEEK | MSG_TRUNC); - if (bytes < MSG_LEN(1) || bytes % 4 != 0) { - warn("Received invalid packet for node %s", node_name(n)); - recv(s->sd, NULL, 0, 0); /* empty receive buffer */ - return -1; - } - - char data[bytes]; - /* Receive message from socket */ - bytes = recv(s->sd, data, bytes, 0); + bytes = recv(s->sd, data, sizeof(data), 0); if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); else if (bytes < 0) From 98cd2938afad33e482b5d1b33beebafd027d497c Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:33:42 +0200 Subject: [PATCH 06/22] add new msg_buffer() functions to read / write multiple struct sample from / to a buffer using the struct msg wire protocol. --- lib/nodes/socket.c | 2 -- lib/nodes/zeromq.c | 75 ++++++++++++++++++++++++---------------------- 2 files changed, 39 insertions(+), 38 deletions(-) diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index af7977b31..0e9310516 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -40,8 +40,6 @@ #include "queue.h" #include "plugin.h" -#define MAX_PACKETLEN 1500 - /* Forward declartions */ static struct plugin p; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index af68a49be..a7bb5b877 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -26,17 +26,10 @@ #include "utils.h" #include "queue.h" #include "plugin.h" +#include "msg.h" static void *context; -/* Release our samples. */ -static void free_msg(void *data, void *hint) -{ - struct sample *s = data; - - sample_put(s); -} - int zeromq_reverse(struct node *n) { // struct zeromq *z = n->_vd; @@ -187,44 +180,54 @@ int zeromq_stop(struct node *n) int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) { - int i, ret; + int recv, ret; struct zeromq *z = n->_vd; - for (i = 0; i < cnt; i++) { - zmq_msg_t m; - - ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->capacity), free_msg, NULL); - if (ret < 0) - break; - - ret = zmq_msg_recv(&m, z->subscriber.socket, 0); - if (ret < 0) - break; - } + zmq_msg_t m; + + ret = zmq_msg_init(&m); + if (ret < 0) + return ret; + + ret = zmq_msg_recv(&m, z->subscriber.socket, 0); + if (ret < 0) + return ret; - return i; + recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m)); + + ret = zmq_msg_close(&m); + if (ret) + return ret; + + return recv; } int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) { - int i, ret; + int ret; struct zeromq *z = n->_vd; + + ssize_t sent; + zmq_msg_t m; + + char data[1500]; - for (i = 0; i < cnt; i++) { - zmq_msg_t m; - - sample_get(smps[i]); - - ret = zmq_msg_init_data(&m, smps[i], SAMPLE_LEN(smps[i]->length), free_msg, NULL); - if (ret < 0) - break; - - ret = zmq_msg_send(&m, z->publisher.socket, 0); - if (ret < 0) - break; - } + sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); + if (sent < 0) + return -1; - return i; + ret = zmq_msg_init_size(&m, sent); + + memcpy(zmq_msg_data(&m), data, sent); + + if (ret < 0) + return ret; + + ret = zmq_msg_send(&m, z->publisher.socket, 0); + if (ret < 0) + return ret; + + return cnt; } static struct plugin p = { From 9edd0fc68db94b2106d38d9516a7e056662a6b4e Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:06:22 +0200 Subject: [PATCH 07/22] zeromq: fix little bug because socket-types were swapped --- lib/nodes/zeromq.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index a7bb5b877..e86ec04c4 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -116,9 +116,7 @@ char * zeromq_print(struct node *n) int zeromq_init(struct super_node *sn) { context = zmq_ctx_new(); - - info("context is %p", context); - + return context == NULL; } @@ -134,8 +132,8 @@ int zeromq_start(struct node *n) switch (z->pattern) { case ZEROMQ_PATTERN_RADIODISH: - z->subscriber.socket = zmq_socket(context, ZMQ_RADIO); - z->publisher.socket = zmq_socket(context, ZMQ_DISH); + z->subscriber.socket = zmq_socket(context, ZMQ_DISH); + z->publisher.socket = zmq_socket(context, ZMQ_RADIO); break; case ZEROMQ_PATTERN_PUBSUB: From 5ef08552b22ec1d6414f455b2737e2fe435fbbf7 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:12:08 +0200 Subject: [PATCH 08/22] zeromq: added IPv6 options --- include/villas/nodes/zeromq.h | 2 ++ lib/nodes/zeromq.c | 11 +++++++++++ 2 files changed, 13 insertions(+) diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index c274fb6c4..5e7d196c4 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -33,6 +33,8 @@ #include "list.h" struct zeromq { + int ipv6; + enum { ZEROMQ_PATTERN_PUBSUB, ZEROMQ_PATTERN_RADIODISH diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index e86ec04c4..1fa349f76 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -84,6 +84,9 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) else cerror(cfg, "Invalid type for ZeroMQ node: %s", node_name_short(n)); } + + if (!config_setting_lookup_bool(cfg, "ipv6", &z->ipv6)) + z->ipv6 = 0; return 0; } @@ -141,10 +144,18 @@ int zeromq_start(struct node *n) z->publisher.socket = zmq_socket(context, ZMQ_PUB); break; } + + ret = zmq_setsockopt(z->publisher.socket, &z->ipv6, sizeof(z->ipv6)); + if (ret) + return ret; /* Bind subscriber socket */ if (z->subscriber.endpoint) { ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + ret = zmq_setsockopt(z->subscriber.socket, &z->ipv6, sizeof(z->ipv6)); + if (ret) + return ret; + if (ret) return ret; } From b6c39611c17c62479d09fafd64dc79cbf5670a43 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:33:28 +0200 Subject: [PATCH 09/22] zeromq: add support to reverse node --- lib/nodes/zeromq.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 1fa349f76..a0c6b3ee6 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -32,7 +32,16 @@ static void *context; int zeromq_reverse(struct node *n) { -// struct zeromq *z = n->_vd; + struct zeromq *z = n->_vd; + + if (list_length(&z->publisher.endpoints) != 1) + return -1; + + char *subscriber = z->subscriber.endpoint; + char *publisher = list_first(&z->publisher.endpoints); + + z->subscriber.endpoint = publisher; + list_set(&z->publisher.endpoints, 0, subscriber); return 0; } From 0f197cb22372c0bb075c4ac8efad4d66bca9c860 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:20:57 +0200 Subject: [PATCH 10/22] zeromq: check if handshake succeeded --- lib/nodes/zeromq.c | 60 +++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 59 insertions(+), 1 deletion(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index a0c6b3ee6..e17e73c44 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -30,6 +30,45 @@ static void *context; +#ifdef ZMQ_BUILD_DRAFT_API +/** Read one event off the monitor socket; return value and address + * by reference, if not null, and event number by value. + * + * @returnval -1 In case of error. */ +static int get_monitor_event(void *monitor, int *value, char **address) +{ + /* First frame in message contains event number and value */ + zmq_msg_t msg; + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; /* Interruped, presumably. */ + + assert (zmq_msg_more (&msg)); + + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + if (value) + *value = *(uint32_t *) (data + 2); + + /* Second frame in message contains event address */ + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; /* Interruped, presumably. */ + + assert (!zmq_msg_more (&msg)); + + if (address) { + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + size_t size = zmq_msg_size (&msg); + *address = (char *) malloc (size + 1); + memcpy (*address, data, size); + *address [size] = 0; + } + + return event; +} +#endif + int zeromq_reverse(struct node *n) { struct zeromq *z = n->_vd; @@ -171,6 +210,20 @@ int zeromq_start(struct node *n) /* Subscribe to all pubsub messages. */ zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0); +#ifdef ZMQ_BUILD_DRAFT_API + /* Monitor handshake events on the server */ + ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); + assert(ret == 0); + + /* Create socket for collecting monitor events */ + void *server_mon = zmq_socket(context, ZMQ_PAIR); + assert(server_mon); + + /* Connect it to the inproc endpoints so they'll get events */ + ret = zmq_connect(server_mon, "inproc://monitor-server"); + assert(ret == 0); +#endif + /* Connect publisher socket */ for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { @@ -180,8 +233,13 @@ int zeromq_start(struct node *n) if (ret) return ret; } - + +#ifdef ZMQ_BUILD_DRAFT_API + ret = get_monitor_event(server_mon, NULL, NULL); + return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; +#else return 0; +#endif } int zeromq_stop(struct node *n) From 503da4bb945790c6a1b10bdbfe1327e5ee633e9f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:34:53 +0200 Subject: [PATCH 11/22] zeromq: add support for filtering based on pubsub groups --- include/villas/nodes/zeromq.h | 2 ++ lib/nodes/zeromq.c | 38 ++++++++++++++++++++++++++++------- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 5e7d196c4..53912a90b 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -35,6 +35,8 @@ struct zeromq { int ipv6; + char *filter; + enum { ZEROMQ_PATTERN_PUBSUB, ZEROMQ_PATTERN_RADIODISH diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index e17e73c44..ddf76b173 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -89,7 +89,7 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) { struct zeromq *z = n->_vd; - const char *ep, *type; + const char *ep, *type, *filter; config_setting_t *cfg_pub; @@ -124,6 +124,11 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) } } + if (config_setting_lookup_string(cfg, "filter", &filter)) + z->filter = strdup(filter); + else + z->filter = NULL; + if (config_setting_lookup_string(cfg, "pattern", &type)) { if (!strcmp(type, "pubsub")) z->pattern = ZEROMQ_PATTERN_PUBSUB; @@ -160,6 +165,9 @@ char * zeromq_print(struct node *n) } strcatf(&buf, " ]"); + + if (z->filter) + strcatf(&buf, ", filter=%s", z->filter); return buf; } @@ -204,12 +212,17 @@ int zeromq_start(struct node *n) if (ret) return ret; + /* Subscribe to pubsub messages. */ + if (z->filter && z->pattern == ZEROMQ_PATTERN_PUBSUB) { + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, strlen(z->filter)); + if (ret < 0) + return ret; + } + if (ret) return ret; } - /* Subscribe to all pubsub messages. */ - zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0); #ifdef ZMQ_BUILD_DRAFT_API /* Monitor handshake events on the server */ ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); @@ -224,7 +237,6 @@ int zeromq_start(struct node *n) assert(ret == 0); #endif - /* Connect publisher socket */ for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); @@ -294,16 +306,28 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) ret = zmq_msg_init_size(&m, sent); + if (z->filter && z->pattern == ZEROMQ_PATTERN_RADIODISH) { + ret = zmq_msg_set_group(&m, z->filter); + if (ret < 0) + goto fail; + } + memcpy(zmq_msg_data(&m), data, sent); - if (ret < 0) - return ret; - ret = zmq_msg_send(&m, z->publisher.socket, 0); + if (ret < 0) + goto fail; + + ret = zmq_msg_close(&m); if (ret < 0) return ret; return cnt; + +fail: + zmq_msg_close(&m); + + return ret; } static struct plugin p = { From fbf75d90eecca5eae5b2fac8aaea34e37a9af032 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:34:30 +0200 Subject: [PATCH 12/22] zeromq: fix ipv6 --- lib/nodes/zeromq.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index ddf76b173..bc750ad5b 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -201,14 +201,14 @@ int zeromq_start(struct node *n) break; } - ret = zmq_setsockopt(z->publisher.socket, &z->ipv6, sizeof(z->ipv6)); + ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; /* Bind subscriber socket */ if (z->subscriber.endpoint) { ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); - ret = zmq_setsockopt(z->subscriber.socket, &z->ipv6, sizeof(z->ipv6)); + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; From 7e602dd44e552d9239e79ad8f2cde856a07fb042 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:58:11 +0200 Subject: [PATCH 13/22] zeromq: added support for CurveZMQ encryption and authentication (still not working) --- include/villas/nodes/zeromq.h | 10 +++++ lib/nodes/zeromq.c | 84 ++++++++++++++++++++++++++++++++--- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 53912a90b..ba6fb3a94 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -29,6 +29,8 @@ #pragma once +#include + #include "node.h" #include "list.h" @@ -37,6 +39,14 @@ struct zeromq { char *filter; + struct { + int enabled; + struct { + char public_key[41]; + char secret_key[41]; + } server, client; + } curve; + enum { ZEROMQ_PATTERN_PUBSUB, ZEROMQ_PATTERN_RADIODISH diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index bc750ad5b..4083d8d57 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -91,7 +91,7 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) const char *ep, *type, *filter; - config_setting_t *cfg_pub; + config_setting_t *cfg_pub, *cfg_curve; list_init(&z->publisher.endpoints); @@ -124,6 +124,38 @@ int zeromq_parse(struct node *n, config_setting_t *cfg) } } + cfg_curve = config_setting_lookup(cfg, "curve"); + if (cfg_curve) { + if (!config_setting_is_group(cfg_curve)) + cerror(cfg_curve, "The curve setting must be a group"); + + const char *public_key, *secret_key; + + if (!config_setting_lookup_string(cfg_curve, "public_key", &public_key)) + cerror(cfg_curve, "Setting 'curve.public_key' is missing"); + + if (!config_setting_lookup_string(cfg_curve, "secret_key", &secret_key)) + cerror(cfg_curve, "Setting 'curve.secret_key' is missing"); + + if (!config_setting_lookup_bool(cfg_curve, "enabled", &z->curve.enabled)) + z->curve.enabled = true; + + if (strlen(secret_key) != 40) + cerror(cfg_curve, "Setting 'curve.secret_key' must be a Z85 encoded CurveZMQ key"); + + if (strlen(public_key) != 40) + cerror(cfg_curve, "Setting 'curve.public_key' must be a Z85 encoded CurveZMQ key"); + + strncpy(z->curve.server.public_key, public_key, 41); + strncpy(z->curve.server.secret_key, secret_key, 41); + } + else + z->curve.enabled = false; + + /** @todo We should fix this. Its mostly done. */ + if (z->curve.enabled) + cerror(cfg_curve, "CurveZMQ support is currently broken"); + if (config_setting_lookup_string(cfg, "filter", &filter)) z->filter = strdup(filter); else @@ -156,7 +188,7 @@ char * zeromq_print(struct node *n) case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break; } - strcatf(&buf, "pattern=%s, subscribe=%s, publish=[ ", pattern, z->subscriber.endpoint); + strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint); for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); @@ -205,9 +237,6 @@ int zeromq_start(struct node *n) if (ret) return ret; - /* Bind subscriber socket */ - if (z->subscriber.endpoint) { - ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; @@ -219,6 +248,38 @@ int zeromq_start(struct node *n) return ret; } + if (z->curve.enabled) { + /* Publisher has server role */ + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); + if (ret) + return ret; + + int curve_server = 1; + ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); + if (ret) + return ret; + } + + if (z->curve.enabled) { + /* Create temporary client keys first */ + ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key); + if (ret) + return ret; + + /* Subscriber has client role */ + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); + if (ret) + return ret; + + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); if (ret) return ret; } @@ -236,14 +297,25 @@ int zeromq_start(struct node *n) ret = zmq_connect(server_mon, "inproc://monitor-server"); assert(ret == 0); #endif + + /* Bind subscriber socket */ + if (z->subscriber.endpoint) { + ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + if (ret) { + info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno)); + return ret; + } + } /* Connect publisher socket */ for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { char *ep = list_at(&z->publisher.endpoints, i); ret = zmq_connect(z->publisher.socket, ep); - if (ret) + if (ret) { + info("Failed to connect to ZeroMQ endpoint: endpoint=%s, error=%s", ep, zmq_strerror(errno)); return ret; + } } #ifdef ZMQ_BUILD_DRAFT_API From 890d5e24970d44f9410a51f6e6ce2368d27c7060 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 09:58:34 +0200 Subject: [PATCH 14/22] zeromq: updated example configuration --- etc/example.conf | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/etc/example.conf b/etc/example.conf index e4fa53aeb..e897c129e 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -167,8 +167,19 @@ nodes = { type = "zeromq", pattern = "pubsub", # The ZeroMQ pattern. One of: 'pubsub', 'radiodish' + ipv6 = false, # Enable IPv6 support + filter = "ab184", # A filter which is prefix matched + curve = { # Z85 encoded Curve25519 keys + enabled = true, + public_key = "Veg+Q.V-c&1k>yVh663gQ^7fL($y47gybE-nZP1L", + secret_key = "HPY.+mFuB[jGs@(zZr6$IZ1H1dZ7Ji*j>oi@O?Pc" + } + subscribe = "tcp://*:1234" # The subscribe endpoint. See http://api.zeromq.org/2-1:zmq-bind for details. - publish = [ "tcp://localhost:1235", "tcp://localhost:12444" ] # The publish endpoints. See http://api.zeromq.org/2-1:zmq-connect for details. + publish = [ # The publish endpoints. See http://api.zeromq.org/2-1:zmq-connect for details. + "tcp://localhost:1235", + "tcp://localhost:12444" + ], } }; From df028c3b2801cf18ae857d5703d6abe1b401280f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 11:13:41 +0200 Subject: [PATCH 15/22] zeromq: some code cleanup --- include/villas/nodes/zeromq.h | 1 + lib/nodes/zeromq.c | 71 +++++++++++++++++++++-------------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index ba6fb3a94..9f80a3cad 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -54,6 +54,7 @@ struct zeromq { struct { void *socket; /**< ZeroMQ socket. */ + void *mon_socket; char *endpoint; } subscriber; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 4083d8d57..47a3acf93 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -43,25 +43,25 @@ static int get_monitor_event(void *monitor, int *value, char **address) if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; /* Interruped, presumably. */ - assert (zmq_msg_more (&msg)); + assert(zmq_msg_more (&msg)); - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint8_t *data = (uint8_t *) zmq_msg_data(&msg); uint16_t event = *(uint16_t *) (data); if (value) *value = *(uint32_t *) (data + 2); /* Second frame in message contains event address */ - zmq_msg_init (&msg); - if (zmq_msg_recv (&msg, monitor, 0) == -1) + zmq_msg_init(&msg); + if (zmq_msg_recv(&msg, monitor, 0) == -1) return -1; /* Interruped, presumably. */ - assert (!zmq_msg_more (&msg)); + assert(!zmq_msg_more(&msg)); if (address) { - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); - size_t size = zmq_msg_size (&msg); - *address = (char *) malloc (size + 1); - memcpy (*address, data, size); + uint8_t *data = (uint8_t *) zmq_msg_data(&msg); + size_t size = zmq_msg_size(&msg); + *address = (char *) malloc(size + 1); + memcpy(*address, data, size); *address [size] = 0; } @@ -233,17 +233,22 @@ int zeromq_start(struct node *n) break; } + if (!z->subscriber.socket || !z->publisher.socket) { + ret = -1; + goto fail; + } + ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; /* Subscribe to pubsub messages. */ - if (z->filter && z->pattern == ZEROMQ_PATTERN_PUBSUB) { - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, strlen(z->filter)); + if (z->pattern == ZEROMQ_PATTERN_PUBSUB) { + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); if (ret < 0) return ret; } @@ -252,50 +257,55 @@ int zeromq_start(struct node *n) /* Publisher has server role */ ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); if (ret) - return ret; + goto fail; int curve_server = 1; ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); if (ret) - return ret; + goto fail; } if (z->curve.enabled) { /* Create temporary client keys first */ ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key); if (ret) - return ret; + goto fail; /* Subscriber has client role */ ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); if (ret) - return ret; + goto fail; } #ifdef ZMQ_BUILD_DRAFT_API /* Monitor handshake events on the server */ ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); - assert(ret == 0); + if (ret < 0) + goto fail; /* Create socket for collecting monitor events */ - void *server_mon = zmq_socket(context, ZMQ_PAIR); - assert(server_mon); + z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR); + if (!z->subscriber.mon_socket) { + ret = -1; + goto fail; + } /* Connect it to the inproc endpoints so they'll get events */ - ret = zmq_connect(server_mon, "inproc://monitor-server"); - assert(ret == 0); + ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server"); + if (ret < 0) + goto fail; #endif /* Bind subscriber socket */ @@ -312,18 +322,21 @@ int zeromq_start(struct node *n) char *ep = list_at(&z->publisher.endpoints, i); ret = zmq_connect(z->publisher.socket, ep); - if (ret) { - info("Failed to connect to ZeroMQ endpoint: endpoint=%s, error=%s", ep, zmq_strerror(errno)); - return ret; - } + if (ret) + goto fail; } #ifdef ZMQ_BUILD_DRAFT_API - ret = get_monitor_event(server_mon, NULL, NULL); + ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; #else return 0; #endif + +fail: + info("Failed to start ZeroMQ node: %s, error=%s", node_name(n), zmq_strerror(errno)); + + return ret; } int zeromq_stop(struct node *n) From 48b3898f231a5080cb025becb58740787c9bd81b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 11:15:00 +0200 Subject: [PATCH 16/22] zeromq: fix group subscription --- lib/nodes/zeromq.c | 51 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 47a3acf93..96cb9e340 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -238,20 +238,27 @@ int zeromq_start(struct node *n) goto fail; } + /* Join group */ + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + ret = zmq_join(z->publisher.socket, z->filter); + break; + + case ZEROMQ_PATTERN_PUBSUB: + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); + break; + } + + if (ret < 0) + goto fail; + ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) - return ret; - - /* Subscribe to pubsub messages. */ - if (z->pattern == ZEROMQ_PATTERN_PUBSUB) { - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); - if (ret < 0) - return ret; - } + goto fail; if (z->curve.enabled) { /* Publisher has server role */ @@ -362,6 +369,18 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; + if (z->filter) { + switch (z->pattern) { + case ZEROMQ_PATTERN_PUBSUB: + /* Discard envelope */ + zmq_recv(z->subscriber.socket, NULL, 0, 0); + break; + + default: { } + } + } + + /* Receive payload */ ret = zmq_msg_recv(&m, z->subscriber.socket, 0); if (ret < 0) return ret; @@ -391,10 +410,18 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) ret = zmq_msg_init_size(&m, sent); - if (z->filter && z->pattern == ZEROMQ_PATTERN_RADIODISH) { - ret = zmq_msg_set_group(&m, z->filter); - if (ret < 0) - goto fail; + if (z->filter) { + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + ret = zmq_msg_set_group(&m, z->filter); + if (ret < 0) + goto fail; + break; + + case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */ + zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE); + break; + } } memcpy(zmq_msg_data(&m), data, sent); From eef7764b8ad7eecaa18825bb4e5576e7ec8f622d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 11:15:58 +0200 Subject: [PATCH 17/22] zeromq: only check for handshake if we are in a CurveZMQ session --- lib/nodes/zeromq.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 96cb9e340..6a8601103 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -334,8 +334,12 @@ int zeromq_start(struct node *n) } #ifdef ZMQ_BUILD_DRAFT_API - ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); - return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; + if (z->curve.enabled) { + ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); + return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; + } + else + return 0; /* The handshake events are only emitted for CurveZMQ sessions. */ #else return 0; #endif From 89e4f3588ffeb9117d48a9e5f4d9b1a8050849f3 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 11:16:18 +0200 Subject: [PATCH 18/22] zeromq: fix shutdown close pending socket --- lib/nodes/zeromq.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 6a8601103..d1bb3caff 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -358,6 +358,12 @@ int zeromq_stop(struct node *n) ret = zmq_close(z->subscriber.socket); if (ret) return ret; + +#ifdef ZMQ_BUILD_DRAFT_API + ret = zmq_close(z->subscriber.mon_socket); + if (ret) + return ret; +#endif return zmq_close(z->publisher.socket); } From 78224a29d32e2991309d98846bf0e6a58f60fae5 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 11:16:46 +0200 Subject: [PATCH 19/22] prepare more loopback tests --- tests/integration/{pipe-loopback.sh => pipe-loopback-socket.sh} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/integration/{pipe-loopback.sh => pipe-loopback-socket.sh} (100%) diff --git a/tests/integration/pipe-loopback.sh b/tests/integration/pipe-loopback-socket.sh similarity index 100% rename from tests/integration/pipe-loopback.sh rename to tests/integration/pipe-loopback-socket.sh From d6970abd8287653f2fa420f6389341b2943d214d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 15:03:39 +0200 Subject: [PATCH 20/22] zeromq: fix compiler warning --- lib/nodes/zeromq.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index d1bb3caff..aa11ae1c9 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -247,6 +247,9 @@ int zeromq_start(struct node *n) case ZEROMQ_PATTERN_PUBSUB: ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); break; + + default: + ret = -1; } if (ret < 0) From 09b37a1bfc11159767f3764b7da9a818d29409d3 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 15:15:13 +0200 Subject: [PATCH 21/22] zeromq: fix radio dish support by swapping server / client roles --- lib/nodes/zeromq.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index aa11ae1c9..bae2eab82 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -241,7 +241,7 @@ int zeromq_start(struct node *n) /* Join group */ switch (z->pattern) { case ZEROMQ_PATTERN_RADIODISH: - ret = zmq_join(z->publisher.socket, z->filter); + ret = zmq_join(z->subscriber.socket, z->filter); break; case ZEROMQ_PATTERN_PUBSUB: @@ -317,25 +317,25 @@ int zeromq_start(struct node *n) if (ret < 0) goto fail; #endif + + /* Spawn server for publisher */ + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + ret = zmq_bind(z->publisher.socket, ep); + if (ret) + goto fail; + } - /* Bind subscriber socket */ + /* Connect subscribers to server socket */ if (z->subscriber.endpoint) { - ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + ret = zmq_connect(z->subscriber.socket, z->subscriber.endpoint); if (ret) { info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno)); return ret; } } - /* Connect publisher socket */ - for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { - char *ep = list_at(&z->publisher.endpoints, i); - - ret = zmq_connect(z->publisher.socket, ep); - if (ret) - goto fail; - } - #ifdef ZMQ_BUILD_DRAFT_API if (z->curve.enabled) { ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); From 08a5aa23200eaf1d56fe8b58e0797ed691d2102b Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Tue, 23 May 2017 15:33:59 +0200 Subject: [PATCH 22/22] socket: fix compiler error because of undefined macro --- lib/nodes/socket.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 0e9310516..503291b47 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -330,7 +330,7 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn int ret; struct socket *s = n->_vd; - char data[MAX_PACKETLEN]; + char data[MSG_MAX_PACKET_LEN]; ssize_t bytes; /* Receive message from socket */