diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index aa11ae1c9..bae2eab82 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -241,7 +241,7 @@ int zeromq_start(struct node *n) /* Join group */ switch (z->pattern) { case ZEROMQ_PATTERN_RADIODISH: - ret = zmq_join(z->publisher.socket, z->filter); + ret = zmq_join(z->subscriber.socket, z->filter); break; case ZEROMQ_PATTERN_PUBSUB: @@ -317,25 +317,25 @@ int zeromq_start(struct node *n) if (ret < 0) goto fail; #endif + + /* Spawn server for publisher */ + for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { + char *ep = list_at(&z->publisher.endpoints, i); + + ret = zmq_bind(z->publisher.socket, ep); + if (ret) + goto fail; + } - /* Bind subscriber socket */ + /* Connect subscribers to server socket */ if (z->subscriber.endpoint) { - ret = zmq_bind(z->subscriber.socket, z->subscriber.endpoint); + ret = zmq_connect(z->subscriber.socket, z->subscriber.endpoint); if (ret) { info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno)); return ret; } } - /* Connect publisher socket */ - for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { - char *ep = list_at(&z->publisher.endpoints, i); - - ret = zmq_connect(z->publisher.socket, ep); - if (ret) - goto fail; - } - #ifdef ZMQ_BUILD_DRAFT_API if (z->curve.enabled) { ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL);