mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
node: allow nodes to have multiple filedescriptors
This commit is contained in:
parent
4e55971faa
commit
d3ff9f2d41
19 changed files with 113 additions and 70 deletions
|
@ -177,7 +177,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
|
|||
|
||||
int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release);
|
||||
|
||||
int node_fd(struct node *n);
|
||||
int node_poll_fds(struct node *n, int fds[]);
|
||||
|
||||
struct node_type * node_type(struct node *n);
|
||||
|
||||
|
|
|
@ -207,13 +207,13 @@ struct node_type {
|
|||
*/
|
||||
int (*reverse)(struct node *n);
|
||||
|
||||
/** Return a file descriptor which can be used by poll / select to detect the availability of new data.
|
||||
/** Get list of file descriptors which can be used by poll/select to detect the availability of new data.
|
||||
*
|
||||
* This callback is optional.
|
||||
*
|
||||
* @return This file descriptor.
|
||||
* @return The number of file descriptors which have been put into \p fds.
|
||||
*/
|
||||
int (*fd)(struct node *n);
|
||||
int (*poll_fds)(struct node *n, int fds[]);
|
||||
|
||||
/** Return a memory allocator which should be used for sample pools passed to this node. */
|
||||
struct memory_type * (*memory_type)(struct node *n, struct memory_type *parent);
|
||||
|
|
|
@ -609,7 +609,10 @@ int node_reverse(struct node *n)
|
|||
return node_type(n)->reverse ? node_type(n)->reverse(n) : -1;
|
||||
}
|
||||
|
||||
int node_fd(struct node *n)
|
||||
int node_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
return node_type(n)->poll_fds ? node_type(n)->poll_fds(n, fds) : -1;
|
||||
}
|
||||
{
|
||||
return node_type(n)->fd ? node_type(n)->fd(n) : -1;
|
||||
}
|
||||
|
|
|
@ -356,13 +356,15 @@ int amqp_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
return cnt;
|
||||
}
|
||||
|
||||
int amqp_fd(struct node *n)
|
||||
int amqp_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct amqp *a = n->_vd;
|
||||
|
||||
amqp_socket_t *sock = amqp_get_socket(a->consumer);
|
||||
|
||||
return amqp_socket_get_sockfd(sock);
|
||||
fds[0] = amqp_socket_get_sockfd(sock);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int amqp_destroy(struct node *n)
|
||||
|
@ -404,7 +406,7 @@ static struct plugin p = {
|
|||
.read = amqp_read,
|
||||
.write = amqp_write,
|
||||
.destroy = amqp_destroy,
|
||||
.fd = amqp_fd
|
||||
.poll_fds = amqp_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -154,11 +154,13 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned
|
|||
return 1;
|
||||
}
|
||||
|
||||
int cbuilder_fd(struct node *n)
|
||||
int cbuilder_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct cbuilder *cb = (struct cbuilder *) n->_vd;
|
||||
|
||||
return cb->eventfd;
|
||||
fds[0] = cb->eventfd;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -173,7 +175,7 @@ static struct plugin p = {
|
|||
.stop = cbuilder_stop,
|
||||
.read = cbuilder_read,
|
||||
.write = cbuilder_write,
|
||||
.fd = cbuilder_fd
|
||||
.poll_fds = cbuilder_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -978,10 +978,13 @@ void comedi_dump_cmd(comedi_cmd *cmd, int debug_level)
|
|||
debug(LOG_COMEDI | debug_level, "stop: %-8s %u", src, cmd->stop_arg);
|
||||
}
|
||||
|
||||
int comedi_fd(struct node *n)
|
||||
int comedi_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct comedi *c = (struct comedi *) n->_vd;
|
||||
return comedi_fileno(c->dev);
|
||||
|
||||
fds[0] = comedi_fileno(c->dev);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -997,7 +1000,7 @@ static struct plugin p = {
|
|||
.stop = comedi_stop,
|
||||
.read = comedi_read,
|
||||
.write = comedi_write,
|
||||
.fd = comedi_fd
|
||||
.poll_fds = comedi_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -421,15 +421,19 @@ int file_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
return cnt;
|
||||
}
|
||||
|
||||
int file_fd(struct node *n)
|
||||
int file_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct file *f = (struct file *) n->_vd;
|
||||
|
||||
if (f->rate)
|
||||
return task_fd(&f->task);
|
||||
if (f->rate) {
|
||||
fds[0] = task_fd(&f->task);
|
||||
return 1;
|
||||
}
|
||||
else {
|
||||
if (f->epoch_mode == FILE_EPOCH_ORIGINAL)
|
||||
return io_fd(&f->io);
|
||||
if (f->epoch_mode == FILE_EPOCH_ORIGINAL) {
|
||||
fds[0] = io_fd(&f->io);
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
return -1; /** @todo not supported yet */
|
||||
}
|
||||
|
@ -449,7 +453,7 @@ static struct plugin p = {
|
|||
.restart = file_restart,
|
||||
.read = file_read,
|
||||
.write = file_write,
|
||||
.fd = file_fd
|
||||
.poll_fds = file_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -416,11 +416,13 @@ int iec61850_sv_write(struct node *n, struct sample *smps[], unsigned cnt, unsig
|
|||
return cnt;
|
||||
}
|
||||
|
||||
int iec61850_sv_fd(struct node *n)
|
||||
int iec61850_sv_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct iec61850_sv *i = (struct iec61850_sv *) n->_vd;
|
||||
|
||||
return queue_signalled_fd(&i->in.queue);
|
||||
fds[0] = queue_signalled_fd(&i->in.queue);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -439,7 +441,7 @@ static struct plugin p = {
|
|||
.destroy = iec61850_sv_destroy,
|
||||
.read = iec61850_sv_read,
|
||||
.write = iec61850_sv_write,
|
||||
.fd = iec61850_sv_fd
|
||||
.poll_fds = iec61850_sv_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -136,11 +136,13 @@ char * loopback_print(struct node *n)
|
|||
return buf;
|
||||
}
|
||||
|
||||
int loopback_fd(struct node *n)
|
||||
int loopback_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct loopback *l = (struct loopback *) n->_vd;
|
||||
|
||||
return queue_signalled_fd(&l->queue);
|
||||
fds[0] = queue_signalled_fd(&l->queue);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -149,15 +151,15 @@ static struct plugin p = {
|
|||
.type = PLUGIN_TYPE_NODE,
|
||||
.node = {
|
||||
.vectorize = 0,
|
||||
.flags = NODE_TYPE_PROVIDES_SIGNALS,
|
||||
.size = sizeof(struct loopback),
|
||||
.parse = loopback_parse,
|
||||
.print = loopback_print,
|
||||
.start = loopback_start,
|
||||
.stop = loopback_stop,
|
||||
.read = loopback_read,
|
||||
.write = loopback_write,
|
||||
.fd = loopback_fd
|
||||
.flags = NODE_TYPE_PROVIDES_SIGNALS,
|
||||
.size = sizeof(struct loopback),
|
||||
.parse = loopback_parse,
|
||||
.print = loopback_print,
|
||||
.start = loopback_start,
|
||||
.stop = loopback_stop,
|
||||
.read = loopback_read,
|
||||
.write = loopback_write,
|
||||
.poll_fds = loopback_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -406,18 +406,19 @@ int mqtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
return -abs(ret);
|
||||
}
|
||||
}
|
||||
else {
|
||||
warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n));
|
||||
}
|
||||
else
|
||||
warning("MQTT: no publish for node %s possible because no publish topic is given", node_name(n));
|
||||
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int mqtt_fd(struct node *n)
|
||||
int mqtt_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct mqtt *m = (struct mqtt *) n->_vd;
|
||||
|
||||
return queue_signalled_fd(&m->queue);
|
||||
fds[0] = queue_signalled_fd(&m->queue);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -437,7 +438,7 @@ static struct plugin p = {
|
|||
.stop = mqtt_stop,
|
||||
.read = mqtt_read,
|
||||
.write = mqtt_write,
|
||||
.fd = mqtt_fd
|
||||
.poll_fds = mqtt_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -264,7 +264,7 @@ int nanomsg_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned
|
|||
return cnt;
|
||||
}
|
||||
|
||||
int nanomsg_fd(struct node *n)
|
||||
int nanomsg_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
int ret;
|
||||
struct nanomsg *m = (struct nanomsg *) n->_vd;
|
||||
|
@ -276,7 +276,9 @@ int nanomsg_fd(struct node *n)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
return fd;
|
||||
fds[0] = fd;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -294,7 +296,7 @@ static struct plugin p = {
|
|||
.stop = nanomsg_stop,
|
||||
.read = nanomsg_read,
|
||||
.write = nanomsg_write,
|
||||
.fd = nanomsg_fd
|
||||
.poll_fds = nanomsg_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -572,11 +572,13 @@ int ngsi_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
|
|||
return ret ? 0 : cnt;
|
||||
}
|
||||
|
||||
int ngsi_fd(struct node *n)
|
||||
int ngsi_poll_fds(struct node *n)
|
||||
{
|
||||
struct ngsi *i = (struct ngsi *) n->_vd;
|
||||
|
||||
return task_fd(&i->task);
|
||||
fds[0] = task_fd(&i->task);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -594,7 +596,7 @@ static struct plugin p = {
|
|||
.stop = ngsi_stop,
|
||||
.read = ngsi_read,
|
||||
.write = ngsi_write,
|
||||
.fd = ngsi_fd
|
||||
.poll_fds = ngsi_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -289,11 +289,13 @@ char * signal_generator_print(struct node *n)
|
|||
return buf;
|
||||
}
|
||||
|
||||
int signal_generator_fd(struct node *n)
|
||||
int signal_generator_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct signal_generator *s = (struct signal_generator *) n->_vd;
|
||||
|
||||
return task_fd(&s->task);
|
||||
fds[0] = task_fd(&s->task);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -309,7 +311,7 @@ static struct plugin p = {
|
|||
.start = signal_generator_start,
|
||||
.stop = signal_generator_stop,
|
||||
.read = signal_generator_read,
|
||||
.fd = signal_generator_fd
|
||||
.poll_fds = signal_generator_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -844,11 +844,13 @@ int socket_compare_addr(struct sockaddr *x, struct sockaddr *y)
|
|||
#undef CMP
|
||||
}
|
||||
|
||||
int socket_fd(struct node *n)
|
||||
int socket_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct socket *s = (struct socket *) n->_vd;
|
||||
|
||||
return s->sd;
|
||||
fds[0] = s->sd;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -873,7 +875,7 @@ static struct plugin p = {
|
|||
.stop = socket_stop,
|
||||
.read = socket_read,
|
||||
.write = socket_write,
|
||||
.fd = socket_fd
|
||||
.poll_fds = socket_fds,
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -197,11 +197,13 @@ int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigne
|
|||
return 1;
|
||||
}
|
||||
|
||||
int stats_node_fd(struct node *n)
|
||||
int stats_node_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
return task_fd(&s->task);
|
||||
fds[0] = task_fd(&s->task);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -219,7 +221,7 @@ static struct plugin p = {
|
|||
.start = stats_node_start,
|
||||
.stop = stats_node_stop,
|
||||
.read = stats_node_read,
|
||||
.fd = stats_node_fd
|
||||
.poll_fds = stats_node_poll_fds,
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -380,11 +380,13 @@ int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned
|
|||
return i;
|
||||
}
|
||||
|
||||
int test_rtt_fd(struct node *n)
|
||||
int test_rtt_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct test_rtt *t = (struct test_rtt *) n->_vd;
|
||||
|
||||
return task_fd(&t->task);
|
||||
fds[0] = task_fd(&t->task);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -402,7 +404,7 @@ static struct plugin p = {
|
|||
.stop = test_rtt_stop,
|
||||
.read = test_rtt_read,
|
||||
.write = test_rtt_write,
|
||||
.fd = test_rtt_fd
|
||||
.poll_fds = test_rtt_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -602,11 +602,13 @@ char * websocket_print(struct node *n)
|
|||
return buf;
|
||||
}
|
||||
|
||||
int websocket_fd(struct node *n)
|
||||
int websocket_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
struct websocket *w = (struct websocket *) n->_vd;
|
||||
|
||||
return queue_signalled_fd(&w->queue);
|
||||
fds[0] = queue_signalled_fd(&w->queue);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -624,7 +626,7 @@ static struct plugin p = {
|
|||
.write = websocket_write,
|
||||
.print = websocket_print,
|
||||
.parse = websocket_parse,
|
||||
.fd = websocket_fd
|
||||
.poll_fds = websocket_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -525,7 +525,7 @@ fail:
|
|||
return ret;
|
||||
}
|
||||
|
||||
int zeromq_fd(struct node *n)
|
||||
int zeromq_poll_fds(struct node *n, int fds[])
|
||||
{
|
||||
int ret;
|
||||
struct zeromq *z = (struct zeromq *) n->_vd;
|
||||
|
@ -537,7 +537,9 @@ int zeromq_fd(struct node *n)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
return fd;
|
||||
fds[0] = fd;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static struct plugin p = {
|
||||
|
@ -557,7 +559,7 @@ static struct plugin p = {
|
|||
.destroy = zeromq_destroy,
|
||||
.read = zeromq_read,
|
||||
.write = zeromq_write,
|
||||
.fd = zeromq_fd
|
||||
.poll_fds = zeromq_poll_fds
|
||||
}
|
||||
};
|
||||
|
||||
|
|
20
lib/path.c
20
lib/path.c
|
@ -365,12 +365,17 @@ int path_init_poll(struct path *p)
|
|||
for (int i = 0; i < vlist_length(&p->sources); i++) {
|
||||
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, i);
|
||||
|
||||
/* This slot is only used if it is not masked */
|
||||
p->reader.pfds[i].events = POLLIN;
|
||||
p->reader.pfds[i].fd = node_fd(ps->node);
|
||||
int fds[16];
|
||||
int num_fds = node_poll_fds(ps->node, fds);
|
||||
|
||||
if (p->reader.pfds[i].fd < 0)
|
||||
error("Failed to get file descriptor for node %s", node_name(ps->node));
|
||||
for (int i = 0; i < num_fds; i++) {
|
||||
if (fds[i] < 0)
|
||||
error("Failed to get file descriptor for node %s", node_name(ps->node));
|
||||
|
||||
/* This slot is only used if it is not masked */
|
||||
p->reader.pfds[i].events = POLLIN;
|
||||
p->reader.pfds[i].fd = fds[i];
|
||||
}
|
||||
}
|
||||
|
||||
/* We use the last slot for the timeout timer. */
|
||||
|
@ -641,7 +646,10 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
|
|||
if (p->poll == -1) {
|
||||
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0);
|
||||
|
||||
p->poll = (p->rate > 0 || vlist_length(&p->sources) > 1) && node_fd(ps->node) != -1;
|
||||
int fds[16];
|
||||
int num_fds = node_poll_fds(ps->node, fds);
|
||||
|
||||
p->poll = (p->rate > 0 || vlist_length(&p->sources) > 1) && num_fds > 0;
|
||||
}
|
||||
|
||||
ret = vlist_destroy(&sources, NULL, false);
|
||||
|
|
Loading…
Add table
Reference in a new issue