diff --git a/include/villas/path.h b/include/villas/path.h index 92a6c7249..e3a7a4934 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -38,6 +38,7 @@ #include "common.h" #include "hook.h" #include "mapping.h" +#include "task.h" /* Forward declarations */ struct stats; @@ -48,7 +49,7 @@ struct path_source { struct node *node; struct pool pool; - struct list mappings; /**< List of mappings (struct mapping_entry). */ + struct list mappings; /**< List of mappings (struct mapping_entry). */ }; struct path_destination { @@ -57,9 +58,17 @@ struct path_destination { struct queue queue; }; +/** The register mode determines under which condition the path is triggered. */ +enum path_mode { + PATH_MODE_ANY, /**< The path is triggered whenever one of the sources receives samples. */ + PATH_MODE_ALL /**< The path is triggered only after all sources have received at least 1 sample. */ +}; + /** The datastructure for a path. */ struct path { - enum state state; /**< Path state. */ + enum state state; /**< Path state. */ + + enum path_mode mode; /**< Determines when this path is triggered. */ struct { int nfds; @@ -68,19 +77,25 @@ struct path { struct pool pool; struct sample *last_sample; + int last_sequence; struct list sources; /**< List of all incoming nodes (struct path_source). */ struct list destinations; /**< List of all outgoing nodes (struct path_destination). */ struct list hooks; /**< List of processing hooks (struct hook). */ + struct task timeout; + + double rate; /**< A timeout for */ int enabled; /**< Is this path enabled. */ int reverse; /**< This path as a matching reverse path. */ int queuelen; /**< The queue length for each path_destination::queue */ int samplelen; /**< Will be calculated based on path::sources.mappings */ - int sequence; char *_name; /**< Singleton: A string which is used to print this path to screen. */ + uintmax_t mask; /**< A mask of path_sources which are enabled for poll(). */ + uintmax_t received; /**< A mask of path_sources for which we already received samples. */ + pthread_t tid; /**< The thread id for this path. */ json_t *cfg; /**< A JSON object containing the configuration of the path. */ }; diff --git a/lib/path.c b/lib/path.c index 082f95993..1b27bcfd0 100644 --- a/lib/path.c +++ b/lib/path.c @@ -65,64 +65,6 @@ static int path_source_destroy(struct path_source *ps) return 0; } -static void path_source_read(struct path *p, struct path_source *ps) -{ - int ready, recv, mux, enqueue, enqueued; - int cnt = ps->node->vectorize; - - struct sample *read_smps[cnt]; - struct sample *muxed_smps[cnt]; - - /* Fill smps[] free sample blocks from the pool */ - ready = sample_alloc(&ps->pool, read_smps, cnt); - if (ready != cnt) - warn("Pool underrun for path source %s", node_name(ps->node)); - - /* Read ready samples and store them to blocks pointed by smps[] */ - recv = node_read(ps->node, read_smps, ready); - if (recv == 0) - goto out2; - else if (recv < 0) - error("Failed to receive message from node %s", node_name(ps->node)); - else if (recv < ready) - warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); - - /* Mux samples */ - mux = sample_alloc(&p->pool, muxed_smps, recv); - if (mux != recv) - warn("Pool underrun for path %s", path_name(p)); - - for (int i = 0; i < mux; i++) { - p->last_sample->sequence = p->sequence++; - - mapping_remap(&ps->mappings, p->last_sample, read_smps[i], NULL); - sample_copy(muxed_smps[i], p->last_sample); - } - - /* Run processing hooks */ - enqueue = hook_process_list(&p->hooks, muxed_smps, mux); - if (enqueue == 0) - goto out1; - - /* Keep track of the lowest index that wasn't enqueued; - * all following samples must be freed here */ - for (size_t i = 0; i < list_length(&p->destinations); i++) { - struct path_destination *pd = list_at(&p->destinations, i); - - enqueued = queue_push_many(&pd->queue, (void **) muxed_smps, enqueue); - if (enqueue != enqueued) - warn("Queue overrun for path %s", path_name(p)); - - /* Increase reference counter of these samples as they are now also owned by the queue. */ - sample_get_many(muxed_smps, enqueued); - - debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node)); - } - -out1: sample_put_many(muxed_smps, recv); -out2: sample_put_many(read_smps, ready); -} - static int path_destination_init(struct path_destination *pd, int queuelen) { int ret; @@ -145,69 +87,149 @@ static int path_destination_destroy(struct path_destination *pd) return 0; } -static void path_poll(struct path *p) +static void path_destination_enqueue(struct path *p, struct sample *smps[], unsigned cnt) { - int ret; + unsigned enqueued, cloned; - ret = poll(p->reader.pfds, p->reader.nfds, -1); - if (ret < 0) - serror("Failed to poll"); + struct sample *clones[cnt]; - int updates = 0; - for (int i = 0; i < p->reader.nfds; i++) { - struct path_source *ps = list_at(&p->sources, i); + cloned = sample_clone_many(clones, smps, cnt); + if (cloned < cnt) + warn("Pool underrun in path %s", path_name(p)); - if (p->reader.pfds[i].revents & POLLIN) { - path_source_read(p, ps); - updates++; - } - } -} - -static void path_write(struct path *p) -{ for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); - int cnt = pd->node->vectorize; - int sent; - int available; - int released; + enqueued = queue_push_many(&pd->queue, (void **) clones, cloned); + if (enqueued != cnt) + warn("Queue overrun for path %s", path_name(p)); - struct sample *smps[cnt]; + /* Increase reference counter of these samples as they are now also owned by the queue. */ + sample_get_many(clones, cloned); - /* As long as there are still samples in the queue */ - while (1) { - available = queue_pull_many(&pd->queue, (void **) smps, cnt); - if (available == 0) - break; - else if (available < cnt) - debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); - - debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - - sent = node_write(pd->node, smps, available); - if (sent < 0) - error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); - else if (sent < available) - warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); - - released = sample_put_many(smps, sent); - - debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); - } + debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p)); } + + sample_put_many(clones, cloned); } /** Main thread function per path: receive -> sent messages */ static void * path_run(void *arg) { + int ret, recv, tomux, ready, cnt; struct path *p = arg; for (;;) { - /* We only need to poll in case there is more than one source */ - path_poll(p); - path_write(p); + ret = poll(p->reader.pfds, p->reader.nfds, -1); + if (ret < 0) + serror("Failed to poll"); + + for (int i = 0; i < p->reader.nfds; i++) { + struct path_source *ps = list_at(&p->sources, i); + + if (p->reader.pfds[i].revents & POLLIN) { + /* Timeout: re-enqueue the last sample */ + if (p->reader.pfds[i].fd == task_fd(&p->timeout)) { + task_wait(&p->timeout); + + p->last_sample->sequence = p->last_sequence++; + + path_destination_enqueue(p, &p->last_sample, 1); + } + /* A source is ready to receive samples */ + else { + cnt = ps->node->vectorize; + + struct sample *read_smps[cnt]; + struct sample *muxed_smps[cnt]; + struct sample **tomux_smps; + + /* Fill smps[] free sample blocks from the pool */ + ready = sample_alloc_many(&ps->pool, read_smps, cnt); + if (ready != cnt) + warn("Pool underrun for path source %s", node_name(ps->node)); + + /* Read ready samples and store them to blocks pointed by smps[] */ + recv = node_read(ps->node, read_smps, ready); + if (recv == 0) + goto out2; + else if (recv < 0) + error("Failed to receive message from node %s", node_name(ps->node)); + else if (recv < ready) + warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); + + p->received |= 1 << i; + + if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ + tomux_smps = read_smps; + tomux = recv; + } + else { /* Mux only last sample and discard others */ + tomux_smps = read_smps + recv - 1; + tomux = 1; + } + + for (int i = 0; i < tomux; i++) { + muxed_smps[i] = i == 0 + ? sample_clone(p->last_sample) + : sample_clone(muxed_smps[i-1]); + + muxed_smps[i]->sequence = p->last_sequence++; + + mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); + } + + sample_copy(p->last_sample, muxed_smps[tomux-1]); + + if (p->mask & (1 << i)) { + /* Check if we received an update from all nodes/ */ + if ((p->mode == PATH_MODE_ANY) || + (p->mode == PATH_MODE_ALL && p->mask == p->received)) + { + path_destination_enqueue(p, muxed_smps, tomux); + + /* Reset bitset of updated nodes */ + p->received = 0; + } + } + + sample_put_many(muxed_smps, tomux); +out2: sample_put_many(read_smps, ready); + } + } + } + + for (size_t i = 0; i < list_length(&p->destinations); i++) { + struct path_destination *pd = list_at(&p->destinations, i); + + int cnt = pd->node->vectorize; + int sent; + int available; + int released; + + struct sample *smps[cnt]; + + /* As long as there are still samples in the queue */ + while (1) { + available = queue_pull_many(&pd->queue, (void **) smps, cnt); + if (available == 0) + break; + else if (available < cnt) + debug(LOG_PATH | 5, "Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt); + + debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); + + sent = node_write(pd->node, smps, available); + if (sent < 0) + error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); + else if (sent < available) + warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); + + released = sample_put_many(smps, sent); + + debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); + } + } } return NULL; @@ -226,6 +248,10 @@ int path_init(struct path *p) p->_name = NULL; /* Default values */ + p->mode = PATH_MODE_ANY; + p->rate = 0; /* Disabled */ + p->mask = 0; /* None */ + p->reverse = 0; p->enabled = 1; p->queuelen = DEFAULT_QUEUELEN; @@ -305,19 +331,35 @@ int path_init2(struct path *p) if (ret) return ret; - ret = sample_alloc(&p->pool, &p->last_sample, 1); - if (ret != 1) + p->last_sample = sample_alloc(&p->pool); + if (!p->last_sample) return -1; /* Prepare poll() */ - p->reader.nfds = list_length(&p->sources); + int nfds = list_length(&p->sources); + + if (p->rate > 0) + nfds++; + + p->reader.nfds = nfds; p->reader.pfds = alloc(sizeof(struct pollfd) * p->reader.nfds); - for (int i = 0; i < p->reader.nfds; i++) { + for (int i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); - p->reader.pfds[i].fd = node_fd(ps->node); + /* This slot is only used if it is not masked */ p->reader.pfds[i].events = POLLIN; + p->reader.pfds[i].fd = node_fd(ps->node); + } + + /* We use the last slot for the timeout timer. */ + if (p->rate > 0) { + ret = task_init(&p->timeout, p->rate, CLOCK_MONOTONIC); + if (ret) + return ret; + + p->reader.pfds[nfds-1].fd = task_fd(&p->timeout); + p->reader.pfds[nfds-1].events = POLLIN; } return 0; @@ -331,6 +373,9 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) json_t *json_in; json_t *json_out = NULL; json_t *json_hooks = NULL; + json_t *json_mask = NULL; + + const char *mode = NULL; struct list sources = { .state = STATE_DESTROYED }; struct list destinations = { .state = STATE_DESTROYED }; @@ -338,13 +383,16 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_init(&sources); list_init(&destinations); - ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: i }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: i, s?: s, s?: F, s?: o }", "in", &json_in, "out", &json_out, "hooks", &json_hooks, "reverse", &p->reverse, "enabled", &p->enabled, - "queuelen", &p->queuelen + "queuelen", &p->queuelen, + "mode", &mode, + "rate", &p->rate, + "mask", &json_mask ); if (ret) jerror(&err, "Failed to parse path configuration"); @@ -355,6 +403,14 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) error("Failed to parse input mapping of path %s", path_name(p)); /* Optional settings */ + if (mode) { + if (!strcmp(mode, "any")) + p->mode = PATH_MODE_ANY; + else if (!strcmp(mode, "all")) + p->mode = PATH_MODE_ALL; + else + error("Invalid path mode '%s'", mode); + } /* Output node(s) */ if (json_out) { @@ -403,6 +459,48 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_push(&p->destinations, pd); } + if (json_mask) { + json_t *json_entry; + size_t i; + + if (!json_is_array(json_mask)) + error("The 'mask' setting must be a list of node names"); + + json_array_foreach(json_mask, i, json_entry) { + const char *name; + struct node *node; + struct path_source *source = NULL; + + name = json_string_value(json_entry); + if (!name) + error("The 'mask' setting must be a list of node names"); + + node = list_lookup(nodes, name); + if (!node) + error("The 'mask' entry '%s' is not a valid node name", name); + + /* Search correspondending path_source to node */ + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + if (ps->node == node) { + source = ps; + break; + } + } + + if (!source) + error("Node %s is not a source of the path %s", node_name(node), path_name(p)); + + p->mask |= 1 << i; + } + } + else { + /* Enable all by default */ + for (size_t i = 0; i < list_length(&p->sources); i++) + p->mask |= 1 << i; + } + if (json_hooks) { ret = hook_parse_list(&p->hooks, json_hooks, p, NULL); if (ret) @@ -422,6 +520,9 @@ int path_check(struct path *p) { assert(p->state != STATE_DESTROYED); + if (p->rate < 0) + error("Setting 'rate' of path %s must be a positive number.", path_name(p)); + for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); @@ -449,11 +550,21 @@ int path_check(struct path *p) int path_start(struct path *p) { int ret; + const char *mode; assert(p->state == STATE_CHECKED); - info("Starting path %s: enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", + switch (p->mode) { + case PATH_MODE_ANY: mode = "any"; break; + case PATH_MODE_ALL: mode = "all"; break; + default: mode = "unknown"; break; + } + + info("Starting path %s: mode=%s, rate=%f, mask=%#lx, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", path_name(p), + mode, + p->rate, + p->mask, p->enabled ? "yes": "no", p->reverse ? "yes": "no", p->queuelen, p->samplelen, @@ -470,7 +581,8 @@ int path_start(struct path *p) return ret; } - p->sequence = 0; + p->last_sequence = 0; + p->received = 0; /* We initialize the intial sample with zeros */ for (size_t i = 0; i < list_length(&p->sources); i++) { @@ -542,9 +654,15 @@ int path_destroy(struct path *p) list_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); + if (p->reader.pfds) + free(p->reader.pfds); + if (p->_name) free(p->_name); + if (p->rate > 0) + task_destroy(&p->timeout); + pool_destroy(&p->pool); p->state = STATE_DESTROYED;