diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index ba6fb3a94..9f80a3cad 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -54,6 +54,7 @@ struct zeromq { struct { void *socket; /**< ZeroMQ socket. */ + void *mon_socket; char *endpoint; } subscriber; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 4083d8d57..47a3acf93 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -43,25 +43,25 @@ static int get_monitor_event(void *monitor, int *value, char **address) if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; /* Interruped, presumably. */ - assert (zmq_msg_more (&msg)); + assert(zmq_msg_more (&msg)); - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint8_t *data = (uint8_t *) zmq_msg_data(&msg); uint16_t event = *(uint16_t *) (data); if (value) *value = *(uint32_t *) (data + 2); /* Second frame in message contains event address */ - zmq_msg_init (&msg); - if (zmq_msg_recv (&msg, monitor, 0) == -1) + zmq_msg_init(&msg); + if (zmq_msg_recv(&msg, monitor, 0) == -1) return -1; /* Interruped, presumably. */ - assert (!zmq_msg_more (&msg)); + assert(!zmq_msg_more(&msg)); if (address) { - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); - size_t size = zmq_msg_size (&msg); - *address = (char *) malloc (size + 1); - memcpy (*address, data, size); + uint8_t *data = (uint8_t *) zmq_msg_data(&msg); + size_t size = zmq_msg_size(&msg); + *address = (char *) malloc(size + 1); + memcpy(*address, data, size); *address [size] = 0; } @@ -233,17 +233,22 @@ int zeromq_start(struct node *n) break; } + if (!z->subscriber.socket || !z->publisher.socket) { + ret = -1; + goto fail; + } + ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) return ret; /* Subscribe to pubsub messages. */ - if (z->filter && z->pattern == ZEROMQ_PATTERN_PUBSUB) { - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, strlen(z->filter)); + if (z->pattern == ZEROMQ_PATTERN_PUBSUB) { + ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); if (ret < 0) return ret; } @@ -252,50 +257,55 @@ int zeromq_start(struct node *n) /* Publisher has server role */ ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); if (ret) - return ret; + goto fail; int curve_server = 1; ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); if (ret) - return ret; + goto fail; } if (z->curve.enabled) { /* Create temporary client keys first */ ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key); if (ret) - return ret; + goto fail; /* Subscriber has client role */ ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); if (ret) - return ret; + goto fail; ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); if (ret) - return ret; + goto fail; } #ifdef ZMQ_BUILD_DRAFT_API /* Monitor handshake events on the server */ ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); - assert(ret == 0); + if (ret < 0) + goto fail; /* Create socket for collecting monitor events */ - void *server_mon = zmq_socket(context, ZMQ_PAIR); - assert(server_mon); + z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR); + if (!z->subscriber.mon_socket) { + ret = -1; + goto fail; + } /* Connect it to the inproc endpoints so they'll get events */ - ret = zmq_connect(server_mon, "inproc://monitor-server"); - assert(ret == 0); + ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server"); + if (ret < 0) + goto fail; #endif /* Bind subscriber socket */ @@ -312,18 +322,21 @@ int zeromq_start(struct node *n) char *ep = list_at(&z->publisher.endpoints, i); ret = zmq_connect(z->publisher.socket, ep); - if (ret) { - info("Failed to connect to ZeroMQ endpoint: endpoint=%s, error=%s", ep, zmq_strerror(errno)); - return ret; - } + if (ret) + goto fail; } #ifdef ZMQ_BUILD_DRAFT_API - ret = get_monitor_event(server_mon, NULL, NULL); + ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; #else return 0; #endif + +fail: + info("Failed to start ZeroMQ node: %s, error=%s", node_name(n), zmq_strerror(errno)); + + return ret; } int zeromq_stop(struct node *n)