diff --git a/include/villas/path.h b/include/villas/path.h index 73f22c542..183c4b68c 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -37,22 +37,24 @@ #include "pool.h" #include "common.h" #include "hook.h" +#include "mapping.h" /* Forward declarations */ struct stats; struct node; struct super_node; -struct path_source -{ +struct path_source { struct node *node; struct pool pool; + + struct mapping_entry *mapping; + int samplelen; pthread_t tid; }; -struct path_destination -{ +struct path_destination { struct node *node; struct queue queue; int queuelen; @@ -60,14 +62,11 @@ struct path_destination }; /** The datastructure for a path. */ -struct path -{ +struct path { enum state state; /**< Path state. */ - /* Each path has a single source and multiple destinations */ - struct path_source *source; /**< Pointer to the incoming node */ + struct list sources; /**< List of all incoming nodes (struct path_source). */ struct list destinations; /**< List of all outgoing nodes (struct path_destination). */ - struct list hooks; /**< List of function pointers to hooks. */ int enabled; /**< Is this path enabled. */ diff --git a/lib/path.c b/lib/path.c index db2661eca..e727681de 100644 --- a/lib/path.c +++ b/lib/path.c @@ -46,7 +46,7 @@ static void path_read(struct path *p) int enqueued; int ready; /**< Number of blocks in smps[] which are allocated and ready to be used by node_read(). */ - struct path_source *ps = p->source; + struct path_source *ps = list_first(&p->sources); int cnt = ps->node->vectorize; @@ -164,6 +164,7 @@ int path_init(struct path *p, struct super_node *sn) list_init(&p->hooks); list_init(&p->destinations); + list_init(&p->sources); p->_name = NULL; @@ -184,21 +185,22 @@ int path_init(struct path *p, struct super_node *sn) int path_parse(struct path *p, json_t *cfg, struct list *nodes) { int ret; - const char *in; json_error_t err; - json_t *cfg_out = NULL; - json_t *cfg_hooks = NULL; + json_t *json_in; + json_t *json_out = NULL; + json_t *json_hooks = NULL; - struct node *source; + struct list sources = { .state = STATE_DESTROYED }; struct list destinations = { .state = STATE_DESTROYED }; + list_init(&sources); list_init(&destinations); - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: o, s?: o, s?: b, s?: b, s?: i, s?: i }", - "in", &in, - "out", &cfg_out, - "hooks", &cfg_hooks, + ret = json_unpack_ex(cfg, &err, 0, "{ s: o, s?: o, s?: o, s?: b, s?: b, s?: i, s?: i }", + "in", &json_in, + "out", &json_out, + "hooks", &json_hooks, "reverse", &p->reverse, "enabled", &p->enabled, "samplelen", &p->samplelen, @@ -207,34 +209,38 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) if (ret) jerror(&err, "Failed to parse path configuration"); - /* Input node */ - source = list_lookup(nodes, in); - if (!source) - jerror(&err, "Invalid input node '%s'", in); + /* Input node(s) */ + ret = node_parse_mapping_list(&sources, json_in, nodes); + if (ret) + error("Failed to parse input mapping of path %s", path_name(p)); + + /* Optional settings */ /* Output node(s) */ - if (cfg_out) { - ret = node_parse_list(&destinations, cfg_out, nodes); + if (json_out) { + ret = node_parse_list(&destinations, json_out, nodes); if (ret) jerror(&err, "Failed to parse output nodes"); } - /* Optional settings */ - if (cfg_hooks) { - ret = hook_parse_list(&p->hooks, cfg_hooks, p); + if (json_hooks) { + ret = hook_parse_list(&p->hooks, json_hooks, p); if (ret) return ret; } - - if (!IS_POW2(p->queuelen)) { - p->queuelen = LOG2_CEIL(p->queuelen); - warn("Queue length should always be a power of 2. Adjusting to %d", p->queuelen); + + for (size_t i = 0; i < list_length(&sources); i++) { + struct mapping_entry *me = list_at(&sources, i); + + struct path_source *ps = alloc(sizeof(struct path_source)); + + ps->node = me->node; + ps->samplelen = p->samplelen; + ps->mapping = me; + + list_push(&p->sources, ps); } - p->source = alloc(sizeof(struct path_source)); - p->source->node = source; - p->source->samplelen = p->samplelen; - for (size_t i = 0; i < list_length(&destinations); i++) { struct node *n = list_at(&destinations, i); @@ -246,6 +252,7 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) list_push(&p->destinations, pd); } + list_destroy(&sources, NULL, false); list_destroy(&destinations, NULL, false); p->cfg = cfg; @@ -258,6 +265,13 @@ int path_check(struct path *p) { assert(p->state != STATE_DESTROYED); + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + if (!ps->node->_vt->read) + error("Source node '%s' is not supported as a source for path '%s'", node_name(ps->node), path_name(p)); + } + for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); @@ -265,8 +279,10 @@ int path_check(struct path *p) error("Destiation node '%s' is not supported as a sink for path '%s'", node_name(pd->node), path_name(p)); } - if (!p->source->node->_vt->read) - error("Source node '%s' is not supported as source for path '%s'", node_name(p->source->node), path_name(p)); + if (!IS_POW2(p->queuelen)) { + p->queuelen = LOG2_CEIL(p->queuelen); + warn("Queue length should always be a power of 2. Adjusting to %d", p->queuelen); + } p->state = STATE_CHECKED; @@ -315,10 +331,14 @@ int path_init2(struct path *p) queuelen = pd->queuelen; } - /* Initialize source */ - ret = pool_init(&p->source->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(p->source->samplelen), &memtype_hugepage); - if (ret) - error("Failed to allocate memory pool for path"); + /* Initialize sources */ + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(ps->samplelen), &memtype_hugepage); + if (ret) + error("Failed to allocate memory pool for path"); + } return 0; } @@ -381,16 +401,13 @@ int path_destroy(struct path *p) return 0; list_destroy(&p->hooks, (dtor_cb_t) hook_destroy, true); + + list_destroy(&p->sources, (dtor_cb_t) path_source_destroy, true); list_destroy(&p->destinations, (dtor_cb_t) path_destination_destroy, true); - path_source_destroy(p->source); - if (p->_name) free(p->_name); - if (p->source) - free(p->source); - p->state = STATE_DESTROYED; return 0; @@ -399,65 +416,72 @@ int path_destroy(struct path *p) const char * path_name(struct path *p) { if (!p->_name) { - if (list_length(&p->destinations) == 1) { - struct path_destination *pd = (struct path_destination *) list_first(&p->destinations); + strcatf(&p->_name, "["); - strcatf(&p->_name, "%s " CLR_MAG("=>") " %s", - node_name_short(p->source->node), - node_name_short(pd->node)); + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + strcatf(&p->_name, " %s", node_name_short(ps->node)); } - else { - strcatf(&p->_name, "%s " CLR_MAG("=>") " [", node_name_short(p->source->node)); + + strcatf(&p->_name, " ] " CLR_MAG("=>") " ["); - for (size_t i = 0; i < list_length(&p->destinations); i++) { - struct path_destination *pd = list_at(&p->destinations, i); + for (size_t i = 0; i < list_length(&p->destinations); i++) { + struct path_destination *pd = list_at(&p->destinations, i); - strcatf(&p->_name, " %s", node_name_short(pd->node)); - } - - strcatf(&p->_name, " ]"); + strcatf(&p->_name, " %s", node_name_short(pd->node)); } + + strcatf(&p->_name, " ]"); } return p->_name; } -int path_uses_node(struct path *p, struct node *n) { +int path_uses_node(struct path *p, struct node *n) +{ for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); if (pd->node == n) return 0; } + + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); - return p->source->node == n ? 0 : -1; + if (ps->node == n) + return 0; + } + + return -1; } int path_reverse(struct path *p, struct path *r) { int ret; - if (list_length(&p->destinations) > 1) + if (list_length(&p->destinations) != 1 || list_length(&p->sources) != 1) return -1; - - struct path_destination *first_pd = list_first(&p->destinations); - + /* General */ r->enabled = p->enabled; + + /* Source / Destinations */ + struct path_destination *orig_pd = list_first(&p->destinations); + struct path_source *orig_ps = list_first(&p->sources); - struct path_destination *pd = alloc(sizeof(struct path_destination)); + struct path_destination *new_pd = alloc(sizeof(struct path_destination)); + struct path_source *new_ps = alloc(sizeof(struct path_source)); - pd->node = p->source->node; - pd->queuelen = first_pd->queuelen; + new_pd->node = orig_ps->node; + new_pd->queuelen = orig_pd->queuelen; + + new_ps->node = orig_pd->node; + new_ps->samplelen = orig_ps->samplelen; - list_push(&r->destinations, pd); - - struct path_source *ps = alloc(sizeof(struct path_source)); - - ps->node = first_pd->node; - ps->samplelen = p->source->samplelen; - - r->source = ps; + list_push(&r->destinations, new_pd); + list_push(&r->sources, new_ps); for (size_t i = 0; i < list_length(&p->hooks); i++) { struct hook *h = list_at(&p->hooks, i);