1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add some more functions for sample reference counting

This commit is contained in:
Steffen Vogel 2017-04-07 17:46:50 +02:00
parent 47bff0f9ba
commit 44ea4a160e
3 changed files with 58 additions and 8 deletions

View file

@ -70,6 +70,12 @@ int sample_get(struct sample *s);
/** Decrease reference count and release memory if last reference was held. */
int sample_put(struct sample *s);
int sample_copy(struct sample *dst, struct sample *src);
int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt);
int sample_get_many(struct sample *smps[], int cnt);
int sample_put_many(struct sample *smps[], int cnt);
/** Set number representation for a single value of a sample. */
int sample_get_data_format(struct sample *s, int idx);

View file

@ -66,8 +66,7 @@ static void path_read(struct path *p)
if (enqueue != enqueued)
warn("Queue overrun for path %s", path_name(p));
for (int i = 0; i < enqueued; i++)
sample_get(smps[i]); /* increase reference count */
sample_get_many(smps, enqueued);
debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node));
}
@ -108,11 +107,7 @@ static void path_write(struct path *p)
debug(LOG_PATH | 15, "Sent %u messages to node %s", sent, node_name(pd->node));
released = 0;
for (int i = 0; i < sent; i++) {
if (sample_put(smps[i]) == 0)
released++; /* we had the last reference (0 remaining) */
}
released = sample_put_many(smps, sent);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}

View file

@ -4,8 +4,11 @@
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
#include <string.h>
#include "pool.h"
#include "sample.h"
#include "utils.h"
int sample_alloc(struct pool *p, struct sample *smps[], int cnt)
{
@ -16,9 +19,10 @@ int sample_alloc(struct pool *p, struct sample *smps[], int cnt)
return ret;
for (int i = 0; i < ret; i++) {
smps[i]->capacity = (p->blocksz - sizeof(**smps)) / sizeof(smps[0]->data[0]);
smps[i]->capacity = (p->blocksz - SAMPLE_LEN(0)) / SAMPLE_DATA_LEN(1);
smps[i]->pool = p;
smps[i]->format = 0; /* all sample values are float by default */
smps[i]->refcnt = ATOMIC_VAR_INIT(1);
}
return ret;
@ -30,6 +34,26 @@ void sample_free(struct sample *smps[], int cnt)
pool_put(smps[i]->pool, smps[i]);
}
int sample_put_many(struct sample *smps[], int cnt)
{
int released = 0;
for (int i = 0; i < cnt; i++) {
if (sample_put(smps[i]) == 0)
released++;
}
return released;
}
int sample_get_many(struct sample *smps[], int cnt)
{
for (int i = 0; i < cnt; i++)
sample_get(smps[i]);
return cnt;
}
int sample_get(struct sample *s)
{
return atomic_fetch_add(&s->refcnt, 1) + 1;
@ -46,6 +70,31 @@ int sample_put(struct sample *s)
return prev - 1;
}
int sample_copy(struct sample *dst, struct sample *src)
{
dst->length = MIN(src->length, dst->capacity);
dst->sequence = src->sequence;
dst->format = src->format;
dst->source = src->source;
dst->ts.origin = src->ts.origin;
dst->ts.received = src->ts.received;
dst->ts.sent = src->ts.sent;
memcpy(&dst->data, &src->data, SAMPLE_DATA_LEN(dst->length));
return 0;
}
int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt)
{
for (int i = 0; i < cnt; i++)
sample_copy(dsts[i], srcs[i]);
return cnt;
}
int sample_set_data_format(struct sample *s, int idx, enum sample_data_format fmt)
{
if (idx >= sizeof(s->format) * 8)