mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'feature-zeromq' into 'develop'
Add new ZeroMQ node-type See merge request !21
This commit is contained in:
commit
56f4067bd8
17 changed files with 686 additions and 19 deletions
|
@ -3,4 +3,5 @@
|
|||
!thirdparty/libxil/
|
||||
!thirdparty/criterion/
|
||||
!thirdparty/libwebsockets/
|
||||
!thirdparty/nanomsg/
|
||||
!thirdparty/nanomsg/
|
||||
!thirdparty/libzmq/
|
||||
|
|
3
.gitmodules
vendored
3
.gitmodules
vendored
|
@ -28,3 +28,6 @@
|
|||
[submodule "thirdparty/nanomsg"]
|
||||
path = thirdparty/nanomsg
|
||||
url = https://github.com/nanomsg/nanomsg.git
|
||||
[submodule "thirdparty/libzmq"]
|
||||
path = thirdparty/libzmq
|
||||
url = https://github.com/zeromq/libzmq.git
|
||||
|
|
|
@ -45,7 +45,9 @@ RUN dnf -y install \
|
|||
libconfig-devel \
|
||||
libnl3-devel \
|
||||
libcurl-devel \
|
||||
jansson-devel
|
||||
jansson-devel \
|
||||
libsodium-devel \
|
||||
openpgm-devel
|
||||
|
||||
# Several tools only needed for developement and testing
|
||||
RUN dnf -y install \
|
||||
|
@ -85,6 +87,10 @@ RUN mkdir -p /tmp/libwebsockets/build && cd /tmp/libwebsockets/build && cmake -D
|
|||
COPY thirdparty/nanomsg /tmp/nanomsg
|
||||
RUN mkdir -p /tmp/nanomsg/build && cd /tmp/nanomsg/build && cmake .. && make install
|
||||
|
||||
# Build & Install libzmq
|
||||
COPY thirdparty/libzmq /tmp/libzmq
|
||||
RUN cd /tmp/libzmq && autoreconf -fi && ./configure --with-libsodium --with-pgm --enable-drafts && make install
|
||||
|
||||
# Cleanup intermediate files from builds
|
||||
RUN rm -rf /tmp/*
|
||||
|
||||
|
|
|
@ -162,6 +162,24 @@ nodes = {
|
|||
"ipc:///tmp/test.ipc",
|
||||
"inproc://test"
|
||||
]
|
||||
},
|
||||
zeromq_node = {
|
||||
type = "zeromq",
|
||||
|
||||
pattern = "pubsub", # The ZeroMQ pattern. One of: 'pubsub', 'radiodish'
|
||||
ipv6 = false, # Enable IPv6 support
|
||||
filter = "ab184", # A filter which is prefix matched
|
||||
curve = { # Z85 encoded Curve25519 keys
|
||||
enabled = true,
|
||||
public_key = "Veg+Q.V-c&1k>yVh663gQ^7fL($y47gybE-nZP1L",
|
||||
secret_key = "HPY.+mFuB[jGs@(zZr6$IZ1H1dZ7Ji*j>oi@O?Pc"
|
||||
}
|
||||
|
||||
subscribe = "tcp://*:1234" # The subscribe endpoint. See http://api.zeromq.org/2-1:zmq-bind for details.
|
||||
publish = [ # The publish endpoints. See http://api.zeromq.org/2-1:zmq-connect for details.
|
||||
"tcp://localhost:1235",
|
||||
"tcp://localhost:12444"
|
||||
],
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -157,4 +157,7 @@ int node_type_start(struct node_type *vt, struct super_node *sn);
|
|||
*/
|
||||
int node_type_stop(struct node_type *vt);
|
||||
|
||||
/** Return a printable representation of the node-type. */
|
||||
char * node_type_name(struct node_type *vt);
|
||||
|
||||
/** @} */
|
91
include/villas/nodes/zeromq.h
Normal file
91
include/villas/nodes/zeromq.h
Normal file
|
@ -0,0 +1,91 @@
|
|||
/** Node type: ZeroMQ
|
||||
*
|
||||
* @file
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
|
||||
* @license GNU General Public License (version 3)
|
||||
*
|
||||
* VILLASnode
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*********************************************************************************/
|
||||
|
||||
/**
|
||||
* @addtogroup zeromq ZeroMQ node type
|
||||
* @ingroup node
|
||||
* @{
|
||||
*/
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include "node.h"
|
||||
#include "list.h"
|
||||
|
||||
struct zeromq {
|
||||
int ipv6;
|
||||
|
||||
char *filter;
|
||||
|
||||
struct {
|
||||
int enabled;
|
||||
struct {
|
||||
char public_key[41];
|
||||
char secret_key[41];
|
||||
} server, client;
|
||||
} curve;
|
||||
|
||||
enum {
|
||||
ZEROMQ_PATTERN_PUBSUB,
|
||||
ZEROMQ_PATTERN_RADIODISH
|
||||
} pattern;
|
||||
|
||||
struct {
|
||||
void *socket; /**< ZeroMQ socket. */
|
||||
void *mon_socket;
|
||||
char *endpoint;
|
||||
} subscriber;
|
||||
|
||||
struct {
|
||||
void *socket; /**< ZeroMQ socket. */
|
||||
struct list endpoints;
|
||||
} publisher;
|
||||
};
|
||||
|
||||
/** @see node_type::print */
|
||||
char * zeromq_print(struct node *n);
|
||||
|
||||
/** @see node_type::parse */
|
||||
int zeromq_parse(struct node *n, config_setting_t *cfg);
|
||||
|
||||
/** @see node_type::init */
|
||||
int zeromq_init();
|
||||
|
||||
/** @see node_type::deinit */
|
||||
int zeromq_deinit();
|
||||
|
||||
/** @see node_type::open */
|
||||
int zeromq_start(struct node *n);
|
||||
|
||||
/** @see node_type::close */
|
||||
int zeromq_stop(struct node *n);
|
||||
|
||||
/** @see node_type::read */
|
||||
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt);
|
||||
|
||||
/** @see node_type::write */
|
||||
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt);
|
||||
|
||||
/** @} */
|
|
@ -59,6 +59,14 @@ ifeq ($(shell $(PKGCONFIG) nanomsg; echo $$?),0)
|
|||
endif
|
||||
endif
|
||||
|
||||
# Enable ZeroMQ node type when libzmq is available
|
||||
ifndef WITHOUT_ZMQ
|
||||
ifeq ($(shell $(PKGCONFIG) libzmq; echo $$?),0)
|
||||
LIB_SRCS += $(addprefix lib/nodes/, zeromq.c)
|
||||
LIB_PKGS += libzmq
|
||||
endif
|
||||
endif
|
||||
|
||||
# Enable VILLASfpga support when libxil is available
|
||||
ifndef WITHOUT_FPGA
|
||||
ifeq ($(shell $(PKGCONFIG) libxil; echo $$?),0)
|
||||
|
|
|
@ -36,7 +36,7 @@ int node_type_start(struct node_type *vt, struct super_node *sn)
|
|||
if (vt->state != STATE_DESTROYED)
|
||||
return 0;
|
||||
|
||||
info("Initializing " YEL("%s") " node type which is used by %zu nodes", plugin_name(vt), list_length(&vt->instances));
|
||||
info("Initializing " YEL("%s") " node type which is used by %zu nodes", node_type_name(vt), list_length(&vt->instances));
|
||||
{ INDENT
|
||||
ret = vt->init ? vt->init(sn) : 0;
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ int node_type_stop(struct node_type *vt)
|
|||
if (vt->state != STATE_STARTED)
|
||||
return 0;
|
||||
|
||||
info("De-initializing " YEL("%s") " node type", plugin_name(vt));
|
||||
info("De-initializing " YEL("%s") " node type", node_type_name(vt));
|
||||
{ INDENT
|
||||
ret = vt->deinit ? vt->deinit() : 0;
|
||||
}
|
||||
|
@ -64,3 +64,8 @@ int node_type_stop(struct node_type *vt)
|
|||
|
||||
return ret;
|
||||
}
|
||||
|
||||
char * node_type_name(struct node_type *vt)
|
||||
{
|
||||
return plugin_name(vt);
|
||||
}
|
|
@ -327,23 +327,14 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
|
||||
static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int ret;
|
||||
struct socket *s = n->_vd;
|
||||
|
||||
int ret;
|
||||
char data[MSG_MAX_PACKET_LEN];
|
||||
ssize_t bytes;
|
||||
|
||||
/* Peak into message header of the first sample and to get total packet size. */
|
||||
bytes = recv(s->sd, NULL, 0, MSG_PEEK | MSG_TRUNC);
|
||||
if (bytes < MSG_LEN(1) || bytes % 4 != 0) {
|
||||
warn("Received invalid packet for node %s", node_name(n));
|
||||
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
|
||||
return -1;
|
||||
}
|
||||
|
||||
char data[bytes];
|
||||
|
||||
/* Receive message from socket */
|
||||
bytes = recv(s->sd, data, bytes, 0);
|
||||
bytes = recv(s->sd, data, sizeof(data), 0);
|
||||
if (bytes == 0)
|
||||
error("Remote node %s closed the connection", node_name(n));
|
||||
else if (bytes < 0)
|
||||
|
|
478
lib/nodes/zeromq.c
Normal file
478
lib/nodes/zeromq.c
Normal file
|
@ -0,0 +1,478 @@
|
|||
/** Node type: ZeroMQ
|
||||
*
|
||||
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
|
||||
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
|
||||
* @license GNU General Public License (version 3)
|
||||
*
|
||||
* VILLASnode
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*********************************************************************************/
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include "nodes/zeromq.h"
|
||||
#include "utils.h"
|
||||
#include "queue.h"
|
||||
#include "plugin.h"
|
||||
#include "msg.h"
|
||||
|
||||
static void *context;
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
/** Read one event off the monitor socket; return value and address
|
||||
* by reference, if not null, and event number by value.
|
||||
*
|
||||
* @returnval -1 In case of error. */
|
||||
static int get_monitor_event(void *monitor, int *value, char **address)
|
||||
{
|
||||
/* First frame in message contains event number and value */
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
if (zmq_msg_recv (&msg, monitor, 0) == -1)
|
||||
return -1; /* Interruped, presumably. */
|
||||
|
||||
assert(zmq_msg_more (&msg));
|
||||
|
||||
uint8_t *data = (uint8_t *) zmq_msg_data(&msg);
|
||||
uint16_t event = *(uint16_t *) (data);
|
||||
if (value)
|
||||
*value = *(uint32_t *) (data + 2);
|
||||
|
||||
/* Second frame in message contains event address */
|
||||
zmq_msg_init(&msg);
|
||||
if (zmq_msg_recv(&msg, monitor, 0) == -1)
|
||||
return -1; /* Interruped, presumably. */
|
||||
|
||||
assert(!zmq_msg_more(&msg));
|
||||
|
||||
if (address) {
|
||||
uint8_t *data = (uint8_t *) zmq_msg_data(&msg);
|
||||
size_t size = zmq_msg_size(&msg);
|
||||
*address = (char *) malloc(size + 1);
|
||||
memcpy(*address, data, size);
|
||||
*address [size] = 0;
|
||||
}
|
||||
|
||||
return event;
|
||||
}
|
||||
#endif
|
||||
|
||||
int zeromq_reverse(struct node *n)
|
||||
{
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
if (list_length(&z->publisher.endpoints) != 1)
|
||||
return -1;
|
||||
|
||||
char *subscriber = z->subscriber.endpoint;
|
||||
char *publisher = list_first(&z->publisher.endpoints);
|
||||
|
||||
z->subscriber.endpoint = publisher;
|
||||
list_set(&z->publisher.endpoints, 0, subscriber);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zeromq_parse(struct node *n, config_setting_t *cfg)
|
||||
{
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
const char *ep, *type, *filter;
|
||||
|
||||
config_setting_t *cfg_pub, *cfg_curve;
|
||||
|
||||
list_init(&z->publisher.endpoints);
|
||||
|
||||
if (config_setting_lookup_string(cfg, "subscribe", &ep))
|
||||
z->subscriber.endpoint = strdup(ep);
|
||||
else
|
||||
z->subscriber.endpoint = NULL;
|
||||
|
||||
cfg_pub = config_setting_lookup(cfg, "publish");
|
||||
if (cfg_pub) {
|
||||
switch (config_setting_type(cfg_pub)) {
|
||||
case CONFIG_TYPE_LIST:
|
||||
case CONFIG_TYPE_ARRAY:
|
||||
for (int j = 0; j < config_setting_length(cfg_pub); j++) {
|
||||
const char *ep = config_setting_get_string_elem(cfg_pub, j);
|
||||
|
||||
list_push(&z->publisher.endpoints, strdup(ep));
|
||||
}
|
||||
break;
|
||||
|
||||
case CONFIG_TYPE_STRING:
|
||||
ep = config_setting_get_string(cfg_pub);
|
||||
|
||||
list_push(&z->publisher.endpoints, strdup(ep));
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
cerror(cfg_pub, "Invalid type for ZeroMQ publisher setting");
|
||||
}
|
||||
}
|
||||
|
||||
cfg_curve = config_setting_lookup(cfg, "curve");
|
||||
if (cfg_curve) {
|
||||
if (!config_setting_is_group(cfg_curve))
|
||||
cerror(cfg_curve, "The curve setting must be a group");
|
||||
|
||||
const char *public_key, *secret_key;
|
||||
|
||||
if (!config_setting_lookup_string(cfg_curve, "public_key", &public_key))
|
||||
cerror(cfg_curve, "Setting 'curve.public_key' is missing");
|
||||
|
||||
if (!config_setting_lookup_string(cfg_curve, "secret_key", &secret_key))
|
||||
cerror(cfg_curve, "Setting 'curve.secret_key' is missing");
|
||||
|
||||
if (!config_setting_lookup_bool(cfg_curve, "enabled", &z->curve.enabled))
|
||||
z->curve.enabled = true;
|
||||
|
||||
if (strlen(secret_key) != 40)
|
||||
cerror(cfg_curve, "Setting 'curve.secret_key' must be a Z85 encoded CurveZMQ key");
|
||||
|
||||
if (strlen(public_key) != 40)
|
||||
cerror(cfg_curve, "Setting 'curve.public_key' must be a Z85 encoded CurveZMQ key");
|
||||
|
||||
strncpy(z->curve.server.public_key, public_key, 41);
|
||||
strncpy(z->curve.server.secret_key, secret_key, 41);
|
||||
}
|
||||
else
|
||||
z->curve.enabled = false;
|
||||
|
||||
/** @todo We should fix this. Its mostly done. */
|
||||
if (z->curve.enabled)
|
||||
cerror(cfg_curve, "CurveZMQ support is currently broken");
|
||||
|
||||
if (config_setting_lookup_string(cfg, "filter", &filter))
|
||||
z->filter = strdup(filter);
|
||||
else
|
||||
z->filter = NULL;
|
||||
|
||||
if (config_setting_lookup_string(cfg, "pattern", &type)) {
|
||||
if (!strcmp(type, "pubsub"))
|
||||
z->pattern = ZEROMQ_PATTERN_PUBSUB;
|
||||
else if (!strcmp(type, "radiodish"))
|
||||
z->pattern = ZEROMQ_PATTERN_RADIODISH;
|
||||
else
|
||||
cerror(cfg, "Invalid type for ZeroMQ node: %s", node_name_short(n));
|
||||
}
|
||||
|
||||
if (!config_setting_lookup_bool(cfg, "ipv6", &z->ipv6))
|
||||
z->ipv6 = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
char * zeromq_print(struct node *n)
|
||||
{
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
char *buf = NULL;
|
||||
char *pattern = NULL;
|
||||
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break;
|
||||
case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break;
|
||||
}
|
||||
|
||||
strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint);
|
||||
|
||||
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
|
||||
char *ep = list_at(&z->publisher.endpoints, i);
|
||||
|
||||
strcatf(&buf, "%s ", ep);
|
||||
}
|
||||
|
||||
strcatf(&buf, " ]");
|
||||
|
||||
if (z->filter)
|
||||
strcatf(&buf, ", filter=%s", z->filter);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int zeromq_init(struct super_node *sn)
|
||||
{
|
||||
context = zmq_ctx_new();
|
||||
|
||||
return context == NULL;
|
||||
}
|
||||
|
||||
int zeromq_deinit()
|
||||
{
|
||||
return zmq_ctx_term(context);
|
||||
}
|
||||
|
||||
int zeromq_start(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
z->subscriber.socket = zmq_socket(context, ZMQ_DISH);
|
||||
z->publisher.socket = zmq_socket(context, ZMQ_RADIO);
|
||||
break;
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
z->subscriber.socket = zmq_socket(context, ZMQ_SUB);
|
||||
z->publisher.socket = zmq_socket(context, ZMQ_PUB);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!z->subscriber.socket || !z->publisher.socket) {
|
||||
ret = -1;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Join group */
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
ret = zmq_join(z->subscriber.socket, z->filter);
|
||||
break;
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
|
||||
break;
|
||||
|
||||
default:
|
||||
ret = -1;
|
||||
}
|
||||
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
if (z->curve.enabled) {
|
||||
/* Publisher has server role */
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
int curve_server = 1;
|
||||
ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server));
|
||||
if (ret)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (z->curve.enabled) {
|
||||
/* Create temporary client keys first */
|
||||
ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
/* Subscriber has client role */
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41);
|
||||
if (ret)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
/* Monitor handshake events on the server */
|
||||
ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
/* Create socket for collecting monitor events */
|
||||
z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR);
|
||||
if (!z->subscriber.mon_socket) {
|
||||
ret = -1;
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Connect it to the inproc endpoints so they'll get events */
|
||||
ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server");
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
#endif
|
||||
|
||||
/* Spawn server for publisher */
|
||||
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
|
||||
char *ep = list_at(&z->publisher.endpoints, i);
|
||||
|
||||
ret = zmq_bind(z->publisher.socket, ep);
|
||||
if (ret)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
/* Connect subscribers to server socket */
|
||||
if (z->subscriber.endpoint) {
|
||||
ret = zmq_connect(z->subscriber.socket, z->subscriber.endpoint);
|
||||
if (ret) {
|
||||
info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno));
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
if (z->curve.enabled) {
|
||||
ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL);
|
||||
return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED;
|
||||
}
|
||||
else
|
||||
return 0; /* The handshake events are only emitted for CurveZMQ sessions. */
|
||||
#else
|
||||
return 0;
|
||||
#endif
|
||||
|
||||
fail:
|
||||
info("Failed to start ZeroMQ node: %s, error=%s", node_name(n), zmq_strerror(errno));
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int zeromq_stop(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
ret = zmq_close(z->subscriber.socket);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
ret = zmq_close(z->subscriber.mon_socket);
|
||||
if (ret)
|
||||
return ret;
|
||||
#endif
|
||||
|
||||
return zmq_close(z->publisher.socket);
|
||||
}
|
||||
|
||||
int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int recv, ret;
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
zmq_msg_t m;
|
||||
|
||||
ret = zmq_msg_init(&m);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
if (z->filter) {
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_PUBSUB:
|
||||
/* Discard envelope */
|
||||
zmq_recv(z->subscriber.socket, NULL, 0, 0);
|
||||
break;
|
||||
|
||||
default: { }
|
||||
}
|
||||
}
|
||||
|
||||
/* Receive payload */
|
||||
ret = zmq_msg_recv(&m, z->subscriber.socket, 0);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
recv = msg_buffer_to_samples(smps, cnt, zmq_msg_data(&m), zmq_msg_size(&m));
|
||||
|
||||
ret = zmq_msg_close(&m);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return recv;
|
||||
}
|
||||
|
||||
int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int ret;
|
||||
struct zeromq *z = n->_vd;
|
||||
|
||||
ssize_t sent;
|
||||
zmq_msg_t m;
|
||||
|
||||
char data[1500];
|
||||
|
||||
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
|
||||
if (sent < 0)
|
||||
return -1;
|
||||
|
||||
ret = zmq_msg_init_size(&m, sent);
|
||||
|
||||
if (z->filter) {
|
||||
switch (z->pattern) {
|
||||
case ZEROMQ_PATTERN_RADIODISH:
|
||||
ret = zmq_msg_set_group(&m, z->filter);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
break;
|
||||
|
||||
case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */
|
||||
zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
memcpy(zmq_msg_data(&m), data, sent);
|
||||
|
||||
ret = zmq_msg_send(&m, z->publisher.socket, 0);
|
||||
if (ret < 0)
|
||||
goto fail;
|
||||
|
||||
ret = zmq_msg_close(&m);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
|
||||
return cnt;
|
||||
|
||||
fail:
|
||||
zmq_msg_close(&m);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
.name = "zeromq",
|
||||
.description = "ZeroMQ Distributed Messaging",
|
||||
.type = PLUGIN_TYPE_NODE,
|
||||
.node = {
|
||||
.vectorize = 0,
|
||||
.size = sizeof(struct zeromq),
|
||||
.reverse = zeromq_reverse,
|
||||
.parse = zeromq_parse,
|
||||
.print = zeromq_print,
|
||||
.start = zeromq_start,
|
||||
.stop = zeromq_stop,
|
||||
.init = zeromq_init,
|
||||
.deinit = zeromq_deinit,
|
||||
.read = zeromq_read,
|
||||
.write = zeromq_write,
|
||||
.instances = LIST_INIT()
|
||||
}
|
||||
};
|
||||
|
||||
REGISTER_PLUGIN(&p)
|
|
@ -57,6 +57,7 @@ rm -rf %{?buildroot}
|
|||
/usr/bin/villas
|
||||
/usr/bin/villas-*
|
||||
/usr/bin/conf2json
|
||||
/usr/bin/zmq-keygen
|
||||
|
||||
/usr/lib/libvillas.so
|
||||
/usr/lib/libvillas.so.*
|
||||
|
|
|
@ -253,7 +253,7 @@ int main(int argc, char *argv[])
|
|||
|
||||
ret = node_type_start(node->_vt, &sn);
|
||||
if (ret)
|
||||
error("Failed to intialize node type: %s", node_name(node));
|
||||
error("Failed to intialize node type: %s", node_type_name(node->_vt));
|
||||
|
||||
ret = node_check(node);
|
||||
if (ret)
|
||||
|
|
7
thirdparty/Makefile.inc
vendored
7
thirdparty/Makefile.inc
vendored
|
@ -21,7 +21,7 @@
|
|||
###################################################################################
|
||||
|
||||
DEPS_CMAKE = libxil libwebsockets criterion jansson nanomsg
|
||||
DEPS_AUTOCONF = libnl libconfig libcurl
|
||||
DEPS_AUTOCONF = libnl libconfig libcurl libzmq
|
||||
|
||||
DEPS = $(DEPS_CMAKE) $(DEPS_AUTOCONF)
|
||||
|
||||
|
@ -33,13 +33,15 @@ ifdef DEBUG
|
|||
AC_CXXFLAGS=-g -O0
|
||||
endif
|
||||
|
||||
CONFIGURE_OPTS = --prefix=$(PREFIX)
|
||||
|
||||
thirdparty:
|
||||
|
||||
# Install & compile autotools based projects
|
||||
$(DEPS_AUTOCONF): CPPFLAGS=$(AC_CPPFLAGS) CFLAGS=$(AC_CFLAGS) CXXFLAGS=$(AC_CXXFLAGS)
|
||||
$(DEPS_AUTOCONF): | $(BUILDDIR)/thirdparty/$$@/
|
||||
autoreconf -fi $(SRCDIR)/thirdparty/$@
|
||||
cd $(BUILDDIR)/thirdparty/$@ && $(SRCDIR)/thirdparty/$@/configure --prefix=$(PREFIX) && make
|
||||
cd $(BUILDDIR)/thirdparty/$@ && $(SRCDIR)/thirdparty/$@/configure $(CONFIGURE_OPTS) && make
|
||||
|
||||
# Install & compile CMake based projects
|
||||
$(DEPS_CMAKE): | $(BUILDDIR)/thirdparty/$$@/
|
||||
|
@ -68,5 +70,6 @@ libconfig-fix:
|
|||
rm -f $(SRCDIR)/thirdparty/libconfig/lib/scanner.[hc]
|
||||
|
||||
libwebsockets: CMAKE_OPTS += -DLWS_IPV6=1 -DLWS_WITH_STATIC=0 -DLWS_WITHOUT_TESTAPPS=1 -DLWS_WITH_HTTP2=1
|
||||
libzmq: CONFIGURE_OPTS += --with-libsodium --with-pgm --enable-drafts
|
||||
|
||||
.PHONY: $(DEPS) thirdparty clean-thirdparty install-thirdparty
|
1
thirdparty/libzmq
vendored
Submodule
1
thirdparty/libzmq
vendored
Submodule
|
@ -0,0 +1 @@
|
|||
Subproject commit ec56eaaeb666960548375cc3746ac0b6927977e4
|
|
@ -26,6 +26,12 @@ TOOLS_CFLAGS = $(CFLAGS)
|
|||
TOOLS_LDLIBS = -lconfig -ljansson -lvillas
|
||||
TOOLS_LDFLAGS = $(LDFLAGS) -Wl,-rpath,'$$ORIGIN'
|
||||
|
||||
ifeq ($(shell $(PKGCONFIG) libzmq; echo $$?),0)
|
||||
TOOLS += $(BUILDDIR)/zmq-keygen
|
||||
TOOLS_CFLAGS += $(shell $(PKGCONFIG) --cflags libzmq)
|
||||
TOOLS_LDLIBS += $(shell $(PKGCONFIG) --libs libzmq)
|
||||
endif
|
||||
|
||||
# Compile executable objects
|
||||
$(BUILDDIR)/tools/%.o: tools/%.c $(BUILDDIR)/defines | $$(dir $$@)
|
||||
$(CC) $(TOOLS_CFLAGS) -c $< -o $@
|
||||
|
|
52
tools/zmq-keygen.c
Normal file
52
tools/zmq-keygen.c
Normal file
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <assert.h>
|
||||
#include <zmq.h>
|
||||
|
||||
int main (int argc, char *argv[])
|
||||
{
|
||||
char public_key [41];
|
||||
char secret_key [41];
|
||||
if (zmq_curve_keypair(public_key, secret_key)) {
|
||||
if (zmq_errno() == ENOTSUP)
|
||||
printf("To use %s, please install libsodium and then rebuild libzmq.", argv[0]);
|
||||
|
||||
exit (EXIT_FAILURE);
|
||||
}
|
||||
|
||||
printf("# Copy these lines to your 'zeromq' node-configuration\n");
|
||||
printf("curve = {\n");
|
||||
printf("\tpublic_key = \"%s\";\n", public_key);
|
||||
printf("\tsecret_key = \"%s\";\n", secret_key);
|
||||
printf("}\n");
|
||||
|
||||
exit (0);
|
||||
}
|
Loading…
Add table
Reference in a new issue