diff --git a/include/villas/path.h b/include/villas/path.h index e3a7a4934..eb5d99969 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -35,6 +35,7 @@ #include "list.h" #include "queue.h" #include "pool.h" +#include "bitset.h" #include "common.h" #include "hook.h" #include "mapping.h" @@ -48,6 +49,8 @@ struct super_node; struct path_source { struct node *node; + bool masked; + struct pool pool; struct list mappings; /**< List of mappings (struct mapping_entry). */ }; @@ -93,8 +96,8 @@ struct path { char *_name; /**< Singleton: A string which is used to print this path to screen. */ - uintmax_t mask; /**< A mask of path_sources which are enabled for poll(). */ - uintmax_t received; /**< A mask of path_sources for which we already received samples. */ + struct bitset mask; /**< A mask of path_sources which are enabled for poll(). */ + struct bitset received; /**< A mask of path_sources for which we already received samples. */ pthread_t tid; /**< The thread id for this path. */ json_t *cfg; /**< A JSON object containing the configuration of the path. */ diff --git a/lib/path.c b/lib/path.c index 1b27bcfd0..5c5c9597c 100644 --- a/lib/path.c +++ b/lib/path.c @@ -158,7 +158,7 @@ static void * path_run(void *arg) else if (recv < ready) warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); - p->received |= 1 << i; + bitset_set(&p->received, i); if (p->mode == PATH_MODE_ANY) { /* Mux all samples */ tomux_smps = read_smps; @@ -181,15 +181,17 @@ static void * path_run(void *arg) sample_copy(p->last_sample, muxed_smps[tomux-1]); - if (p->mask & (1 << i)) { + info("received = %s", bitset_dump(&p->received)); + + if (bitset_test(&p->mask, i)) { /* Check if we received an update from all nodes/ */ if ((p->mode == PATH_MODE_ANY) || - (p->mode == PATH_MODE_ALL && p->mask == p->received)) + (p->mode == PATH_MODE_ALL && !bitset_cmp(&p->mask, &p->received))) { path_destination_enqueue(p, muxed_smps, tomux); /* Reset bitset of updated nodes */ - p->received = 0; + bitset_clear_all(&p->received); } } @@ -250,7 +252,6 @@ int path_init(struct path *p) /* Default values */ p->mode = PATH_MODE_ANY; p->rate = 0; /* Disabled */ - p->mask = 0; /* None */ p->reverse = 0; p->enabled = 1; @@ -308,11 +309,17 @@ int path_init2(struct path *p) return ret; } - /* Calc sample length of path */ + bitset_init(&p->received, list_length(&p->sources)); + bitset_init(&p->mask, list_length(&p->sources)); + + /* Calc sample length of path and initialize bitset */ p->samplelen = 0; for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); + if (ps->masked) + bitset_set(&p->mask, i); + for (size_t i = 0; i < list_length(&ps->mappings); i++) { struct mapping_entry *me = list_at(&ps->mappings, i); @@ -438,6 +445,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) ps = alloc(sizeof(struct path_source)); ps->node = me->node; + ps->masked = false; ps->mappings.state = STATE_DESTROYED; @@ -469,7 +477,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) json_array_foreach(json_mask, i, json_entry) { const char *name; struct node *node; - struct path_source *source = NULL; + struct path_source *ps = NULL; name = json_string_value(json_entry); if (!name) @@ -481,24 +489,26 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) /* Search correspondending path_source to node */ for (size_t i = 0; i < list_length(&p->sources); i++) { - struct path_source *ps = list_at(&p->sources, i); + struct path_source *pt = list_at(&p->sources, i); - if (ps->node == node) { - source = ps; + if (pt->node == node) { + ps = pt; break; } } - if (!source) + if (!ps) error("Node %s is not a source of the path %s", node_name(node), path_name(p)); - p->mask |= 1 << i; + ps->masked = true; } } - else { - /* Enable all by default */ - for (size_t i = 0; i < list_length(&p->sources); i++) - p->mask |= 1 << i; + else {/* Enable all by default */ + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + ps->masked = true; + } } if (json_hooks) { @@ -550,21 +560,23 @@ int path_check(struct path *p) int path_start(struct path *p) { int ret; - const char *mode; + char *mode, *mask; assert(p->state == STATE_CHECKED); switch (p->mode) { - case PATH_MODE_ANY: mode = "any"; break; - case PATH_MODE_ALL: mode = "all"; break; - default: mode = "unknown"; break; + case PATH_MODE_ANY: mode = "any"; break; + case PATH_MODE_ALL: mode = "all"; break; + default: mode = "unknown"; break; } - info("Starting path %s: mode=%s, rate=%f, mask=%#lx, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", + mask = bitset_dump(&p->mask); + + info("Starting path %s: mode=%s, mask=%s, rate=%.2f, enabled=%s, reversed=%s, queuelen=%d, samplelen=%d, #hooks=%zu, #sources=%zu, #destinations=%zu", path_name(p), mode, + mask, p->rate, - p->mask, p->enabled ? "yes": "no", p->reverse ? "yes": "no", p->queuelen, p->samplelen, @@ -573,6 +585,8 @@ int path_start(struct path *p) list_length(&p->destinations) ); + free(mask); + for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i); @@ -582,7 +596,8 @@ int path_start(struct path *p) } p->last_sequence = 0; - p->received = 0; + + bitset_clear_all(&p->received); /* We initialize the intial sample with zeros */ for (size_t i = 0; i < list_length(&p->sources); i++) {