1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

refactored sample by adding more *_many() functions

This commit is contained in:
Steffen Vogel 2017-10-16 08:08:17 +02:00
parent ce5f4e5a60
commit 80ee8d786d
12 changed files with 116 additions and 42 deletions

View file

@ -107,13 +107,19 @@ struct sample {
/** Get the address of the pool to which the sample belongs. */
#define sample_pool(s) ((struct pool *) ((char *) (s) + (s)->pool_off))
struct sample * sample_alloc(struct pool *p);
struct sample * sample_clone(struct sample *smp);
void sample_free(struct sample *s);
/** Request \p cnt samples from memory pool \p p and initialize them.
* The reference count will already be set to 1.
* Use the sample_get() function to increase it. */
int sample_alloc(struct pool *p, struct sample *smps[], int cnt);
int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt);
/** Release an array of samples back to their pools */
void sample_free(struct sample *smps[], int cnt);
void sample_free_many(struct sample *smps[], int cnt);
/** Increase reference count of sample */
int sample_get(struct sample *s);
@ -123,9 +129,11 @@ int sample_put(struct sample *s);
int sample_copy(struct sample *dst, struct sample *src);
/** Compare two samples */
int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags);
int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt);
int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt);
int sample_get_many(struct sample *smps[], int cnt);
int sample_put_many(struct sample *smps[], int cnt);

View file

@ -88,7 +88,7 @@ static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt)
if (!p)
return -1;
ret = sample_alloc(p, tmp, *cnt);
ret = sample_alloc_many(p, tmp, *cnt);
if (ret != *cnt)
return ret;
@ -101,7 +101,7 @@ static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt)
SWAP(smps[i], tmp[i]);
}
sample_free(tmp, *cnt);
sample_free_many(tmp, *cnt);
return 0;
}

View file

@ -89,19 +89,16 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt)
int loopback_write(struct node *n, struct sample *smps[], unsigned cnt)
{
int avail;
int cloned;
struct loopback *l = n->_vd;
struct sample *cpys[cnt];
struct sample *clones[cnt];
avail = sample_alloc(&l->pool, cpys, cnt);
if (avail < cnt)
warn("Pool underun for node %s", node_name(n));
cloned = sample_clone_many(smps, clones, cnt);
if (cloned < cnt)
warn("Pool underrun for node %s", node_name(n));
for (int i = 0; i < avail; i++)
sample_copy(cpys[i], smps[i]);
return queue_signalled_push_many(&l->queue, (void **) cpys, avail);
return queue_signalled_push_many(&l->queue, (void **) clones, cloned);
}
char * loopback_print(struct node *n)

View file

@ -135,15 +135,15 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
struct sample *shared_smps[cnt]; /* Samples need to be copied to the shared pool first */
int avail, pushed;
int avail, pushed, copied;
avail = sample_alloc(&shm->intf.write.shared->pool, shared_smps, cnt);
avail = sample_alloc_many(&shm->intf.write.shared->pool, shared_smps, cnt);
if (avail != cnt)
warn("Pool underrun for shmem node %s", shm->out_name);
for (int i = 0; i < avail; i++) {
sample_copy(shared_smps[i], smps[i]);
copied = sample_copy_many(shared_smps, smps, avail);
for (int i = 0; i < copied; i++) {
/* Since the node isn't in shared memory, the source can't be accessed */
shared_smps[i]->source = NULL;
shared_smps[i]->flags &= ~SAMPLE_HAS_SOURCE;

View file

@ -268,7 +268,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
struct websocket *w = c->node->_vd;
struct sample **smps = alloca(cnt * sizeof(struct sample *));
ret = sample_alloc(&w->pool, smps, cnt);
ret = sample_alloc_many(&w->pool, smps, cnt);
if (ret != cnt) {
warn("Pool underrun for connection: %s", websocket_connection_name(c));
break;
@ -464,7 +464,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt)
struct sample *cpys[cnt];
/* Make copies of all samples */
avail = sample_alloc(&w->pool, cpys, cnt);
avail = sample_alloc_many(&w->pool, cpys, cnt);
if (avail < cnt)
warn("Pool underrun for node %s: avail=%u", node_name(n), avail);

View file

@ -29,7 +29,37 @@
#include "utils.h"
#include "timing.h"
int sample_alloc(struct pool *p, struct sample *smps[], int cnt)
int sample_init(struct sample *s)
{
struct pool *p = sample_pool(s);
s->length = 0;
s->format = 0; /* all sample values are float by default */
s->capacity = (p->blocksz - sizeof(struct sample)) / sizeof(s->data[0]);
s->refcnt = ATOMIC_VAR_INIT(1);
return 0;
}
struct sample * sample_alloc(struct pool *p)
{
struct sample *s = pool_get(p);
s->pool_off = (char *) p - (char *) s;
sample_init(s);
return s;
}
void sample_free(struct sample *s)
{
struct pool *p = sample_pool(s);
pool_put(p, s);
}
int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt)
{
int ret;
@ -38,19 +68,21 @@ int sample_alloc(struct pool *p, struct sample *smps[], int cnt)
return ret;
for (int i = 0; i < ret; i++) {
smps[i]->capacity = (p->blocksz - sizeof(**smps)) / sizeof(smps[0]->data[0]);
smps[i]->pool_off = (char *) p - (char *) smps[i];
smps[i]->format = 0; /* all sample values are float by default */
smps[i]->refcnt = ATOMIC_VAR_INIT(1);
sample_init(smps[i]);
}
return ret;
}
void sample_free(struct sample *smps[], int cnt)
void sample_free_many(struct sample *smps[], int cnt)
{
for (int i = 0; i < cnt; i++)
pool_put(sample_pool(smps[i]), smps[i]);
for (int i = 0; i < cnt; i++) {
struct pool *p = sample_pool(smps[i]);
pool_put(p, smps[i]);
}
}
int sample_put_many(struct sample *smps[], int cnt)
@ -105,6 +137,43 @@ int sample_copy(struct sample *dst, struct sample *src)
return 0;
}
struct sample * sample_clone(struct sample *orig)
{
struct sample *clone;
struct pool *pool;
pool = sample_pool(orig);
if (!pool)
return NULL;
clone = sample_alloc(pool);
if (!clone)
return NULL;
sample_copy(clone, orig);
return clone;
}
int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt)
{
int alloced, copied;
struct pool *pool;
if (cnt <= 0)
return 0;
pool = sample_pool(origs[0]);
if (!pool)
return 0;
alloced = sample_alloc_many(pool, clones, cnt);
copied = sample_copy_many(clones, origs, alloced);
return copied;
}
int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt)
{
for (int i = 0; i < cnt; i++)

View file

@ -198,5 +198,5 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
{
return sample_alloc(&shm->write.shared->pool, smps, cnt);
return sample_alloc_many(&shm->write.shared->pool, smps, cnt);
}

View file

@ -65,7 +65,7 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx)
if (ret)
error("Failed to destroy hook");
sample_free(smps, cnt);
sample_free_many(smps, cnt);
ret = pool_destroy(&q);
if (ret)
@ -215,7 +215,7 @@ check: if (optarg == endptr)
pause();
}
ret = sample_alloc(&q, smps, cnt);
ret = sample_alloc_many(&q, smps, cnt);
if (ret != cnt)
error("Failed to allocate %d smps from pool", cnt);
@ -231,7 +231,7 @@ check: if (optarg == endptr)
io_print(&io, smps, recv);
sample_free(smps, cnt);
sample_free_many(smps, cnt);
}
return 0;

