diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 47a3acf93..96cb9e340 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -238,20 +238,27 @@ int zeromq_start(struct node *n) goto fail; } + /* Join group */ + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + ret = zmq_join(z->publisher.socket, z->filter); + break; + + case ZEROMQ_PATTERN_PUBSUB: + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); + break; + } + + if (ret < 0) + goto fail; + ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) - return ret; - - /* Subscribe to pubsub messages. */ - if (z->pattern == ZEROMQ_PATTERN_PUBSUB) { - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); - if (ret < 0) - return ret; - } + goto fail; if (z->curve.enabled) { /* Publisher has server role */ @@ -362,6 +369,18 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt) if (ret < 0) return ret; + if (z->filter) { + switch (z->pattern) { + case ZEROMQ_PATTERN_PUBSUB: + /* Discard envelope */ + zmq_recv(z->subscriber.socket, NULL, 0, 0); + break; + + default: { } + } + } + + /* Receive payload */ ret = zmq_msg_recv(&m, z->subscriber.socket, 0); if (ret < 0) return ret; @@ -391,10 +410,18 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt) ret = zmq_msg_init_size(&m, sent); - if (z->filter && z->pattern == ZEROMQ_PATTERN_RADIODISH) { - ret = zmq_msg_set_group(&m, z->filter); - if (ret < 0) - goto fail; + if (z->filter) { + switch (z->pattern) { + case ZEROMQ_PATTERN_RADIODISH: + ret = zmq_msg_set_group(&m, z->filter); + if (ret < 0) + goto fail; + break; + + case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */ + zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE); + break; + } } memcpy(zmq_msg_data(&m), data, sent);