From 80ee8d786ddeec73f0d564ebd86f59e3db4ffa07 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 16 Oct 2017 08:08:17 +0200 Subject: [PATCH] refactored sample by adding more *_many() functions --- include/villas/sample.h | 12 +++++- lib/hooks/map.c | 4 +- lib/nodes/loopback.c | 15 +++----- lib/nodes/shmem.c | 8 ++-- lib/nodes/websocket.c | 4 +- lib/sample.c | 83 +++++++++++++++++++++++++++++++++++++---- lib/shmem.c | 2 +- src/hook.c | 6 +-- src/pipe.c | 4 +- src/signal.c | 4 +- src/test-cmp.c | 4 +- tests/unit/io.c | 12 +++--- 12 files changed, 116 insertions(+), 42 deletions(-) diff --git a/include/villas/sample.h b/include/villas/sample.h index 9f80d2e22..fd0ad5d37 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -107,13 +107,19 @@ struct sample { /** Get the address of the pool to which the sample belongs. */ #define sample_pool(s) ((struct pool *) ((char *) (s) + (s)->pool_off)) +struct sample * sample_alloc(struct pool *p); + +struct sample * sample_clone(struct sample *smp); + +void sample_free(struct sample *s); + /** Request \p cnt samples from memory pool \p p and initialize them. * The reference count will already be set to 1. * Use the sample_get() function to increase it. */ -int sample_alloc(struct pool *p, struct sample *smps[], int cnt); +int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt); /** Release an array of samples back to their pools */ -void sample_free(struct sample *smps[], int cnt); +void sample_free_many(struct sample *smps[], int cnt); /** Increase reference count of sample */ int sample_get(struct sample *s); @@ -123,9 +129,11 @@ int sample_put(struct sample *s); int sample_copy(struct sample *dst, struct sample *src); + /** Compare two samples */ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags); +int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt); int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt); int sample_get_many(struct sample *smps[], int cnt); int sample_put_many(struct sample *smps[], int cnt); diff --git a/lib/hooks/map.c b/lib/hooks/map.c index 0f6bd2687..1e9ea0512 100644 --- a/lib/hooks/map.c +++ b/lib/hooks/map.c @@ -88,7 +88,7 @@ static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt) if (!p) return -1; - ret = sample_alloc(p, tmp, *cnt); + ret = sample_alloc_many(p, tmp, *cnt); if (ret != *cnt) return ret; @@ -101,7 +101,7 @@ static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt) SWAP(smps[i], tmp[i]); } - sample_free(tmp, *cnt); + sample_free_many(tmp, *cnt); return 0; } diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 14f49b634..3b4badaf0 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -89,19 +89,16 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt) int loopback_write(struct node *n, struct sample *smps[], unsigned cnt) { - int avail; + int cloned; struct loopback *l = n->_vd; - struct sample *cpys[cnt]; + struct sample *clones[cnt]; - avail = sample_alloc(&l->pool, cpys, cnt); - if (avail < cnt) - warn("Pool underun for node %s", node_name(n)); + cloned = sample_clone_many(smps, clones, cnt); + if (cloned < cnt) + warn("Pool underrun for node %s", node_name(n)); - for (int i = 0; i < avail; i++) - sample_copy(cpys[i], smps[i]); - - return queue_signalled_push_many(&l->queue, (void **) cpys, avail); + return queue_signalled_push_many(&l->queue, (void **) clones, cloned); } char * loopback_print(struct node *n) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 1dffb95b8..a141226f4 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -135,15 +135,15 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */ - int avail, pushed; + int avail, pushed, copied; - avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt); + avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt); if (avail != cnt) warn("Pool underrun for shmem node %s", shm->out_name); - for (int i = 0; i < avail; i++) { - sample_copy(shared_smps[i], smps[i]); + copied = sample_copy_many(shared_smps, smps, avail); + for (int i = 0; i < copied; i++) { /* Since the node isn't in shared memory, the source can't be accessed */ shared_smps[i]->source = NULL; shared_smps[i]->flags &= ~SAMPLE_HAS_SOURCE; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 90831f9d4..6c77ab36e 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -268,7 +268,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi struct websocket *w = c->node->_vd; struct sample **smps = alloca(cnt * sizeof(struct sample *)); - ret = sample_alloc(&w->pool, smps, cnt); + ret = sample_alloc_many(&w->pool, smps, cnt); if (ret != cnt) { warn("Pool underrun for connection: %s", websocket_connection_name(c)); break; @@ -464,7 +464,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt) struct sample *cpys[cnt]; /* Make copies of all samples */ - avail = sample_alloc(&w->pool, cpys, cnt); + avail = sample_alloc_many(&w->pool, cpys, cnt); if (avail < cnt) warn("Pool underrun for node %s: avail=%u", node_name(n), avail); diff --git a/lib/sample.c b/lib/sample.c index 8a9bddd92..c0cda934b 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -29,7 +29,37 @@ #include "utils.h" #include "timing.h" -int sample_alloc(struct pool *p, struct sample *smps[], int cnt) +int sample_init(struct sample *s) +{ + struct pool *p = sample_pool(s); + + s->length = 0; + s->format = 0; /* all sample values are float by default */ + s->capacity = (p->blocksz - sizeof(struct sample)) / sizeof(s->data[0]); + s->refcnt = ATOMIC_VAR_INIT(1); + + return 0; +} + +struct sample * sample_alloc(struct pool *p) +{ + struct sample *s = pool_get(p); + + s->pool_off = (char *) p - (char *) s; + + sample_init(s); + + return s; +} + +void sample_free(struct sample *s) +{ + struct pool *p = sample_pool(s); + + pool_put(p, s); +} + +int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt) { int ret; @@ -38,19 +68,21 @@ int sample_alloc(struct pool *p, struct sample *smps[], int cnt) return ret; for (int i = 0; i < ret; i++) { - smps[i]->capacity = (p->blocksz - sizeof(**smps)) / sizeof(smps[0]->data[0]); smps[i]->pool_off = (char *) p - (char *) smps[i]; - smps[i]->format = 0; /* all sample values are float by default */ - smps[i]->refcnt = ATOMIC_VAR_INIT(1); + + sample_init(smps[i]); } return ret; } -void sample_free(struct sample *smps[], int cnt) +void sample_free_many(struct sample *smps[], int cnt) { - for (int i = 0; i < cnt; i++) - pool_put(sample_pool(smps[i]), smps[i]); + for (int i = 0; i < cnt; i++) { + struct pool *p = sample_pool(smps[i]); + + pool_put(p, smps[i]); + } } int sample_put_many(struct sample *smps[], int cnt) @@ -105,6 +137,43 @@ int sample_copy(struct sample *dst, struct sample *src) return 0; } +struct sample * sample_clone(struct sample *orig) +{ + struct sample *clone; + struct pool *pool; + + pool = sample_pool(orig); + if (!pool) + return NULL; + + clone = sample_alloc(pool); + if (!clone) + return NULL; + + sample_copy(clone, orig); + + return clone; +} + +int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt) +{ + int alloced, copied; + struct pool *pool; + + if (cnt <= 0) + return 0; + + pool = sample_pool(origs[0]); + if (!pool) + return 0; + + alloced = sample_alloc_many(pool, clones, cnt); + + copied = sample_copy_many(clones, origs, alloced); + + return copied; +} + int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt) { for (int i = 0; i < cnt; i++) diff --git a/lib/shmem.c b/lib/shmem.c index 74b93bc68..c9ab74640 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -198,5 +198,5 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt) int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - return sample_alloc(&shm->write.shared->pool, smps, cnt); + return sample_alloc_many(&shm->write.shared->pool, smps, cnt); } diff --git a/src/hook.c b/src/hook.c index f2af04eb2..16edede5e 100644 --- a/src/hook.c +++ b/src/hook.c @@ -65,7 +65,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) if (ret) error("Failed to destroy hook"); - sample_free(smps, cnt); + sample_free_many(smps, cnt); ret = pool_destroy(&q); if (ret) @@ -215,7 +215,7 @@ check: if (optarg == endptr) pause(); } - ret = sample_alloc(&q, smps, cnt); + ret = sample_alloc_many(&q, smps, cnt); if (ret != cnt) error("Failed to allocate %d smps from pool", cnt); @@ -231,7 +231,7 @@ check: if (optarg == endptr) io_print(&io, smps, recv); - sample_free(smps, cnt); + sample_free_many(smps, cnt); } return 0; diff --git a/src/pipe.c b/src/pipe.c index e366b8154..f3196545b 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -116,7 +116,7 @@ static void * send_loop(void *ctx) error("Failed to allocate memory for receive pool."); while (!io_eof(&io)) { - ready = sample_alloc(&sendd.pool, smps, node->vectorize); + ready = sample_alloc_many(&sendd.pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); else if (ready < node->vectorize) @@ -172,7 +172,7 @@ static void * recv_loop(void *ctx) error("Failed to allocate memory for receive pool."); for (;;) { - ready = sample_alloc(&recvv.pool, smps, node->vectorize); + ready = sample_alloc_many(&recvv.pool, smps, node->vectorize); if (ready < 0) error("Failed to allocate %u samples from receive pool.", node->vectorize); else if (ready < node->vectorize) diff --git a/src/signal.c b/src/signal.c index 7472f07a9..92b56d741 100644 --- a/src/signal.c +++ b/src/signal.c @@ -154,7 +154,7 @@ int main(int argc, char *argv[]) if (ret) error("Failed to verify node configuration"); - ret = pool_init(&q, 1, SAMPLE_LEN(n.samplelen), &memtype_heap); + ret = pool_init(&q, 16, SAMPLE_LEN(n.samplelen), &memtype_heap); if (ret) error("Failed to initialize pool"); @@ -163,7 +163,7 @@ int main(int argc, char *argv[]) serror("Failed to start node"); for (;;) { - sample_alloc(&q, &t, 1); + t = sample_alloc(&q); node_read(&n, &t, 1); io_print(&io, &t, 1); diff --git a/src/test-cmp.c b/src/test-cmp.c index cb4c6cc1e..745797a4f 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -140,8 +140,8 @@ check: if (optarg == endptr) if (ret) error("Failed to open file: %s", s[i].path); - ret = sample_alloc(&pool, &s[i].sample, 1); - if (ret != 1) + s[i].sample = sample_alloc(&pool); + if (!s[i].sample) error("Failed to allocate samples"); } diff --git a/tests/unit/io.c b/tests/unit/io.c index fc355efc4..8294ae29a 100644 --- a/tests/unit/io.c +++ b/tests/unit/io.c @@ -64,10 +64,10 @@ void generate_samples(struct pool *p, struct sample *smps[], struct sample *smpt int ret; /* Prepare a sample with arbitrary data */ - ret = sample_alloc(p, smps, cnt); + ret = sample_alloc_many(p, smps, cnt); cr_assert_eq(ret, NUM_SAMPLES); - ret = sample_alloc(p, smpt, cnt); + ret = sample_alloc_many(p, smpt, cnt); cr_assert_eq(ret, cnt); for (int i = 0; i < cnt; i++) { @@ -178,8 +178,8 @@ ParameterizedTest(char *fmt, io, lowlevel) cr_assert_eq_samples(f, smps, smpt, ret); - sample_free(smps, NUM_SAMPLES); - sample_free(smpt, NUM_SAMPLES); + sample_free_many(smps, NUM_SAMPLES); + sample_free_many(smpt, NUM_SAMPLES); ret = pool_destroy(&p); cr_assert_eq(ret, 0); @@ -263,8 +263,8 @@ ParameterizedTest(char *fmt, io, highlevel) free(fn); - sample_free(smps, NUM_SAMPLES); - sample_free(smpt, NUM_SAMPLES); + sample_free_many(smps, NUM_SAMPLES); + sample_free_many(smpt, NUM_SAMPLES); ret = pool_destroy(&p); cr_assert_eq(ret, 0);