From f6edec4d4120d4fc71f9d1ec9f39e7c7878337d1 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Wed, 30 Aug 2017 23:53:35 +0200 Subject: [PATCH] add mux feature --- include/villas/path.h | 20 +++- lib/path.c | 249 +++++++++++++++++++++++++++--------------- 2 files changed, 176 insertions(+), 93 deletions(-) diff --git a/include/villas/path.h b/include/villas/path.h index 183c4b68c..a12d0b598 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -48,32 +48,40 @@ struct path_source { struct node *node; struct pool pool; - struct mapping_entry *mapping; - - int samplelen; - pthread_t tid; + struct list hooks; /**< Read Hooks. */ + struct list mappings; /**< List of struct mapping_entry */ }; struct path_destination { struct node *node; struct queue queue; int queuelen; - pthread_t tid; + + struct list hooks; /**< Write Hooks. */ }; /** The datastructure for a path. */ struct path { enum state state; /**< Path state. */ + + struct { + int nfds; + struct pollfd *pfds; + } reader; + + struct pool pool; + struct sample *last_sample; struct list sources; /**< List of all incoming nodes (struct path_source). */ struct list destinations; /**< List of all outgoing nodes (struct path_destination). */ - struct list hooks; /**< List of function pointers to hooks. */ + struct list hooks; /**< Processing hooks. */ int enabled; /**< Is this path enabled. */ int reverse; /**< This path as a matching reverse path. */ int samplelen; int queuelen; + int sequence; pthread_t tid; /**< The thread id for this path. */ diff --git a/lib/path.c b/lib/path.c index e727681de..8fee9acb1 100644 --- a/lib/path.c +++ b/lib/path.c @@ -25,6 +25,7 @@ #include #include #include +#include #include "config.h" #include "utils.h" @@ -39,33 +40,28 @@ #include "stats.h" #include "node.h" -static void path_read(struct path *p) +static void path_read_source(struct path *p, struct path_source *ps) { - int recv; - int enqueue; - int enqueued; - int ready; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ - - struct path_source *ps = list_first(&p->sources); - + int ready, recv, mux, enqueue, enqueued; int cnt = ps->node->vectorize; - struct sample *smps[cnt]; + struct sample *read_smps[cnt]; + struct sample *muxed_smps[cnt]; /* Fill smps[] free sample blocks from the pool */ - ready = sample_alloc(&ps->pool, smps, cnt); + ready = sample_alloc(&ps->pool, read_smps, cnt); if (ready != cnt) - warn("Pool underrun for path %s", path_name(p)); + warn("Pool underrun for path source %s", node_name(ps->node)); /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, smps, ready); + recv = node_read(ps->node, read_smps, ready); if (recv < 0) error("Failed to receive message from node %s", node_name(ps->node)); else if (recv < ready) warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); - - /* Run preprocessing hooks for vector of samples */ - enqueue = hook_read_list(&p->hooks, smps, recv); + + /* Run read hooks */ + enqueue = hook_read_list(&p->hooks, read_smps, recv); if (enqueue != recv) { debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); @@ -73,22 +69,63 @@ static void path_read(struct path *p) stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); } + /* Mux samples */ + mux = sample_alloc(&p->pool, muxed_smps, recv); + if (mux != recv) + warn("Pool underrun for path %s", path_name(p)); + + for (int i = 0; i < mux; i++) { + mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL); + + p->last_sample->sequence = p->sequence++; + + sample_copy(muxed_smps[i], p->last_sample); + } + + /* Run processing hooks */ + enqueue = hook_process_list(&p->hooks, muxed_smps, mux); + if (enqueue != mux) { + debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", mux - enqueue, recv, path_name(p)); + + if (p->stats) + stats_update(p->stats->delta, STATS_SKIPPED, mux - enqueue); + } + /* Keep track of the lowest index that wasn't enqueued; * all following samples must be freed here */ for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); - enqueued = queue_push_many(&pd->queue, (void **) smps, enqueue); + enqueued = queue_push_many(&pd->queue, (void **) muxed_smps, enqueue); if (enqueue != enqueued) warn("Queue overrun for path %s", path_name(p)); /* Increase reference counter of these samples as they are now also owned by the queue. */ - sample_get_many(smps, enqueued); + sample_get_many(muxed_smps, enqueued); debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node)); } - sample_put_many(smps, ready); + sample_put_many(muxed_smps, ready); +} + +static void path_poll(struct path *p) +{ + int ret; + + ret = poll(p->reader.pfds, p->reader.nfds, -1); + if (ret < 0) + serror("Failed to poll"); + + int updates = 0; + for (int i = 0; i < p->reader.nfds; i++) { + struct path_source *ps = list_at(&p->sources, i); + + if (p->reader.pfds[i].revents & POLLIN) { + path_read_source(p, ps); + updates++; + } + } } static void path_write(struct path *p) @@ -137,7 +174,7 @@ static void * path_run(void *arg) struct path *p = arg; for (;;) { - path_read(p); + path_poll(p); path_write(p); } @@ -182,6 +219,77 @@ int path_init(struct path *p, struct super_node *sn) return 0; } +int path_init2(struct path *p) +{ + int ret, queuelen = 0; + + assert(p->state == STATE_CHECKED); + + /* Add internal hooks if they are not already in the list*/ + for (size_t i = 0; i < list_length(&plugins); i++) { + struct plugin *q = list_at(&plugins, i); + + if (q->type == PLUGIN_TYPE_HOOK) { + struct hook_type *vt = &q->hook; + + if (vt->builtin) { + struct hook *h = alloc(sizeof(struct hook)); + + ret = hook_init(h, vt, p); + if (ret) { + free(h); + return ret; + } + + list_push(&p->hooks, h); + } + } + } + + /* We sort the hooks according to their priority before starting the path */ + list_sort(&p->hooks, hook_cmp_priority); + + /* Initialize destinations */ + for (size_t i = 0; i < list_length(&p->destinations); i++) { + struct path_destination *pd = list_at(&p->destinations, i); + + ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); + if (ret) + return ret; + + if (pd->queuelen > queuelen) + queuelen = pd->queuelen; + } + + /* Initialize sources */ + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); + if (ret) + error("Failed to allocate memory pool for path"); + } + + ret = pool_init(&p->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); + if (ret) + return ret; + + sample_alloc(&p->pool, &p->last_sample, 1); + + /* Prepare poll() */ + p->reader.nfds = list_length(&p->sources); + p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds); + + for (int i = 0; i < p->reader.nfds; i++) { + struct path_source *ps = list_at(&p->sources, i); + + p->reader.pfds[i].fd = node_fd(ps->node); + p->reader.pfds[i].events = POLLIN; + } + + return 0; +} + int path_parse(struct path *p, json_t *cfg, struct list *nodes) { int ret; @@ -223,22 +331,33 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) jerror(&err, "Failed to parse output nodes"); } - if (json_hooks) { - ret = hook_parse_list(&p->hooks, json_hooks, p); - if (ret) - return ret; - } - for (size_t i = 0; i < list_length(&sources); i++) { struct mapping_entry *me = list_at(&sources, i); - struct path_source *ps = alloc(sizeof(struct path_source)); + struct path_source *ps = NULL; - ps->node = me->node; - ps->samplelen = p->samplelen; - ps->mapping = me; + /* Check if there is already a path_source for this source */ + for (size_t j = 0; j < list_length(&p->sources); j++) { + struct path_source *pt = list_at(&p->sources, j); + + if (pt->node == me->node) { + ps = pt; + break; + } + } - list_push(&p->sources, ps); + if (!ps) { + ps = alloc(sizeof(struct path_source)); + + ps->node = me->node; + + list_init(&ps->mappings); + list_init(&ps->hooks); + + list_push(&p->sources, ps); + } + + list_push(&ps->mappings, me); } for (size_t i = 0; i < list_length(&destinations); i++) { @@ -248,10 +367,18 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) pd->node = n; pd->queuelen = p->queuelen; + + list_init(&pd->hooks); list_push(&p->destinations, pd); } + if (json_hooks) { + ret = hook_parse_list(&p->hooks, json_hooks, p); + if (ret) + return ret; + } + list_destroy(&sources, NULL, false); list_destroy(&destinations, NULL, false); @@ -289,67 +416,13 @@ int path_check(struct path *p) return 0; } -int path_init2(struct path *p) -{ - int ret, queuelen = 0; - - assert(p->state == STATE_CHECKED); - - /* Add internal hooks if they are not already in the list*/ - for (size_t i = 0; i < list_length(&plugins); i++) { - struct plugin *q = list_at(&plugins, i); - - if (q->type == PLUGIN_TYPE_HOOK) { - struct hook_type *vt = &q->hook; - - if (vt->builtin) { - struct hook *h = alloc(sizeof(struct hook)); - - ret = hook_init(h, vt, p); - if (ret) { - free(h); - return ret; - } - - list_push(&p->hooks, h); - } - } - } - - /* We sort the hooks according to their priority before starting the path */ - list_sort(&p->hooks, hook_cmp_priority); - - /* Initialize destinations */ - for (size_t i = 0; i < list_length(&p->destinations); i++) { - struct path_destination *pd = list_at(&p->destinations, i); - - ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); - if (ret) - error("Failed to initialize queue for path"); - - if (pd->queuelen > queuelen) - queuelen = pd->queuelen; - } - - /* Initialize sources */ - for (size_t i = 0; i < list_length(&p->sources); i++) { - struct path_source *ps = list_at(&p->sources, i); - - ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(ps->samplelen), &memtype_hugepage); - if (ret) - error("Failed to allocate memory pool for path"); - } - - return 0; -} - int path_start(struct path *p) { int ret; assert(p->state == STATE_CHECKED); - info("Starting path %s: #hooks=%zu, #destinations=%zu", path_name(p), list_length(&p->hooks), list_length(&p->destinations)); + info("Starting path %s: #hooks=%zu, #sources=%zu, #destinations=%zu", path_name(p), list_length(&p->hooks), list_length(&p->sources), list_length(&p->destinations)); for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i); @@ -358,6 +431,8 @@ int path_start(struct path *p) if (ret) return ret; } + + p->sequence = 0; /* Start one thread per path for sending to destinations */ ret = pthread_create(&p->tid, NULL, &path_run, p); @@ -369,7 +444,6 @@ int path_start(struct path *p) return 0; } - int path_stop(struct path *p) { int ret; @@ -407,6 +481,8 @@ int path_destroy(struct path *p) if (p->_name) free(p->_name); + + pool_destroy(&p->pool); p->state = STATE_DESTROYED; @@ -478,7 +554,6 @@ int path_reverse(struct path *p, struct path *r) new_pd->queuelen = orig_pd->queuelen; new_ps->node = orig_pd->node; - new_ps->samplelen = orig_ps->samplelen; list_push(&r->destinations, new_pd); list_push(&r->sources, new_ps);