From b21b975a1725425c5cbab20acac971bcfd24f61c Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 23 May 2018 02:24:55 +0200 Subject: [PATCH] path: add a new configuration setting to switch between poll and single mode --- include/villas/path.h | 1 + lib/path.c | 90 ++++++++++++++++++++++++++++--------------- 2 files changed, 59 insertions(+), 32 deletions(-) diff --git a/include/villas/path.h b/include/villas/path.h index 7da01f817..1db52dcf4 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -90,6 +90,7 @@ struct path { double rate; /**< A timeout for */ int enabled; /**< Is this path enabled. */ + int poll; /**< Weather or not to use poll(2). */ int reverse; /**< This path as a matching reverse path. */ int builtin; /**< This path should use built-in hooks by default. */ int queuelen; /**< The queue length for each path_destination::queue */ diff --git a/lib/path.c b/lib/path.c index 0c5f21d65..2f93e6e14 100644 --- a/lib/path.c +++ b/lib/path.c @@ -245,6 +245,8 @@ static void * path_run_poll(void *arg) if (ret < 0) serror("Failed to poll"); + debug(10, "Path %s returned from poll(2)", path_name(p)); + for (int i = 0; i < p->reader.nfds; i++) { struct path_source *ps = (struct path_source *) list_at(&p->sources, i); @@ -290,6 +292,7 @@ int path_init(struct path *p) p->builtin = 1; p->reverse = 0; p->enabled = 1; + p->poll = -1; p->queuelen = DEFAULT_QUEUELEN; #ifdef WITH_HOOKS @@ -325,6 +328,41 @@ int path_init(struct path *p) return 0; } +int path_init_poll(struct path *p) +{ + int ret, nfds; + + nfds = list_length(&p->sources); + if (p->rate > 0) + nfds++; + + p->reader.nfds = nfds; + p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds); + + for (int i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = (struct path_source *) list_at(&p->sources, i); + + /* This slot is only used if it is not masked */ + p->reader.pfds[i].events = POLLIN; + p->reader.pfds[i].fd = node_fd(ps->node); + + //if (p->reader.pfds[i].fd < 0) + // error("Failed to get file descriptor for node %s", node_name(ps->node)); + } + + /* We use the last slot for the timeout timer. */ + if (p->rate > 0) { + ret = task_init(&p->timeout, p->rate, CLOCK_MONOTONIC); + if (ret) + return ret; + + p->reader.pfds[nfds-1].fd = task_fd(&p->timeout); + p->reader.pfds[nfds-1].events = POLLIN; + } + + return 0; +} + int path_init2(struct path *p) { int ret; @@ -388,30 +426,10 @@ int path_init2(struct path *p) return -1; /* Prepare poll() */ - int nfds = list_length(&p->sources); - - if (p->rate > 0) - nfds++; - - p->reader.nfds = nfds; - p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds); - - for (int i = 0; i < list_length(&p->sources); i++) { - struct path_source *ps = (struct path_source *) list_at(&p->sources, i); - - /* This slot is only used if it is not masked */ - p->reader.pfds[i].events = POLLIN; - p->reader.pfds[i].fd = node_fd(ps->node); - } - - /* We use the last slot for the timeout timer. */ - if (p->rate > 0) { - ret = task_init(&p->timeout, p->rate, CLOCK_MONOTONIC); + if (p->poll) { + ret = path_init_poll(p); if (ret) return ret; - - p->reader.pfds[nfds-1].fd = task_fd(&p->timeout); - p->reader.pfds[nfds-1].events = POLLIN; } return 0; @@ -435,7 +453,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_init(&sources); list_init(&destinations); - ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: F, s?: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: b, s?: i, s?: s, s?: b, s?: F, s?: o }", "in", &json_in, "out", &json_out, "hooks", &json_hooks, @@ -444,6 +462,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) "builtin", &p->builtin, "queuelen", &p->queuelen, "mode", &mode, + "poll", &p->poll, "rate", &p->rate, "mask", &json_mask ); @@ -565,6 +584,17 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) } #endif /* WITH_HOOKS */ + /* Autodetect whether to use poll() for this path or not */ + if (p->poll == -1) { + struct path_source *ps = (struct path_source *) list_at(&p->sources, 0); + + p->poll = ( + p->rate > 0 || + list_length(&p->sources) > 1 + ) + && node_fd(ps->node) != 1; + } + list_destroy(&sources, NULL, false); list_destroy(&destinations, NULL, false); @@ -620,13 +650,14 @@ int path_start(struct path *p) mask = bitset_dump(&p->mask); - info("Starting path %s: mode=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", + info("Starting path %s: mode=%s, poll=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", path_name(p), mode, + p->poll ? "yes" : "no", mask, p->rate, - p->enabled ? "yes": "no", - p->reverse ? "yes": "no", + p->enabled ? "yes" : "no", + p->reverse ? "yes" : "no", p->queuelen, p->samplelen, list_length(&p->hooks), list_length(&p->sources), @@ -676,12 +707,7 @@ int path_start(struct path *p) * does not offer a file descriptor for polling, we will use a special * thread function. */ - struct path_source *ps = (struct path_source *) list_at(&p->sources, 0); - if (list_length(&p->sources) == 1 && node_fd(ps->node) == -1) - ret = pthread_create(&p->tid, NULL, &path_run_single, p); - else - ret = pthread_create(&p->tid, NULL, &path_run_poll, p); - + ret = pthread_create(&p->tid, NULL, p->poll ? path_run_poll : path_run_single, p); if (ret) return ret;