1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

zeromq: support libzmq < 4.2

This commit is contained in:
Steffen Vogel 2017-06-15 13:59:09 +02:00
parent 594b37b59b
commit f738dff1ac
2 changed files with 16 additions and 0 deletions

View file

@ -34,6 +34,10 @@
#include "node.h"
#include "list.h"
#if ZMQ_VERSION_MAJOR >= 4 && ZMQ_VERSION_MINOR >= 2
#define ZMQ_BUILD_DISH 1
#endif
struct zeromq {
int ipv6;
@ -49,7 +53,9 @@ struct zeromq {
enum {
ZEROMQ_PATTERN_PUBSUB,
#ifdef ZMQ_BUILD_DISH
ZEROMQ_PATTERN_RADIODISH
#endif
} pattern;
struct {

View file

@ -164,8 +164,10 @@ int zeromq_parse(struct node *n, config_setting_t *cfg)
if (config_setting_lookup_string(cfg, "pattern", &type)) {
if (!strcmp(type, "pubsub"))
z->pattern = ZEROMQ_PATTERN_PUBSUB;
#ifdef ZMQ_BUILD_DISH
else if (!strcmp(type, "radiodish"))
z->pattern = ZEROMQ_PATTERN_RADIODISH;
#endif
else
cerror(cfg, "Invalid type for ZeroMQ node: %s", node_name_short(n));
}
@ -185,7 +187,9 @@ char * zeromq_print(struct node *n)
switch (z->pattern) {
case ZEROMQ_PATTERN_PUBSUB: pattern = "pubsub"; break;
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH: pattern = "radiodish"; break;
#endif
}
strcatf(&buf, "pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", z->subscriber.endpoint);
@ -222,10 +226,12 @@ int zeromq_start(struct node *n)
struct zeromq *z = n->_vd;
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
z->subscriber.socket = zmq_socket(context, ZMQ_DISH);
z->publisher.socket = zmq_socket(context, ZMQ_RADIO);
break;
#endif
case ZEROMQ_PATTERN_PUBSUB:
z->subscriber.socket = zmq_socket(context, ZMQ_SUB);
@ -240,9 +246,11 @@ int zeromq_start(struct node *n)
/* Join group */
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
ret = zmq_join(z->subscriber.socket, z->filter);
break;
#endif
case ZEROMQ_PATTERN_PUBSUB:
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
@ -425,11 +433,13 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
if (z->filter) {
switch (z->pattern) {
#ifdef ZMQ_BUILD_DISH
case ZEROMQ_PATTERN_RADIODISH:
ret = zmq_msg_set_group(&m, z->filter);
if (ret < 0)
goto fail;
break;
#endif
case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */
zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE);