diff --git a/include/villas/nodes/nanomsg.h b/include/villas/nodes/nanomsg.h index 91d2496b2..c180f49ff 100644 --- a/include/villas/nodes/nanomsg.h +++ b/include/villas/nodes/nanomsg.h @@ -47,12 +47,7 @@ struct nanomsg { struct { int socket; struct list endpoints; - } publisher; - - struct { - int socket; - struct list endpoints; - } subscriber; + } in, out; struct format_type *format; struct io io; diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 9748f1340..0abaf7a32 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -33,15 +33,15 @@ int nanomsg_reverse(struct node *n) { struct nanomsg *m = (struct nanomsg *) n->_vd; - if (list_length(&m->publisher.endpoints) != 1 || - list_length(&m->subscriber.endpoints) != 1) + if (list_length(&m->out.endpoints) != 1 || + list_length(&m->in.endpoints) != 1) return -1; - char *subscriber = list_first(&m->subscriber.endpoints); - char *publisher = list_first(&m->publisher.endpoints); + char *subscriber = list_first(&m->in.endpoints); + char *publisher = list_first(&m->out.endpoints); - list_set(&m->subscriber.endpoints, 0, publisher); - list_set(&m->publisher.endpoints, 0, subscriber); + list_set(&m->in.endpoints, 0, publisher); + list_set(&m->out.endpoints, 0, subscriber); return 0; } @@ -86,28 +86,30 @@ int nanomsg_parse(struct node *n, json_t *cfg) json_error_t err; - json_t *json_pub = NULL; - json_t *json_sub = NULL; + json_t *json_out_endpoints = NULL; + json_t *json_in_endpoints = NULL; - list_init(&m->publisher.endpoints); - list_init(&m->subscriber.endpoints); + list_init(&m->out.endpoints); + list_init(&m->in.endpoints); - ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: s }", - "publish", &json_pub, - "subscribe", &json_sub, - "format", &format + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: { s?: o }, s?: { s?: o } }", + "format", &format, + "out", + "endpoints", &json_out_endpoints, + "in", + "endpoints", &json_in_endpoints ); if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); - if (json_pub) { - ret = nanomsg_parse_endpoints(&m->publisher.endpoints, json_pub); + if (json_out_endpoints) { + ret = nanomsg_parse_endpoints(&m->out.endpoints, json_out_endpoints); if (ret < 0) error("Invalid type for 'publish' setting of node %s", node_name(n)); } - if (json_sub) { - ret = nanomsg_parse_endpoints(&m->subscriber.endpoints, json_sub); + if (json_in_endpoints) { + ret = nanomsg_parse_endpoints(&m->in.endpoints, json_in_endpoints); if (ret < 0) error("Invalid type for 'subscribe' setting of node %s", node_name(n)); } @@ -127,16 +129,16 @@ char * nanomsg_print(struct node *n) strcatf(&buf, "format=%s, in.endpoints=[ ", format_type_name(m->format)); - for (size_t i = 0; i < list_length(&m->subscriber.endpoints); i++) { - char *ep = (char *) list_at(&m->subscriber.endpoints, i); + for (size_t i = 0; i < list_length(&m->in.endpoints); i++) { + char *ep = (char *) list_at(&m->in.endpoints, i); strcatf(&buf, "%s ", ep); } - strcatf(&buf, "], publish=[ "); + strcatf(&buf, "], out.endpoints=[ "); - for (size_t i = 0; i < list_length(&m->publisher.endpoints); i++) { - char *ep = (char *) list_at(&m->publisher.endpoints, i); + for (size_t i = 0; i < list_length(&m->out.endpoints); i++) { + char *ep = (char *) list_at(&m->out.endpoints, i); strcatf(&buf, "%s ", ep); } @@ -151,41 +153,49 @@ int nanomsg_start(struct node *n) int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; - ret = io_init(&m->io, m->format, n, SAMPLE_HAS_ALL); + ret = io_init(&m->io, m->format, &n->signals, SAMPLE_HAS_ALL); if (ret) return ret; - ret = m->subscriber.socket = nn_socket(AF_SP, NN_SUB); - if (ret < 0) + ret = io_check(&m->io); + if (ret) return ret; - ret = m->publisher.socket = nn_socket(AF_SP, NN_PUB); - if (ret < 0) + ret = m->in.socket = nn_socket(AF_SP, NN_SUB); + if (ret < 0) { + warn("Failed to create nanomsg socket: node=%s, error=%s", node_name(n), nn_strerror(errno)); return ret; + } + + ret = m->out.socket = nn_socket(AF_SP, NN_PUB); + if (ret < 0) { + warn("Failed to create nanomsg socket: node=%s, error=%s", node_name(n), nn_strerror(errno)); + return ret; + } /* Subscribe to all topics */ - ret = nn_setsockopt(ret = m->subscriber.socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0); + ret = nn_setsockopt(ret = m->in.socket, NN_SUB, NN_SUB_SUBSCRIBE, "", 0); if (ret < 0) return ret; /* Bind publisher to socket */ - for (size_t i = 0; i < list_length(&m->publisher.endpoints); i++) { - char *ep = (char *) list_at(&m->publisher.endpoints, i); + for (size_t i = 0; i < list_length(&m->out.endpoints); i++) { + char *ep = (char *) list_at(&m->out.endpoints, i); - ret = nn_bind(m->publisher.socket, ep); + ret = nn_bind(m->out.socket, ep); if (ret < 0) { - info("Failed to connect nanomsg socket: node=%s, endpoint=%s, error=%s", node_name(n), ep, nn_strerror(errno)); + warn("Failed to connect nanomsg socket: node=%s, endpoint=%s, error=%s", node_name(n), ep, nn_strerror(errno)); return ret; } } - /* Sonnect subscribers socket */ - for (size_t i = 0; i < list_length(&m->subscriber.endpoints); i++) { - char *ep = (char *) list_at(&m->subscriber.endpoints, i); + /* Connect subscribers socket */ + for (size_t i = 0; i < list_length(&m->in.endpoints); i++) { + char *ep = (char *) list_at(&m->in.endpoints, i); - ret = nn_connect(m->subscriber.socket, ep); + ret = nn_connect(m->in.socket, ep); if (ret < 0) { - info("Failed to connect nanomsg socket: node=%s, endpoint=%s, error=%s", node_name(n), ep, nn_strerror(errno)); + warn("Failed to connect nanomsg socket: node=%s, endpoint=%s, error=%s", node_name(n), ep, nn_strerror(errno)); return ret; } } @@ -198,22 +208,14 @@ int nanomsg_stop(struct node *n) int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; - ret = nn_close(m->subscriber.socket); + ret = nn_close(m->in.socket); if (ret < 0) return ret; - ret = nn_close(m->publisher.socket); + ret = nn_close(m->out.socket); if (ret < 0) return ret; - return 0; -} - -int nanomsg_destroy(struct node *n) -{ - int ret; - struct nanomsg *m = (struct nanomsg *) n->_vd; - ret = io_destroy(&m->io); if (ret) return ret; @@ -235,7 +237,7 @@ int nanomsg_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned * char data[NANOMSG_MAX_PACKET_LEN]; /* Receive payload */ - bytes = nn_recv(m->subscriber.socket, data, sizeof(data), 0); + bytes = nn_recv(m->in.socket, data, sizeof(data), 0); if (bytes < 0) return -1; @@ -255,7 +257,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned if (ret <= 0) return -1; - ret = nn_send(m->publisher.socket, data, wbytes, 0); + ret = nn_send(m->out.socket, data, wbytes, 0); if (ret < 0) return ret; @@ -270,7 +272,7 @@ int nanomsg_fd(struct node *n) int fd; size_t len = sizeof(fd); - ret = nn_getsockopt(m->subscriber.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len); + ret = nn_getsockopt(m->in.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len); if (ret) return ret; @@ -284,13 +286,12 @@ static struct plugin p = { .node = { .vectorize = 0, .size = sizeof(struct nanomsg), - .type.start = nanomsg_type_stop, + .type.stop = nanomsg_type_stop, .reverse = nanomsg_reverse, .parse = nanomsg_parse, .print = nanomsg_print, .start = nanomsg_start, .stop = nanomsg_stop, - .destroy = nanomsg_destroy, .read = nanomsg_read, .write = nanomsg_write, .fd = nanomsg_fd