diff --git a/include/villas/node.h b/include/villas/node.h index 22088a958..10eaa15cf 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -177,7 +177,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release); -int node_fd(struct node *n); +int node_poll_fds(struct node *n, int fds[]); struct node_type * node_type(struct node *n); diff --git a/include/villas/node_type.h b/include/villas/node_type.h index 39060298f..3fa79dea2 100644 --- a/include/villas/node_type.h +++ b/include/villas/node_type.h @@ -207,13 +207,13 @@ struct node_type { */ int (*reverse)(struct node *n); - /** Return a file descriptor which can be used by poll / select to detect the availability of new data. + /** Get list of file descriptors which can be used by poll/select to detect the availability of new data. * * This callback is optional. * - * @return This file descriptor. + * @return The number of file descriptors which have been put into \p fds. */ - int (*fd)(struct node *n); + int (*poll_fds)(struct node *n, int fds[]); /** Return a memory allocator which should be used for sample pools passed to this node. */ struct memory_type * (*memory_type)(struct node *n, struct memory_type *parent); diff --git a/lib/node.c b/lib/node.c index 0f517343d..6dc775f00 100644 --- a/lib/node.c +++ b/lib/node.c @@ -609,7 +609,10 @@ int node_reverse(struct node *n) return node_type(n)->reverse ? node_type(n)->reverse(n) : -1; } -int node_fd(struct node *n) +int node_poll_fds(struct node *n, int fds[]) +{ + return node_type(n)->poll_fds ? node_type(n)->poll_fds(n, fds) : -1; +} { return node_type(n)->fd ? node_type(n)->fd(n) : -1; } diff --git a/lib/nodes/amqp.c b/lib/nodes/amqp.c index f6f1b3f0a..d177545e5 100644 --- a/lib/nodes/amqp.c +++ b/lib/nodes/amqp.c @@ -356,13 +356,15 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re return cnt; } -int amqp_fd(struct node *n) +int amqp_poll_fds(struct node *n, int fds[]) { struct amqp *a = n->_vd; amqp_socket_t *sock = amqp_get_socket(a->consumer); - return amqp_socket_get_sockfd(sock); + fds[0] = amqp_socket_get_sockfd(sock); + + return 1; } int amqp_destroy(struct node *n) @@ -404,7 +406,7 @@ static struct plugin p = { .read = amqp_read, .write = amqp_write, .destroy = amqp_destroy, - .fd = amqp_fd + .poll_fds = amqp_poll_fds } }; diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index 319a6a54b..af440804c 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -154,11 +154,13 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned return 1; } -int cbuilder_fd(struct node *n) +int cbuilder_poll_fds(struct node *n, int fds[]) { struct cbuilder *cb = (struct cbuilder *) n->_vd; - return cb->eventfd; + fds[0] = cb->eventfd; + + return 1; } static struct plugin p = { @@ -173,7 +175,7 @@ static struct plugin p = { .stop = cbuilder_stop, .read = cbuilder_read, .write = cbuilder_write, - .fd = cbuilder_fd + .poll_fds = cbuilder_poll_fds } }; diff --git a/lib/nodes/comedi.c b/lib/nodes/comedi.c index 1963f313a..3f564f8db 100644 --- a/lib/nodes/comedi.c +++ b/lib/nodes/comedi.c @@ -978,10 +978,13 @@ void comedi_dump_cmd(comedi_cmd *cmd, int debug_level) debug(LOG_COMEDI | debug_level, "stop: %-8s %u", src, cmd->stop_arg); } -int comedi_fd(struct node *n) +int comedi_poll_fds(struct node *n, int fds[]) { struct comedi *c = (struct comedi *) n->_vd; - return comedi_fileno(c->dev); + + fds[0] = comedi_fileno(c->dev); + + return 0; } static struct plugin p = { @@ -997,7 +1000,7 @@ static struct plugin p = { .stop = comedi_stop, .read = comedi_read, .write = comedi_write, - .fd = comedi_fd + .poll_fds = comedi_poll_fds } }; diff --git a/lib/nodes/file.c b/lib/nodes/file.c index b87207273..f854ecb0d 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -421,15 +421,19 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re return cnt; } -int file_fd(struct node *n) +int file_poll_fds(struct node *n, int fds[]) { struct file *f = (struct file *) n->_vd; - if (f->rate) - return task_fd(&f->task); + if (f->rate) { + fds[0] = task_fd(&f->task); + return 1; + } else { - if (f->epoch_mode == FILE_EPOCH_ORIGINAL) - return io_fd(&f->io); + if (f->epoch_mode == FILE_EPOCH_ORIGINAL) { + fds[0] = io_fd(&f->io); + return 1; + } else return -1; /** @todo not supported yet */ } @@ -449,7 +453,7 @@ static struct plugin p = { .restart = file_restart, .read = file_read, .write = file_write, - .fd = file_fd + .poll_fds = file_poll_fds } }; diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.c index aefff482a..25c7d481a 100644 --- a/lib/nodes/iec61850_sv.c +++ b/lib/nodes/iec61850_sv.c @@ -416,11 +416,13 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig return cnt; } -int iec61850_sv_fd(struct node *n) +int iec61850_sv_poll_fds(struct node *n, int fds[]) { struct iec61850_sv *i = (struct iec61850_sv *) n->_vd; - return queue_signalled_fd(&i->in.queue); + fds[0] = queue_signalled_fd(&i->in.queue); + + return 1; } static struct plugin p = { @@ -439,7 +441,7 @@ static struct plugin p = { .destroy = iec61850_sv_destroy, .read = iec61850_sv_read, .write = iec61850_sv_write, - .fd = iec61850_sv_fd + .poll_fds = iec61850_sv_poll_fds } }; diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 2c0b06802..8dd864d6e 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -136,11 +136,13 @@ char * loopback_print(struct node *n) return buf; } -int loopback_fd(struct node *n) +int loopback_poll_fds(struct node *n, int fds[]) { struct loopback *l = (struct loopback *) n->_vd; - return queue_signalled_fd(&l->queue); + fds[0] = queue_signalled_fd(&l->queue); + + return 1; } static struct plugin p = { @@ -149,15 +151,15 @@ static struct plugin p = { .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 0, - .flags = NODE_TYPE_PROVIDES_SIGNALS, - .size = sizeof(struct loopback), - .parse = loopback_parse, - .print = loopback_print, - .start = loopback_start, - .stop = loopback_stop, - .read = loopback_read, - .write = loopback_write, - .fd = loopback_fd + .flags = NODE_TYPE_PROVIDES_SIGNALS, + .size = sizeof(struct loopback), + .parse = loopback_parse, + .print = loopback_print, + .start = loopback_start, + .stop = loopback_stop, + .read = loopback_read, + .write = loopback_write, + .poll_fds = loopback_poll_fds } }; diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index d6d118f56..1cd0b410a 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -406,18 +406,19 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re return -abs(ret); } } - else { - warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); - } + else + warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n)); return cnt; } -int mqtt_fd(struct node *n) +int mqtt_poll_fds(struct node *n, int fds[]) { struct mqtt *m = (struct mqtt *) n->_vd; - return queue_signalled_fd(&m->queue); + fds[0] = queue_signalled_fd(&m->queue); + + return 1; } static struct plugin p = { @@ -437,7 +438,7 @@ static struct plugin p = { .stop = mqtt_stop, .read = mqtt_read, .write = mqtt_write, - .fd = mqtt_fd + .poll_fds = mqtt_poll_fds } }; diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index a9dc4b191..990766430 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -264,7 +264,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned return cnt; } -int nanomsg_fd(struct node *n) +int nanomsg_poll_fds(struct node *n, int fds[]) { int ret; struct nanomsg *m = (struct nanomsg *) n->_vd; @@ -276,7 +276,9 @@ int nanomsg_fd(struct node *n) if (ret) return ret; - return fd; + fds[0] = fd; + + return 1; } static struct plugin p = { @@ -294,7 +296,7 @@ static struct plugin p = { .stop = nanomsg_stop, .read = nanomsg_read, .write = nanomsg_write, - .fd = nanomsg_fd + .poll_fds = nanomsg_poll_fds } }; diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index d576d2e42..489ec059e 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -572,11 +572,13 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re return ret ? 0 : cnt; } -int ngsi_fd(struct node *n) +int ngsi_poll_fds(struct node *n) { struct ngsi *i = (struct ngsi *) n->_vd; - return task_fd(&i->task); + fds[0] = task_fd(&i->task); + + return 1; } static struct plugin p = { @@ -594,7 +596,7 @@ static struct plugin p = { .stop = ngsi_stop, .read = ngsi_read, .write = ngsi_write, - .fd = ngsi_fd + .poll_fds = ngsi_poll_fds } }; diff --git a/lib/nodes/signal_generator.c b/lib/nodes/signal_generator.c index e33bb6b96..18812bf00 100644 --- a/lib/nodes/signal_generator.c +++ b/lib/nodes/signal_generator.c @@ -289,11 +289,13 @@ char * signal_generator_print(struct node *n) return buf; } -int signal_generator_fd(struct node *n) +int signal_generator_poll_fds(struct node *n, int fds[]) { struct signal_generator *s = (struct signal_generator *) n->_vd; - return task_fd(&s->task); + fds[0] = task_fd(&s->task); + + return 1; } static struct plugin p = { @@ -309,7 +311,7 @@ static struct plugin p = { .start = signal_generator_start, .stop = signal_generator_stop, .read = signal_generator_read, - .fd = signal_generator_fd + .poll_fds = signal_generator_poll_fds } }; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 3ae0bdd70..8c2350705 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -844,11 +844,13 @@ int socket_compare_addr(struct sockaddr *x, struct sockaddr *y) #undef CMP } -int socket_fd(struct node *n) +int socket_fds(struct node *n, int fds[]) { struct socket *s = (struct socket *) n->_vd; - return s->sd; + fds[0] = s->sd; + + return 1; } static struct plugin p = { @@ -873,7 +875,7 @@ static struct plugin p = { .stop = socket_stop, .read = socket_read, .write = socket_write, - .fd = socket_fd + .poll_fds = socket_fds, } }; diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index bf76d17de..643097919 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -197,11 +197,13 @@ int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigne return 1; } -int stats_node_fd(struct node *n) +int stats_node_poll_fds(struct node *n, int fds[]) { struct stats_node *s = (struct stats_node *) n->_vd; - return task_fd(&s->task); + fds[0] = task_fd(&s->task); + + return 0; } static struct plugin p = { @@ -219,7 +221,7 @@ static struct plugin p = { .start = stats_node_start, .stop = stats_node_stop, .read = stats_node_read, - .fd = stats_node_fd + .poll_fds = stats_node_poll_fds, } }; diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 7eaf5770e..01e5921ac 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -380,11 +380,13 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned return i; } -int test_rtt_fd(struct node *n) +int test_rtt_poll_fds(struct node *n, int fds[]) { struct test_rtt *t = (struct test_rtt *) n->_vd; - return task_fd(&t->task); + fds[0] = task_fd(&t->task); + + return 1; } static struct plugin p = { @@ -402,7 +404,7 @@ static struct plugin p = { .stop = test_rtt_stop, .read = test_rtt_read, .write = test_rtt_write, - .fd = test_rtt_fd + .poll_fds = test_rtt_poll_fds } }; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 853eac391..80ec10f7a 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -602,11 +602,13 @@ char * websocket_print(struct node *n) return buf; } -int websocket_fd(struct node *n) +int websocket_poll_fds(struct node *n, int fds[]) { struct websocket *w = (struct websocket *) n->_vd; - return queue_signalled_fd(&w->queue); + fds[0] = queue_signalled_fd(&w->queue); + + return 1; } static struct plugin p = { @@ -624,7 +626,7 @@ static struct plugin p = { .write = websocket_write, .print = websocket_print, .parse = websocket_parse, - .fd = websocket_fd + .poll_fds = websocket_poll_fds } }; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 3a17aa1f6..642fad8ad 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -525,7 +525,7 @@ fail: return ret; } -int zeromq_fd(struct node *n) +int zeromq_poll_fds(struct node *n, int fds[]) { int ret; struct zeromq *z = (struct zeromq *) n->_vd; @@ -537,7 +537,9 @@ int zeromq_fd(struct node *n) if (ret) return ret; - return fd; + fds[0] = fd; + + return 1; } static struct plugin p = { @@ -557,7 +559,7 @@ static struct plugin p = { .destroy = zeromq_destroy, .read = zeromq_read, .write = zeromq_write, - .fd = zeromq_fd + .poll_fds = zeromq_poll_fds } }; diff --git a/lib/path.c b/lib/path.c index d8595931a..68ffa39a0 100644 --- a/lib/path.c +++ b/lib/path.c @@ -365,12 +365,17 @@ int path_init_poll(struct path *p) for (int i = 0; i < vlist_length(&p->sources); i++) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i); - /* This slot is only used if it is not masked */ - p->reader.pfds[i].events = POLLIN; - p->reader.pfds[i].fd = node_fd(ps->node); + int fds[16]; + int num_fds = node_poll_fds(ps->node, fds); - if (p->reader.pfds[i].fd < 0) - error("Failed to get file descriptor for node %s", node_name(ps->node)); + for (int i = 0; i < num_fds; i++) { + if (fds[i] < 0) + error("Failed to get file descriptor for node %s", node_name(ps->node)); + + /* This slot is only used if it is not masked */ + p->reader.pfds[i].events = POLLIN; + p->reader.pfds[i].fd = fds[i]; + } } /* We use the last slot for the timeout timer. */ @@ -641,7 +646,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) if (p->poll == -1) { struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0); - p->poll = (p->rate > 0 || vlist_length(&p->sources) > 1) && node_fd(ps->node) != -1; + int fds[16]; + int num_fds = node_poll_fds(ps->node, fds); + + p->poll = (p->rate > 0 || vlist_length(&p->sources) > 1) && num_fds > 0; } ret = vlist_destroy(&sources, NULL, false);