View file

@ -116,7 +116,7 @@ static void * send_loop(void *ctx)
error("Failed to allocate memory for receive pool.");
while (!io_eof(&io)) {
ready = sample_alloc(&sendd.pool, smps, node->vectorize);
ready = sample_alloc_many(&sendd.pool, smps, node->vectorize);
if (ret < 0)
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
else if (ready < node->vectorize)
@ -172,7 +172,7 @@ static void * recv_loop(void *ctx)
error("Failed to allocate memory for receive pool.");
for (;;) {
ready = sample_alloc(&recvv.pool, smps, node->vectorize);
ready = sample_alloc_many(&recvv.pool, smps, node->vectorize);
if (ready < 0)
error("Failed to allocate %u samples from receive pool.", node->vectorize);
else if (ready < node->vectorize)

View file

@ -154,7 +154,7 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to verify node configuration");
ret = pool_init(&q, 1, SAMPLE_LEN(n.samplelen), &memtype_heap);
ret = pool_init(&q, 16, SAMPLE_LEN(n.samplelen), &memtype_heap);
if (ret)
error("Failed to initialize pool");
@ -163,7 +163,7 @@ int main(int argc, char *argv[])
serror("Failed to start node");
for (;;) {
sample_alloc(&q, &t, 1);
t = sample_alloc(&q);
node_read(&n, &t, 1);
io_print(&io, &t, 1);

View file

@ -140,8 +140,8 @@ check: if (optarg == endptr)
if (ret)
error("Failed to open file: %s", s[i].path);
ret = sample_alloc(&pool, &s[i].sample, 1);
if (ret != 1)
s[i].sample = sample_alloc(&pool);
if (!s[i].sample)
error("Failed to allocate samples");
}

View file

@ -64,10 +64,10 @@ void generate_samples(struct pool *p, struct sample *smps[], struct sample *smpt
int ret;
/* Prepare a sample with arbitrary data */
ret = sample_alloc(p, smps, cnt);
ret = sample_alloc_many(p, smps, cnt);
cr_assert_eq(ret, NUM_SAMPLES);
ret = sample_alloc(p, smpt, cnt);
ret = sample_alloc_many(p, smpt, cnt);
cr_assert_eq(ret, cnt);
for (int i = 0; i < cnt; i++) {
@ -178,8 +178,8 @@ ParameterizedTest(char *fmt, io, lowlevel)
cr_assert_eq_samples(f, smps, smpt, ret);
sample_free(smps, NUM_SAMPLES);
sample_free(smpt, NUM_SAMPLES);
sample_free_many(smps, NUM_SAMPLES);
sample_free_many(smpt, NUM_SAMPLES);
ret = pool_destroy(&p);
cr_assert_eq(ret, 0);
@ -263,8 +263,8 @@ ParameterizedTest(char *fmt, io, highlevel)
free(fn);
sample_free(smps, NUM_SAMPLES);
sample_free(smpt, NUM_SAMPLES);
sample_free_many(smps, NUM_SAMPLES);
sample_free_many(smpt, NUM_SAMPLES);
ret = pool_destroy(&p);
cr_assert_eq(ret, 0);