diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index a0c6b3ee6..e17e73c44 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -30,6 +30,45 @@ static void *context; +#ifdef ZMQ_BUILD_DRAFT_API +/** Read one event off the monitor socket; return value and address + * by reference, if not null, and event number by value. + * + * @returnval -1 In case of error. */ +static int get_monitor_event(void *monitor, int *value, char **address) +{ + /* First frame in message contains event number and value */ + zmq_msg_t msg; + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; /* Interruped, presumably. */ + + assert (zmq_msg_more (&msg)); + + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + if (value) + *value = *(uint32_t *) (data + 2); + + /* Second frame in message contains event address */ + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; /* Interruped, presumably. */ + + assert (!zmq_msg_more (&msg)); + + if (address) { + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + size_t size = zmq_msg_size (&msg); + *address = (char *) malloc (size + 1); + memcpy (*address, data, size); + *address [size] = 0; + } + + return event; +} +#endif + int zeromq_reverse(struct node *n) { struct zeromq *z = n->_vd; @@ -171,6 +210,20 @@ int zeromq_start(struct node *n) /* Subscribe to all pubsub messages. */ zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0); +#ifdef ZMQ_BUILD_DRAFT_API + /* Monitor handshake events on the server */ + ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); + assert(ret == 0); + + /* Create socket for collecting monitor events */ + void *server_mon = zmq_socket(context, ZMQ_PAIR); + assert(server_mon); + + /* Connect it to the inproc endpoints so they'll get events */ + ret = zmq_connect(server_mon, "inproc://monitor-server"); + assert(ret == 0); +#endif + /* Connect publisher socket */ for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { @@ -180,8 +233,13 @@ int zeromq_start(struct node *n) if (ret) return ret; } - + +#ifdef ZMQ_BUILD_DRAFT_API + ret = get_monitor_event(server_mon, NULL, NULL); + return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; +#else return 0; +#endif } int zeromq_stop(struct node *n)