From b23484550f01455a602388ac16c27bdfd5eff321 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 7 Nov 2016 22:17:45 -0500 Subject: [PATCH] major rewrite with MPMC queues --- include/villas/path.h | 54 +++++---- lib/cfg.c | 78 +++++++------ lib/path.c | 265 ++++++++++++++++++++++++++++-------------- 3 files changed, 248 insertions(+), 149 deletions(-) diff --git a/include/villas/path.h b/include/villas/path.h index 0bcb4cf56..7bf71a003 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -26,55 +26,50 @@ #include "hooks.h" #include "queue.h" #include "pool.h" +#include "stats.h" -/** The datastructure for a path. - * - * @todo Add support for multiple outgoing nodes - */ +struct path_source +{ + struct node *node; + struct pool pool; + int samplelen; + pthread_t tid; +}; + +struct path_destination +{ + struct node *node; + struct queue queue; + int queuelen; + pthread_t tid; +}; + +/** The datastructure for a path. */ struct path { enum { PATH_INVALID, /**< Path is invalid. */ - PATH_CREATED, /**< Path has been created: lists initialized */ PATH_INITIALIZED, /**< Path queues, memory pools & hook system initialized. */ PATH_RUNNING, /**< Path is currently running. */ PATH_STOPPED, /**< Path has been stopped. */ PATH_DESTROYED /**< Path is destroyed. */ } state; /**< Path state */ - struct node *in; /**< Pointer to the incoming node */ - - struct queue queue; /**< A ring buffer for all received messages (unmodified) */ - struct pool pool; /**< Memory pool for messages / samples. */ + /* Each path has a single source and multiple destinations */ + struct path_source *source; /**< Pointer to the incoming node */ + struct list destinations; /**< List of all outgoing nodes (struct path_destination). */ - struct list destinations; /**< List of all outgoing nodes */ struct list hooks; /**< List of function pointers to hooks */ - int samplelen; /**< Maximum number of values per sample for this path. */ - int queuelen; /**< Size of sample queue for this path. */ int enabled; /**< Is this path enabled */ pthread_t tid; /**< The thread id for this path */ - - config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */ char *_name; /**< Singleton: A string which is used to print this path to screen. */ - /** The following fields are mostly managed by hook_ functions @{ */ + struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ - struct { - struct hist owd; /**< Histogram for one-way-delay (OWD) of received messages */ - struct hist gap_msg; /**< Histogram for inter message timestamps (as sent by remote) */ - struct hist gap_recv; /**< Histogram for inter message arrival time (as seen by this instance) */ - struct hist gap_seq; /**< Histogram of sequence number displacement of received messages */ - } hist; - - /* Statistic counters */ - uintmax_t invalid; /**< Counter for invalid messages */ - uintmax_t skipped; /**< Counter for skipped messages due to hooks */ - uintmax_t dropped; /**< Counter for dropped messages due to reordering */ - - /** @} */ + config_setting_t *cfg; /**< A pointer to the libconfig object which instantiated this path */ }; /** Initialize internal data structures. */ @@ -119,6 +114,9 @@ void path_print_stats(struct path *p); */ const char * path_name(struct path *p); +/** Reverse a path */ +int path_reverse(struct path *p, struct path *r); + /** Check if node is used as source or destination of a path. */ int path_uses_node(struct path *p, struct node *n); diff --git a/lib/cfg.c b/lib/cfg.c index a541de4c1..038744dc9 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -149,14 +149,14 @@ int cfg_parse_path(config_setting_t *cfg, { config_setting_t *cfg_out, *cfg_hook; const char *in; - int ret, reverse; + int ret, reverse, samplelen, queuelen; struct path *p; + + struct node *source; + struct list destinations; /* Allocate memory and intialize path structure */ p = alloc(sizeof(struct path)); - - list_init(&p->destinations); - list_init(&p->hooks); /* Input node */ if (!config_setting_lookup_string(cfg, "in", &in) && @@ -165,8 +165,8 @@ int cfg_parse_path(config_setting_t *cfg, !config_setting_lookup_string(cfg, "source", &in)) cerror(cfg, "Missing input node for path"); - p->in = list_lookup(nodes, in); - if (!p->in) + source = list_lookup(nodes, in); + if (!source) cerror(cfg, "Invalid input node '%s'", in); /* Output node(s) */ @@ -177,58 +177,66 @@ int cfg_parse_path(config_setting_t *cfg, !(cfg_out = config_setting_get_member(cfg, "sink"))) cerror(cfg, "Missing output nodes for path"); - ret = cfg_parse_nodelist(cfg_out, &p->destinations, nodes); + list_init(&destinations); + ret = cfg_parse_nodelist(cfg_out, &destinations, nodes); if (ret <= 0) cerror(cfg_out, "Invalid output nodes"); - - /* Check if nodes are suitable */ - if (p->in->_vt->read == NULL) - cerror(cfg, "Input node '%s' is not supported as a source.", node_name(p->in)); - - list_foreach(struct node *n, &p->destinations) { - if (n->_vt->write == NULL) - cerror(cfg_out, "Output node '%s' is not supported as a destination.", node_name(n)); - } /* Optional settings */ + list_init(&p->hooks); cfg_hook = config_setting_get_member(cfg, "hook"); if (cfg_hook) cfg_parse_hooklist(cfg_hook, &p->hooks); - if (!config_setting_lookup_int(cfg, "values", &p->samplelen)) - p->samplelen = DEFAULT_VALUES; - if (!config_setting_lookup_int(cfg, "queuelen", &p->queuelen)) - p->queuelen = DEFAULT_QUEUELEN; if (!config_setting_lookup_bool(cfg, "reverse", &reverse)) reverse = 0; if (!config_setting_lookup_bool(cfg, "enabled", &p->enabled)) p->enabled = 1; + if (!config_setting_lookup_int(cfg, "values", &samplelen)) + samplelen = DEFAULT_VALUES; + if (!config_setting_lookup_int(cfg, "queuelen", &queuelen)) + queuelen = DEFAULT_QUEUELEN; - if (!IS_POW2(p->queuelen)) { - p->queuelen = LOG2_CEIL(p->queuelen); - warn("Queue length should always be a power of 2. Adjusting to %d", p->queuelen); + if (!IS_POW2(queuelen)) { + queuelen = LOG2_CEIL(queuelen); + warn("Queue length should always be a power of 2. Adjusting to %d", queuelen); } p->cfg = cfg; + + /* Check if nodes are suitable */ + if (source->_vt->read == NULL) + cerror(cfg, "Input node '%s' is not supported as a source.", node_name(source)); + + p->source = alloc(sizeof(struct path_source)); + p->source->node = source; + p->source->samplelen = samplelen; + + list_foreach(struct node *n, &destinations) { + if (n->_vt->write == NULL) + cerror(cfg_out, "Output node '%s' is not supported as a destination.", node_name(n)); + + struct path_destination *pd = alloc(sizeof(struct path_destination)); + + pd->node = n; + pd->queuelen = queuelen; + + list_push(&p->destinations, pd); + } list_push(paths, p); if (reverse) { - if (list_length(&p->destinations) > 1) - cerror(cfg, "Can't reverse path with multiple destination nodes"); - - struct path *r = memdup(p, sizeof(struct path)); - path_init(r); - - /* Swap source and destination node */ - r->in = list_first(&p->destinations); - list_push(&r->destinations, p->in); - - if (cfg_hook) - cfg_parse_hooklist(cfg_hook, &r->hooks); + struct path *r = alloc(sizeof(struct path)); + + ret = path_reverse(p, r); + if (ret) + cerror(cfg, "Failed to reverse path %s", path_name(p)); list_push(paths, r); } + + list_destroy(&destinations, NULL, false); return 0; } diff --git a/lib/path.c b/lib/path.c index 7d22f768e..4a9045072 100644 --- a/lib/path.c +++ b/lib/path.c @@ -19,74 +19,103 @@ #include "pool.h" #include "queue.h" +static void path_read(struct path *p) +{ + int recv; + int enqueue; + int enqueued; + int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ + + struct path_source *ps = p->source; + + int cnt = ps->node->vectorize; + + struct sample *smps[cnt]; + + /* Fill smps[] free sample blocks from the pool */ + ready += sample_alloc(&ps->pool, smps, cnt - ready); + if (ready != cnt) + warn("Pool underrun for path %s", path_name(p)); + + /* Read ready samples and store them to blocks pointed by smps[] */ + recv = node_read(ps->node, smps, ready); + if (recv < 0) + error("Failed to receive message from node %s", node_name(ps->node)); + else if (recv < ready) + warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); + + debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(ps->node)); + + /* Run preprocessing hooks for vector of samples */ + enqueue = hook_run(p, smps, recv, HOOK_READ); + if (enqueue != recv) { + info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); + + stats_update(p->stats, STATS_SKIPPED, recv - enqueue); + } + + list_foreach(struct path_destination *pd, &p->destinations) { + enqueued = queue_push_many(&pd->queue, (void **) smps, enqueue); + if (enqueue != enqueued) + warn("Queue overrun for path %s", path_name(p)); + + for (int i = 0; i < enqueued; i++) + sample_get(smps[i]); /* increase reference count */ + + debug(DBG_PATH | 15, "Enqueued %u samples to queue of path %s", enqueued, path_name(p)); + } +} + +static void path_write(struct path *p) +{ + list_foreach(struct path_destination *pd, &p->destinations) { + int cnt = pd->node->vectorize; + int sent; + int tosend; + int available; + int released; + + struct sample *smps[cnt]; + + /* As long as there are still samples in the queue */ + while (1) { + available = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (available == 0) + break; + else if (available < cnt) + warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + + debug(DBG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); + + tosend = hook_run(p, smps, available, HOOK_WRITE); + if (tosend == 0) + continue; + + sent = node_write(pd->node, smps, tosend); + if (sent < 0) + error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); + else if (sent < tosend) + warn("Partial write to node %s", node_name(pd->node)); + + debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(pd->node)); + + released = 0; + for (int i = 0; i < sent; i++) + released += sample_put(smps[i]); + + debug(DBG_PATH | 15, "Released %d samples back to memory pool", released); + } + } +} + /** Main thread function per path: receive -> sent messages */ static void * path_run(void *arg) { struct path *p = arg; - unsigned cnt = p->in->vectorize; - int recv, enqueue, enqueued; - int ready = 0; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ - struct sample *smps[cnt]; - /* Main thread loop */ for (;;) { - /* Fill smps[] free sample blocks from the pool */ - ready += sample_get_many(&p->pool, smps, cnt - ready); - if (ready != cnt) - warn("Pool underrun for path %s", path_name(p)); - - /* Read ready samples and store them to blocks pointed by smps[] */ - recv = p->in->_vt->read(p->in, smps, ready); - if (recv < 0) - error("Failed to receive message from node %s", node_name(p->in)); - else if (recv < ready) - warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); - - debug(DBG_PATH | 15, "Received %u messages from node %s", recv, node_name(p->in)); - - /* Run preprocessing hooks for vector of samples */ - enqueue = hook_run(p, smps, recv, HOOK_READ); - if (enqueue != recv) { - info("Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); - p->skipped += recv - enqueue; - } - - enqueued = queue_push_many(&p->queue, (void **) smps, enqueue); - if (enqueue != enqueued) - warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p)); - - ready -= enqueued; - - debug(DBG_PATH | 3, "Enqueuing %u samples to queue of path %s", enqueue, path_name(p)); - - list_foreach(struct node *n, &p->destinations) { - int cnt = n->vectorize; - int sent, tosend, available, released; - struct sample *smps[n->vectorize]; - - available = queue_pull_many(&p->queue, (void **) smps, cnt); - if (available < cnt) - warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); - - if (available == 0) - continue; - - tosend = hook_run(p, smps, available, HOOK_WRITE); - if (tosend == 0) - continue; - - sent = node_write(n, smps, tosend); - if (sent < 0) - error("Failed to sent %u samples to node %s", cnt, node_name(n)); - else if (sent < tosend) - warn("Partial write to node %s", node_name(n)); - - debug(DBG_PATH | 15, "Sent %u messages to node %s", sent, node_name(n)); - - released = pool_put_many(&p->pool, (void **) smps, sent); - if (sent != released) - warn("Failed to release %u samples to pool for path %s", sent - released, path_name(p)); - } + path_read(p); + path_write(p); } return NULL; @@ -126,10 +155,10 @@ int path_stop(struct path *p) const char * path_name(struct path *p) { if (!p->_name) { - strcatf(&p->_name, "%s " MAG("=>"), node_name_short(p->in)); + strcatf(&p->_name, "%s " MAG("=>"), node_name_short(p->source->node)); - list_foreach(struct node *n, &p->destinations) - strcatf(&p->_name, " %s", node_name_short(n)); + list_foreach(struct path_destination *pd, &p->destinations) + strcatf(&p->_name, " %s", node_name_short(pd->node)); } return p->_name; @@ -137,18 +166,20 @@ const char * path_name(struct path *p) int path_init(struct path *p) { - int ret; - + int ret, max_queuelen = 0; + /* Add internal hooks if they are not already in the list*/ list_foreach(struct hook *h, &hooks) { - if ((h->type & HOOK_INTERNAL) && (list_lookup(&p->hooks, h->name) == NULL)) + if ( + (h->type & HOOK_INTERNAL) && /* is internal hook? */ + (list_lookup(&p->hooks, h->name) == NULL) /* is not already in list? */ + ) list_push(&p->hooks, memdup(h, sizeof(struct hook))); } - + /* We sort the hooks according to their priority before starting the path */ list_sort(&p->hooks, hooks_sort_priority); - /* Allocate hook private memory for storing state / parameters */ ret = hook_run(p, NULL, 0, HOOK_INIT); if (ret) error("Failed to initialize hooks of path: %s", path_name(p)); @@ -157,36 +188,98 @@ int path_init(struct path *p) ret = hook_run(p, NULL, 0, HOOK_PARSE); if (ret) error("Failed to parse arguments for hooks of path: %s", path_name(p)); - - /* Initialize queue */ - ret = pool_init(&p->pool, SAMPLE_LEN(p->samplelen), p->queuelen, &memtype_hugepage); + + /* Initialize destinations */ + list_foreach(struct path_destination *pd, &p->destinations) { + ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); + if (ret) + error("Failed to initialize queue for path"); + + if (pd->queuelen > max_queuelen) + max_queuelen = pd->queuelen; + } + + /* Initialize source */ + ret = pool_init(&p->source->pool, max_queuelen, SAMPLE_LEN(p->source->samplelen), &memtype_hugepage); if (ret) error("Failed to allocate memory pool for path"); - ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage); - if (ret) - error("Failed to initialize queue for path"); - p->state = PATH_INITIALIZED; return 0; } +void path_source_destroy(struct path_source *ps) +{ + pool_destroy(&ps->pool); +} + +void path_destination_destroy(struct path_destination *pd) +{ + queue_destroy(&pd->queue); +} + void path_destroy(struct path *p) { - hook_run(p, NULL, 0, HOOK_DEINIT); /* Release memory */ + list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); + list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); - list_destroy(&p->destinations, NULL, false); - list_destroy(&p->hooks, NULL, true); - - queue_destroy(&p->queue); - pool_destroy(&p->pool); - - p->state = PATH_DESTROYED; + path_source_destroy(p->source); free(p->_name); + free(p->source); + + p->state = PATH_DESTROYED; } int path_uses_node(struct path *p, struct node *n) { - return (p->in == n) || list_contains(&p->destinations, n) ? 0 : 1; + list_foreach(struct path_destination *pd, &p->destinations) { + if (pd->node == n) + return 0; + } + + return p->source->node == n ? 0 : -1; } + +int path_reverse(struct path *p, struct path *r) +{ + int ret; + + if (list_length(&p->destinations) > 1) + return -1; + + struct path_destination *first_pd = list_first(&p->destinations); + + list_init(&r->destinations); + list_init(&r->hooks); + + /* General */ + r->enabled = p->enabled; + r->cfg = p->cfg; + + struct path_destination *pd = alloc(sizeof(struct path_destination)); + + pd->node = p->source->node; + pd->queuelen = first_pd->queuelen; + + list_push(&r->destinations, pd); + + struct path_source *ps = alloc(sizeof(struct path_source)); + + ps->node = first_pd->node; + ps->samplelen = p->source->samplelen; + + r->source = ps; + + list_foreach(struct hook *h, &p->hooks) { + struct hook *hc = alloc(sizeof(struct hook)); + + ret = hook_copy(h, hc); + if (ret) + return ret; + + list_push(&r->hooks, hc); + } + + return 0; +} \ No newline at end of file