diff --git a/include/villas/path.h b/include/villas/path.h index 267b29fe5..01bd5bfc1 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -55,7 +55,6 @@ struct path_source { struct path_destination { struct node *node; struct queue queue; - int queuelen; struct list hooks; /**< Write Hooks. */ }; @@ -78,8 +77,8 @@ struct path { int enabled; /**< Is this path enabled. */ int reverse; /**< This path as a matching reverse path. */ - - int queuelen; + int queuelen; /**< The queue length for each path_destination::queue */ + int samplelen; /**< Will be calculated based on path::sources.mappings */ int sequence; pthread_t tid; /**< The thread id for this path. */ diff --git a/lib/path.c b/lib/path.c index c818a3a1f..6c5f1c695 100644 --- a/lib/path.c +++ b/lib/path.c @@ -208,7 +208,6 @@ int path_init(struct path *p, struct super_node *sn) /* Default values */ p->reverse = 0; p->enabled = 1; - p->queuelen = DEFAULT_QUEUELEN; p->super_node = sn; @@ -220,7 +219,7 @@ int path_init(struct path *p, struct super_node *sn) int path_init2(struct path *p) { - int ret, queuelen = 0; + int ret; assert(p->state == STATE_CHECKED); @@ -252,24 +251,30 @@ int path_init2(struct path *p) for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); - ret = queue_init(&pd->queue, pd->queuelen, &memtype_hugepage); + ret = queue_init(&pd->queue, p->queuelen, &memtype_hugepage); if (ret) return ret; - - if (pd->queuelen > queuelen) - queuelen = pd->queuelen; } /* Initialize sources */ + p->samplelen = 0; for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); - ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); + /** @todo replace p->queuelen with ps->node->vectorize ? */ + ret = pool_init(&ps->pool, p->queuelen, SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); if (ret) error("Failed to allocate memory pool for path"); + + for (size_t i = 0; i < list_length(&ps->mappings); i++) { + struct mapping_entry *me = list_at(&ps->mappings, i); + + if (me->offset + me->length > p->samplelen) + p->samplelen = me->offset + me->length; + } } - ret = pool_init(&p->pool, MAX(DEFAULT_QUEUELEN, queuelen), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); + ret = pool_init(&p->pool, list_length(&p->destinations) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage); if (ret) return ret; @@ -364,7 +369,6 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) struct path_destination *pd = alloc(sizeof(struct path_destination)); pd->node = n; - pd->queuelen = p->queuelen; list_init(&pd->hooks); @@ -549,7 +553,6 @@ int path_reverse(struct path *p, struct path *r) struct path_source *new_ps = alloc(sizeof(struct path_source)); new_pd->node = orig_ps->node; - new_pd->queuelen = orig_pd->queuelen; new_ps->node = orig_pd->node;