diff --git a/lib/io.c b/lib/io.c index 4dedc1873..261edd48c 100644 --- a/lib/io.c +++ b/lib/io.c @@ -187,6 +187,8 @@ int io_stream_fd(struct io *io) case IO_MODE_CUSTOM: return -1; } + + return -1; } diff --git a/lib/nodes/file.c b/lib/nodes/file.c index 061f546d4..6ecef27f7 100644 --- a/lib/nodes/file.c +++ b/lib/nodes/file.c @@ -339,6 +339,20 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt) return cnt; } +int file_fd(struct node *n) +{ + struct file *f = n->_vd; + + if (f->rate) + return task_fd(&f->task); + else { + if (f->epoch_mode == FILE_EPOCH_ORIGINAL) + return io_fd(&f->io); + else + return -1; /** @todo not supported yet */ + } +} + static struct plugin p = { .name = "file", .description = "support for file log / replay node type", @@ -351,7 +365,8 @@ static struct plugin p = { .start = file_start, .stop = file_stop, .read = file_read, - .write = file_write + .write = file_write, + .fd = file_fd } }; diff --git a/lib/nodes/nanomsg.c b/lib/nodes/nanomsg.c index 734d84152..3bf0e4b5c 100644 --- a/lib/nodes/nanomsg.c +++ b/lib/nodes/nanomsg.c @@ -246,6 +246,21 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt) return cnt; } +int nanomsg_fd(struct node *n) +{ + int ret; + struct nanomsg *m = n->_vd; + + int fd; + size_t len = sizeof(fd); + + ret = nn_getsockopt(m->subscriber.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len); + if (ret) + return ret; + + return fd; +} + static struct plugin p = { .name = "nanomsg", .description = "scalability protocols library (libnanomsg)", @@ -260,7 +275,8 @@ static struct plugin p = { .stop = nanomsg_stop, .deinit = nanomsg_deinit, .read = nanomsg_read, - .write = nanomsg_write + .write = nanomsg_write, + .fd = nanomsg_fd } }; diff --git a/lib/nodes/ngsi.c b/lib/nodes/ngsi.c index 0c2ba042d..84a14b732 100644 --- a/lib/nodes/ngsi.c +++ b/lib/nodes/ngsi.c @@ -573,6 +573,13 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt) return ret ? 0 : cnt; } +int ngsi_fd(struct node *n) +{ + struct ngsi *i = n->_vd; + + return task_fd(&i->task); +} + static struct plugin p = { .name = "ngsi", .description = "OMA Next Generation Services Interface 10 (libcurl, libjansson)", @@ -587,7 +594,8 @@ static struct plugin p = { .read = ngsi_read, .write = ngsi_write, .init = ngsi_init, - .deinit = ngsi_deinit + .deinit = ngsi_deinit, + .fd = ngsi_fd } }; diff --git a/lib/nodes/signal.c b/lib/nodes/signal.c index cba5dff57..f718c9beb 100644 --- a/lib/nodes/signal.c +++ b/lib/nodes/signal.c @@ -265,7 +265,14 @@ char * signal_print(struct node *n) strcatf(&buf, ", limit=%d", s->limit); return buf; -}; +} + +int signal_fd(struct node *n) +{ + struct signal *s = n->_vd; + + return task_fd(&s->task); +} static struct plugin p = { .name = "signal", @@ -280,6 +287,7 @@ static struct plugin p = { .start = signal_open, .stop = signal_close, .read = signal_read, + .fd = signal_fd } }; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 4035694c4..30f557be6 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -703,6 +703,13 @@ int socket_compare_addr(struct sockaddr *x, struct sockaddr *y) #undef CMP } +int socket_fd(struct node *n) +{ + struct socket *s = n->_vd; + + return s->sd; +} + static struct plugin p = { .name = "socket", .description = "BSD network sockets for Ethernet / IP / UDP (libnl3)", @@ -719,7 +726,8 @@ static struct plugin p = { .read = socket_read, .write = socket_write, .init = socket_init, - .deinit = socket_deinit + .deinit = socket_deinit, + .fd = socket_fd } }; diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c index 7a718d46f..6da4b7415 100644 --- a/lib/nodes/test_rtt.c +++ b/lib/nodes/test_rtt.c @@ -235,6 +235,13 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) return i; } +int test_rtt_fd(struct node *n) +{ + struct test_rtt *t = n->_vd; + + return task_fd(&t->task); +} + static struct plugin p = { .name = "test_rtt", .description = "Test round-trip time with loopback", @@ -247,7 +254,8 @@ static struct plugin p = { .start = test_rtt_start, .stop = test_rtt_stop, .read = test_rtt_read, - .write = test_rtt_write + .write = test_rtt_write, + .fd = test_rtt_fd } }; diff --git a/lib/nodes/zeromq.c b/lib/nodes/zeromq.c index 721a834e4..66819249e 100644 --- a/lib/nodes/zeromq.c +++ b/lib/nodes/zeromq.c @@ -488,6 +488,21 @@ fail: return ret; } +int zeromq_fd(struct node *n) +{ + int ret; + struct zeromq *z = n->_vd; + + int fd; + size_t len = sizeof(fd); + + ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len); + if (ret) + return ret; + + return fd; +} + static struct plugin p = { .name = "zeromq", .description = "ZeroMQ Distributed Messaging (libzmq)", @@ -503,7 +518,8 @@ static struct plugin p = { .init = zeromq_init, .deinit = zeromq_deinit, .read = zeromq_read, - .write = zeromq_write + .write = zeromq_write, + .fd = zeromq_fd } };