1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

zeromq: fix group subscription

This commit is contained in:
Steffen Vogel 2017-05-23 11:15:00 +02:00
parent df028c3b28
commit 48b3898f23

View file

@ -238,20 +238,27 @@ int zeromq_start(struct node *n)
goto fail;
}
/* Join group */
switch (z->pattern) {
case ZEROMQ_PATTERN_RADIODISH:
ret = zmq_join(z->publisher.socket, z->filter);
break;
case ZEROMQ_PATTERN_PUBSUB:
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
break;
}
if (ret < 0)
goto fail;
ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
if (ret)
goto fail;
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6));
if (ret)
return ret;
/* Subscribe to pubsub messages. */
if (z->pattern == ZEROMQ_PATTERN_PUBSUB) {
ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0);
if (ret < 0)
return ret;
}
goto fail;
if (z->curve.enabled) {
/* Publisher has server role */
@ -362,6 +369,18 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt)
if (ret < 0)
return ret;
if (z->filter) {
switch (z->pattern) {
case ZEROMQ_PATTERN_PUBSUB:
/* Discard envelope */
zmq_recv(z->subscriber.socket, NULL, 0, 0);
break;
default: { }
}
}
/* Receive payload */
ret = zmq_msg_recv(&m, z->subscriber.socket, 0);
if (ret < 0)
return ret;
@ -391,10 +410,18 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt)
ret = zmq_msg_init_size(&m, sent);
if (z->filter && z->pattern == ZEROMQ_PATTERN_RADIODISH) {
ret = zmq_msg_set_group(&m, z->filter);
if (ret < 0)
goto fail;
if (z->filter) {
switch (z->pattern) {
case ZEROMQ_PATTERN_RADIODISH:
ret = zmq_msg_set_group(&m, z->filter);
if (ret < 0)
goto fail;
break;
case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */
zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE);
break;
}
}
memcpy(zmq_msg_data(&m), data, sent);