1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

zeromq: check if handshake succeeded

This commit is contained in:
Steffen Vogel 2017-05-23 09:20:57 +02:00
parent b6c39611c1
commit 0f197cb223

View file

@ -30,6 +30,45 @@
static void *context;
#ifdef ZMQ_BUILD_DRAFT_API
/** Read one event off the monitor socket; return value and address
* by reference, if not null, and event number by value.
*
* @returnval -1 In case of error. */
static int get_monitor_event(void *monitor, int *value, char **address)
{
/* First frame in message contains event number and value */
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; /* Interruped, presumably. */
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
/* Second frame in message contains event address */
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; /* Interruped, presumably. */
assert (!zmq_msg_more (&msg));
if (address) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
*address [size] = 0;
}
return event;
}
#endif
int zeromq_reverse(struct node *n)
{
struct zeromq *z = n->_vd;
@ -171,6 +210,20 @@ int zeromq_start(struct node *n)
/* Subscribe to all pubsub messages. */
zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, NULL, 0);
#ifdef ZMQ_BUILD_DRAFT_API
/* Monitor handshake events on the server */
ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED);
assert(ret == 0);
/* Create socket for collecting monitor events */
void *server_mon = zmq_socket(context, ZMQ_PAIR);
assert(server_mon);
/* Connect it to the inproc endpoints so they'll get events */
ret = zmq_connect(server_mon, "inproc://monitor-server");
assert(ret == 0);
#endif
/* Connect publisher socket */
for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) {
@ -180,8 +233,13 @@ int zeromq_start(struct node *n)
if (ret)
return ret;
}
#ifdef ZMQ_BUILD_DRAFT_API
ret = get_monitor_event(server_mon, NULL, NULL);
return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED;
#else
return 0;
#endif
}
int zeromq_stop(struct node *n)