1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

path: add a new configuration setting to switch between poll and single mode

This commit is contained in:
Steffen Vogel 2018-05-23 02:24:55 +02:00
parent 76073cddcf
commit b21b975a17
2 changed files with 59 additions and 32 deletions

View file

@ -90,6 +90,7 @@ struct path {
double rate; /**< A timeout for */
int enabled; /**< Is this path enabled. */
int poll; /**< Weather or not to use poll(2). */
int reverse; /**< This path as a matching reverse path. */
int builtin; /**< This path should use built-in hooks by default. */
int queuelen; /**< The queue length for each path_destination::queue */

View file

@ -245,6 +245,8 @@ static void * path_run_poll(void *arg)
if (ret < 0)
serror("Failed to poll");
debug(10, "Path %s returned from poll(2)", path_name(p));
for (int i = 0; i < p->reader.nfds; i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
@ -290,6 +292,7 @@ int path_init(struct path *p)
p->builtin = 1;
p->reverse = 0;
p->enabled = 1;
p->poll = -1;
p->queuelen = DEFAULT_QUEUELEN;
#ifdef WITH_HOOKS
@ -325,6 +328,41 @@ int path_init(struct path *p)
return 0;
}
int path_init_poll(struct path *p)
{
int ret, nfds;
nfds = list_length(&p->sources);
if (p->rate > 0)
nfds++;
p->reader.nfds = nfds;
p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds);
for (int i = 0; i < list_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
/* This slot is only used if it is not masked */
p->reader.pfds[i].events = POLLIN;
p->reader.pfds[i].fd = node_fd(ps->node);
//if (p->reader.pfds[i].fd < 0)
// error("Failed to get file descriptor for node %s", node_name(ps->node));
}
/* We use the last slot for the timeout timer. */
if (p->rate > 0) {
ret = task_init(&p->timeout, p->rate, CLOCK_MONOTONIC);
if (ret)
return ret;
p->reader.pfds[nfds-1].fd = task_fd(&p->timeout);
p->reader.pfds[nfds-1].events = POLLIN;
}
return 0;
}
int path_init2(struct path *p)
{
int ret;
@ -388,30 +426,10 @@ int path_init2(struct path *p)
return -1;
/* Prepare poll() */
int nfds = list_length(&p->sources);
if (p->rate > 0)
nfds++;
p->reader.nfds = nfds;
p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds);
for (int i = 0; i < list_length(&p->sources); i++) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, i);
/* This slot is only used if it is not masked */
p->reader.pfds[i].events = POLLIN;
p->reader.pfds[i].fd = node_fd(ps->node);
}
/* We use the last slot for the timeout timer. */
if (p->rate > 0) {
ret = task_init(&p->timeout, p->rate, CLOCK_MONOTONIC);
if (p->poll) {
ret = path_init_poll(p);
if (ret)
return ret;
p->reader.pfds[nfds-1].fd = task_fd(&p->timeout);
p->reader.pfds[nfds-1].events = POLLIN;
}
return 0;
@ -435,7 +453,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
list_init(&sources);
list_init(&destinations);
ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: F, s?: o }",
ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o }",
"in", &json_in,
"out", &json_out,
"hooks", &json_hooks,
@ -444,6 +462,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
"builtin", &p->builtin,
"queuelen", &p->queuelen,
"mode", &mode,
"poll", &p->poll,
"rate", &p->rate,
"mask", &json_mask
);
@ -565,6 +584,17 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
}
#endif /* WITH_HOOKS */
/* Autodetect whether to use poll() for this path or not */
if (p->poll == -1) {
struct path_source *ps = (struct path_source *) list_at(&p->sources, 0);
p->poll = (
p->rate > 0 ||
list_length(&p->sources) > 1
)
&& node_fd(ps->node) != 1;
}
list_destroy(&sources, NULL, false);
list_destroy(&destinations, NULL, false);
@ -620,13 +650,14 @@ int path_start(struct path *p)
mask = bitset_dump(&p->mask);
info("Starting path %s: mode=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu",
info("Starting path %s: mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu",
path_name(p),
mode,
p->poll ? "yes" : "no",
mask,
p->rate,
p->enabled ? "yes": "no",
p->reverse ? "yes": "no",
p->enabled ? "yes" : "no",
p->reverse ? "yes" : "no",
p->queuelen, p->samplelen,
list_length(&p->hooks),
list_length(&p->sources),
@ -676,12 +707,7 @@ int path_start(struct path *p)
* does not offer a file descriptor for polling, we will use a special
* thread function.
*/
struct path_source *ps = (struct path_source *) list_at(&p->sources, 0);
if (list_length(&p->sources) == 1 && node_fd(ps->node) == -1)
ret = pthread_create(&p->tid, NULL, &path_run_single, p);
else
ret = pthread_create(&p->tid, NULL, &path_run_poll, p);
ret = pthread_create(&p->tid, NULL, p->poll ? path_run_poll : path_run_single, p);
if (ret)
return ret;