diff --git a/include/villas/nodes/zeromq.h b/include/villas/nodes/zeromq.h index 5dc4c8d26..87c477a67 100644 --- a/include/villas/nodes/zeromq.h +++ b/include/villas/nodes/zeromq.h @@ -51,7 +51,6 @@ struct sample; struct zeromq { int ipv6; - char *filter; struct format_type *format; struct io io; @@ -75,12 +74,14 @@ struct zeromq { void *socket; /**< ZeroMQ socket. */ void *mon_socket; char *endpoint; - } subscriber; + char *filter; + } in; struct { void *socket; /**< ZeroMQ socket. */ struct list endpoints; - } publisher; + char *filter; + } out; }; /** @see node_type::print */ diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 05c40b69a..4c19f5b50 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -79,14 +79,14 @@ int zeromq_reverse(struct node *n) { struct zeromq *z = (struct zeromq *) n->_vd; - if (list_length(&z->publisher.endpoints) != 1) + if (list_length(&z->out.endpoints) != 1) return -1; - char *subscriber = z->subscriber.endpoint; - char *publisher = list_first(&z->publisher.endpoints); + char *subscriber = z->in.endpoint; + char *publisher = list_first(&z->out.endpoints); - z->subscriber.endpoint = publisher; - list_set(&z->publisher.endpoints, 0, subscriber); + z->in.endpoint = publisher; + list_set(&z->out.endpoints, 0, subscriber); return 0; } @@ -98,7 +98,8 @@ int zeromq_parse(struct node *n, json_t *cfg) int ret; const char *ep = NULL; const char *type = NULL; - const char *filter = NULL; + const char *in_filter = NULL; + const char *out_filter = NULL; const char *format = "villas.human"; size_t i; @@ -107,16 +108,19 @@ int zeromq_parse(struct node *n, json_t *cfg) json_t *json_val; json_error_t err; - list_init(&z->publisher.endpoints); + list_init(&z->out.endpoints); z->curve.enabled = false; z->ipv6 = 0; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s?: o, s?: s, s?: s, s?: b, s?: s }", - "subscribe", &ep, - "publish", &json_pub, + ret = json_unpack_ex(cfg, &err, 0, "{ s?: { s?: s, s?: s }, s?: { s?: o, s?: s }, s?: o, s?: s, s?: b, s?: s }", + "in", + "subscribe", &ep, + "filter", &in_filter, + "out", + "publish", &json_pub, + "filter", &out_filter, "curve", &json_curve, - "filter", &filter, "pattern", &type, "ipv6", &z->ipv6, "format", &format @@ -124,8 +128,9 @@ int zeromq_parse(struct node *n, json_t *cfg) if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - z->subscriber.endpoint = ep ? strdup(ep) : NULL; - z->filter = filter ? strdup(filter) : NULL; + z->in.endpoint = ep ? strdup(ep) : NULL; + z->in.filter = in_filter ? strdup(in_filter) : NULL; + z->out.filter = out_filter ? strdup(out_filter) : NULL; z->format = format_type_lookup(format); if (!z->format) @@ -139,14 +144,14 @@ int zeromq_parse(struct node *n, json_t *cfg) if (!ep) error("All 'publish' settings must be strings"); - list_push(&z->publisher.endpoints, strdup(ep)); + list_push(&z->out.endpoints, strdup(ep)); } break; case JSON_STRING: ep = json_string_value(json_pub); - list_push(&z->publisher.endpoints, strdup(ep)); + list_push(&z->out.endpoints, strdup(ep)); break; @@ -210,24 +215,27 @@ char * zeromq_print(struct node *n) #endif } - strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, subscribe=%s, publish=[ ", + strcatf(&buf, "format=%s, pattern=%s, ipv6=%s, crypto=%s, in.subscribe=%s, out.publish=[ ", format_type_name(z->format), pattern, z->ipv6 ? "yes" : "no", z->curve.enabled ? "yes" : "no", - z->subscriber.endpoint + z->in.endpoint ? z->in.endpoint : "" ); - for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { - char *ep = (char *) list_at(&z->publisher.endpoints, i); + for (size_t i = 0; i < list_length(&z->out.endpoints); i++) { + char *ep = (char *) list_at(&z->out.endpoints, i); strcatf(&buf, "%s ", ep); } strcatf(&buf, "]"); - if (z->filter) - strcatf(&buf, ", filter=%s", z->filter); + if (z->in.filter) + strcatf(&buf, ", in.filter=%s", z->in.filter); + + if (z->out.filter) + strcatf(&buf, ", out.filter=%s", z->out.filter); return buf; } @@ -249,25 +257,29 @@ int zeromq_start(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; - ret = io_init(&z->io, z->format, n, SAMPLE_HAS_ALL); + ret = io_init(&z->io, z->format, &n->signals, SAMPLE_HAS_ALL); + if (ret) + return ret; + + ret = io_check(&z->io); if (ret) return ret; switch (z->pattern) { #ifdef ZMQ_BUILD_DISH case ZEROMQ_PATTERN_RADIODISH: - z->subscriber.socket = zmq_socket(context, ZMQ_DISH); - z->publisher.socket = zmq_socket(context, ZMQ_RADIO); + z->in.socket = zmq_socket(context, ZMQ_DISH); + z->out.socket = zmq_socket(context, ZMQ_RADIO); break; #endif case ZEROMQ_PATTERN_PUBSUB: - z->subscriber.socket = zmq_socket(context, ZMQ_SUB); - z->publisher.socket = zmq_socket(context, ZMQ_PUB); + z->in.socket = zmq_socket(context, ZMQ_SUB); + z->out.socket = zmq_socket(context, ZMQ_PUB); break; } - if (!z->subscriber.socket || !z->publisher.socket) { + if (!z->in.socket || !z->out.socket) { ret = -1; goto fail; } @@ -276,12 +288,12 @@ int zeromq_start(struct node *n) switch (z->pattern) { #ifdef ZMQ_BUILD_DISH case ZEROMQ_PATTERN_RADIODISH: - ret = zmq_join(z->subscriber.socket, z->filter); + ret = zmq_join(z->in.socket, z->in.filter); break; #endif case ZEROMQ_PATTERN_PUBSUB: - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_SUBSCRIBE, z->filter, z->filter ? strlen(z->filter) : 0); + ret = zmq_setsockopt(z->in.socket, ZMQ_SUBSCRIBE, z->in.filter, z->in.filter ? strlen(z->in.filter) : 0); break; default: @@ -291,90 +303,88 @@ int zeromq_start(struct node *n) if (ret < 0) goto fail; - ret = zmq_setsockopt(z->publisher.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); + ret = zmq_setsockopt(z->out.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) goto fail; - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); + ret = zmq_setsockopt(z->in.socket, ZMQ_IPV6, &z->ipv6, sizeof(z->ipv6)); if (ret) goto fail; if (z->curve.enabled) { /* Publisher has server role */ - ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); + ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_SECRETKEY, z->curve.server.secret_key, 41); if (ret) goto fail; - ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); + ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_PUBLICKEY, z->curve.server.public_key, 41); if (ret) goto fail; int curve_server = 1; - ret = zmq_setsockopt(z->publisher.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); + ret = zmq_setsockopt(z->out.socket, ZMQ_CURVE_SERVER, &curve_server, sizeof(curve_server)); if (ret) goto fail; - } - if (z->curve.enabled) { /* Create temporary client keys first */ ret = zmq_curve_keypair(z->curve.client.public_key, z->curve.client.secret_key); if (ret) goto fail; /* Subscriber has client role */ - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); + ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_SECRETKEY, z->curve.client.secret_key, 41); if (ret) goto fail; - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); + ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_PUBLICKEY, z->curve.client.public_key, 41); if (ret) goto fail; - ret = zmq_setsockopt(z->subscriber.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); + ret = zmq_setsockopt(z->in.socket, ZMQ_CURVE_SERVERKEY, z->curve.server.public_key, 41); if (ret) goto fail; } #ifdef ZMQ_BUILD_DRAFT_API /* Monitor handshake events on the server */ - ret = zmq_socket_monitor(z->subscriber.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); + ret = zmq_socket_monitor(z->in.socket, "inproc://monitor-server", ZMQ_EVENT_HANDSHAKE_SUCCEED | ZMQ_EVENT_HANDSHAKE_FAILED); if (ret < 0) goto fail; /* Create socket for collecting monitor events */ - z->subscriber.mon_socket = zmq_socket(context, ZMQ_PAIR); - if (!z->subscriber.mon_socket) { + z->in.mon_socket = zmq_socket(context, ZMQ_PAIR); + if (!z->in.mon_socket) { ret = -1; goto fail; } /* Connect it to the inproc endpoints so they'll get events */ - ret = zmq_connect(z->subscriber.mon_socket, "inproc://monitor-server"); + ret = zmq_connect(z->in.mon_socket, "inproc://monitor-server"); if (ret < 0) goto fail; #endif /* Spawn server for publisher */ - for (size_t i = 0; i < list_length(&z->publisher.endpoints); i++) { - char *ep = (char *) list_at(&z->publisher.endpoints, i); + for (size_t i = 0; i < list_length(&z->out.endpoints); i++) { + char *ep = (char *) list_at(&z->out.endpoints, i); - ret = zmq_bind(z->publisher.socket, ep); + ret = zmq_bind(z->out.socket, ep); if (ret < 0) goto fail; } /* Connect subscribers to server socket */ - if (z->subscriber.endpoint) { - ret = zmq_connect(z->subscriber.socket, z->subscriber.endpoint); + if (z->in.endpoint) { + ret = zmq_connect(z->in.socket, z->in.endpoint); if (ret < 0) { - info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->subscriber.endpoint, zmq_strerror(errno)); + info("Failed to bind ZeroMQ socket: endpoint=%s, error=%s", z->in.endpoint, zmq_strerror(errno)); return ret; } } #ifdef ZMQ_BUILD_DRAFT_API if (z->curve.enabled) { - ret = get_monitor_event(z->subscriber.mon_socket, NULL, NULL); + ret = get_monitor_event(z->in.mon_socket, NULL, NULL); return ret == ZMQ_EVENT_HANDSHAKE_SUCCEED; } else @@ -394,17 +404,21 @@ int zeromq_stop(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; - ret = zmq_close(z->subscriber.socket); + ret = zmq_close(z->in.socket); if (ret) return ret; #ifdef ZMQ_BUILD_DRAFT_API - ret = zmq_close(z->subscriber.mon_socket); + ret = zmq_close(z->in.mon_socket); if (ret) return ret; #endif - return zmq_close(z->publisher.socket); + ret = io_destroy(&z->io); + if (ret) + return ret; + + return zmq_close(z->out.socket); } int zeromq_destroy(struct node *n) @@ -412,7 +426,13 @@ int zeromq_destroy(struct node *n) int ret; struct zeromq *z = (struct zeromq *) n->_vd; - ret = io_destroy(&z->io); + if (z->in.filter) + free(z->in.filter); + + if (z->out.filter) + free(z->out.filter); + + ret = list_destroy(&z->out.endpoints, NULL, true); if (ret) return ret; @@ -430,11 +450,11 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r if (ret < 0) return ret; - if (z->filter) { + if (z->in.filter) { switch (z->pattern) { case ZEROMQ_PATTERN_PUBSUB: /* Discard envelope */ - zmq_recv(z->subscriber.socket, NULL, 0, 0); + zmq_recv(z->in.socket, NULL, 0, 0); break; default: { } @@ -442,7 +462,7 @@ int zeromq_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r } /* Receive payload */ - ret = zmq_msg_recv(&m, z->subscriber.socket, 0); + ret = zmq_msg_recv(&m, z->in.socket, 0); if (ret < 0) return ret; @@ -471,25 +491,25 @@ int zeromq_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned * ret = zmq_msg_init_size(&m, wbytes); - if (z->filter) { + if (z->out.filter) { switch (z->pattern) { #ifdef ZMQ_BUILD_DISH case ZEROMQ_PATTERN_RADIODISH: - ret = zmq_msg_set_group(&m, z->filter); + ret = zmq_msg_set_group(&m, z->out.filter); if (ret < 0) goto fail; break; #endif case ZEROMQ_PATTERN_PUBSUB: /* Send envelope */ - zmq_send(z->publisher.socket, z->filter, strlen(z->filter), ZMQ_SNDMORE); + zmq_send(z->out.socket, z->out.filter, strlen(z->out.filter), ZMQ_SNDMORE); break; } } memcpy(zmq_msg_data(&m), data, wbytes); - ret = zmq_msg_send(&m, z->publisher.socket, 0); + ret = zmq_msg_send(&m, z->out.socket, 0); if (ret < 0) goto fail; @@ -513,7 +533,7 @@ int zeromq_fd(struct node *n) int fd; size_t len = sizeof(fd); - ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len); + ret = zmq_getsockopt(z->in.socket, ZMQ_FD, &fd, &len); if (ret) return ret;