1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add node_fd() support to most node types

This commit is contained in:
Steffen Vogel 2017-08-30 00:25:42 +02:00
parent f9324000fa
commit 9f3d806755
8 changed files with 88 additions and 7 deletions

View file

@ -187,6 +187,8 @@ int io_stream_fd(struct io *io)
case IO_MODE_CUSTOM:
return -1;
}
return -1;
}

View file

@ -339,6 +339,20 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt)
return cnt;
}
int file_fd(struct node *n)
{
struct file *f = n->_vd;
if (f->rate)
return task_fd(&f->task);
else {
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
return io_fd(&f->io);
else
return -1; /** @todo not supported yet */
}
}
static struct plugin p = {
.name = "file",
.description = "support for file log / replay node type",
@ -351,7 +365,8 @@ static struct plugin p = {
.start = file_start,
.stop = file_stop,
.read = file_read,
.write = file_write
.write = file_write,
.fd = file_fd
}
};

View file

@ -246,6 +246,21 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt)
return cnt;
}
int nanomsg_fd(struct node *n)
{
int ret;
struct nanomsg *m = n->_vd;
int fd;
size_t len = sizeof(fd);
ret = nn_getsockopt(m->subscriber.socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &len);
if (ret)
return ret;
return fd;
}
static struct plugin p = {
.name = "nanomsg",
.description = "scalability protocols library (libnanomsg)",
@ -260,7 +275,8 @@ static struct plugin p = {
.stop = nanomsg_stop,
.deinit = nanomsg_deinit,
.read = nanomsg_read,
.write = nanomsg_write
.write = nanomsg_write,
.fd = nanomsg_fd
}
};

View file

@ -573,6 +573,13 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt)
return ret ? 0 : cnt;
}
int ngsi_fd(struct node *n)
{
struct ngsi *i = n->_vd;
return task_fd(&i->task);
}
static struct plugin p = {
.name = "ngsi",
.description = "OMA Next Generation Services Interface 10 (libcurl, libjansson)",
@ -587,7 +594,8 @@ static struct plugin p = {
.read = ngsi_read,
.write = ngsi_write,
.init = ngsi_init,
.deinit = ngsi_deinit
.deinit = ngsi_deinit,
.fd = ngsi_fd
}
};

View file

@ -265,7 +265,14 @@ char * signal_print(struct node *n)
strcatf(&buf, ", limit=%d", s->limit);
return buf;
};
}
int signal_fd(struct node *n)
{
struct signal *s = n->_vd;
return task_fd(&s->task);
}
static struct plugin p = {
.name = "signal",
@ -280,6 +287,7 @@ static struct plugin p = {
.start = signal_open,
.stop = signal_close,
.read = signal_read,
.fd = signal_fd
}
};

View file

@ -703,6 +703,13 @@ int socket_compare_addr(struct sockaddr *x, struct sockaddr *y)
#undef CMP
}
int socket_fd(struct node *n)
{
struct socket *s = n->_vd;
return s->sd;
}
static struct plugin p = {
.name = "socket",
.description = "BSD network sockets for Ethernet / IP / UDP (libnl3)",
@ -719,7 +726,8 @@ static struct plugin p = {
.read = socket_read,
.write = socket_write,
.init = socket_init,
.deinit = socket_deinit
.deinit = socket_deinit,
.fd = socket_fd
}
};

View file

@ -235,6 +235,13 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt)
return i;
}
int test_rtt_fd(struct node *n)
{
struct test_rtt *t = n->_vd;
return task_fd(&t->task);
}
static struct plugin p = {
.name = "test_rtt",
.description = "Test round-trip time with loopback",
@ -247,7 +254,8 @@ static struct plugin p = {
.start = test_rtt_start,
.stop = test_rtt_stop,
.read = test_rtt_read,
.write = test_rtt_write
.write = test_rtt_write,
.fd = test_rtt_fd
}
};

View file

@ -488,6 +488,21 @@ fail:
return ret;
}
int zeromq_fd(struct node *n)
{
int ret;
struct zeromq *z = n->_vd;
int fd;
size_t len = sizeof(fd);
ret = zmq_getsockopt(z->subscriber.socket, ZMQ_FD, &fd, &len);
if (ret)
return ret;
return fd;
}
static struct plugin p = {
.name = "zeromq",
.description = "ZeroMQ Distributed Messaging (libzmq)",
@ -503,7 +518,8 @@ static struct plugin p = {
.init = zeromq_init,
.deinit = zeromq_deinit,
.read = zeromq_read,
.write = zeromq_write
.write = zeromq_write,
.fd = zeromq_fd
}
};