From 98eed183e37ecc228b52e0cf6199bf9d14ba646d Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 14 Jul 2017 16:29:05 +0200 Subject: [PATCH] loopback: we need an additional pool for now --- include/villas/nodes/loopback.h | 3 +++ lib/nodes/loopback.c | 41 +++++++++++++++++++++++++++++---- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/include/villas/nodes/loopback.h b/include/villas/nodes/loopback.h index 6298b6383..e92e8fe39 100644 --- a/include/villas/nodes/loopback.h +++ b/include/villas/nodes/loopback.h @@ -32,6 +32,7 @@ #include #include "queue_signalled.h" +#include "pool.h" /* Forward declarations */ struct node; @@ -42,8 +43,10 @@ struct sample; */ struct loopback { int queuelen; + int samplelen; struct queue_signalled queue; + struct pool pool; }; /** @see node_type::print */ diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index ecd4fa1d5..dcb3c2761 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -34,35 +34,68 @@ int loopback_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_int(cfg, "queuelen", &l->queuelen)) l->queuelen = DEFAULT_QUEUELEN; + if (!config_setting_lookup_int(cfg, "samplelen", &l->samplelen)) + l->samplelen = DEFAULT_SAMPLELEN; + return 0; } int loopback_open(struct node *n) { + int ret; struct loopback *l = n->_vd; + ret = pool_init(&l->pool, l->queuelen, SAMPLE_LEN(l->samplelen), &memtype_hugepage); + if (ret) + return ret; + return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage); } int loopback_close(struct node *n) { + int ret; struct loopback *l= n->_vd; + ret = pool_destroy(&l->pool); + if (ret) + return ret; + return queue_signalled_destroy(&l->queue); } int loopback_read(struct node *n, struct sample *smps[], unsigned cnt) { - struct loopback *l = n->_vd; + int avail; - return queue_signalled_pull_many(&l->queue, (void **) smps, cnt); + struct loopback *l = n->_vd; + struct sample *cpys[cnt]; + + avail = queue_signalled_pull_many(&l->queue, (void **) cpys, cnt); + + for (int i = 0; i < avail; i++) { + sample_copy(smps[i], cpys[i]); + sample_put(cpys[i]); + } + + return avail; } int loopback_write(struct node *n, struct sample *smps[], unsigned cnt) { - struct loopback *l = n->_vd; + int avail; - return queue_signalled_push_many(&l->queue, (void **) smps, cnt); + struct loopback *l = n->_vd; + struct sample *cpys[cnt]; + + avail = sample_alloc(&l->pool, cpys, cnt); + if (avail < cnt) + warn("Pool underun for node %s", node_name(n)); + + for (int i = 0; i < avail; i++) + sample_copy(cpys[i], smps[i]); + + return queue_signalled_push_many(&l->queue, (void **) cpys, avail); } char * loopback_print(struct node *n)