1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

refactor: {sample,signal}_{put,get} -> {sample,signal}_{incref,decref}

This commit is contained in:
Steffen Vogel 2018-08-07 09:22:26 +02:00
parent 37b98e7ad6
commit 1f16b4bf07
19 changed files with 50 additions and 50 deletions

View file

@ -111,17 +111,17 @@ void sample_free(struct sample *s);
/** Request \p cnt samples from memory pool \p p and initialize them.
* The reference count will already be set to 1.
* Use the sample_get() function to increase it. */
* Use the sample_incref() function to increase it. */
int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt);
/** Release an array of samples back to their pools */
void sample_free_many(struct sample *smps[], int cnt);
/** Increase reference count of sample */
int sample_get(struct sample *s);
int sample_incref(struct sample *s);
/** Decrease reference count and release memory if last reference was held. */
int sample_put(struct sample *s);
int sample_decref(struct sample *s);
int sample_copy(struct sample *dst, struct sample *src);
@ -130,8 +130,8 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags);
int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt);
int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt);
int sample_get_many(struct sample *smps[], int cnt);
int sample_put_many(struct sample *smps[], int cnt);
int sample_incref_many(struct sample *smps[], int cnt);
int sample_decref_many(struct sample *smps[], int cnt);
enum signal_format sample_format(const struct sample *s, unsigned idx);

View file

@ -95,7 +95,7 @@ int shmem_int_close(struct shmem_int *shm);
*
* @param shm The shared memory interface.
* @param smps An array where the pointers to the samples will be written. The samples
* must be freed with sample_put after use.
* must be freed with sample_decref after use.
* @param cnt Number of samples to be read.
* @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available.
* @retval -1 The other process closed the interface; no samples can be read anymore.
@ -114,7 +114,7 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt);
/** Allocate samples to be written to the interface.
*
* The writing process must not free the samples; only the receiving process should free them using sample_put after use.
* The writing process must not free the samples; only the receiving process should free them using sample_decref after use.
* @param shm The shared memory interface.
* @param smps Array where pointers to newly allocated samples will be returned.
* @param cnt Number of samples to allocate.

View file

@ -62,10 +62,10 @@ struct signal {
int signal_init(struct signal *s);
/** Increase reference counter. */
int signal_get(struct signal *s);
int signal_incref(struct signal *s);
/** Decrease reference counter. */
int signal_put(struct signal *s);
int signal_decref(struct signal *s);
int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, unsigned index);

View file

@ -50,7 +50,7 @@ static int drop_stop(struct hook *h)
struct drop *d = (struct drop *) h->_vd;
if (d->prev)
sample_put(d->prev);
sample_decref(d->prev);
return 0;
}
@ -91,9 +91,9 @@ ok: /* To discard the first X samples in 'smps[]' we must
}
if (cur)
sample_get(cur);
sample_incref(cur);
if (d->prev)
sample_put(d->prev);
sample_decref(d->prev);
d->prev = cur;
@ -107,7 +107,7 @@ static int drop_restart(struct hook *h)
struct drop *d = (struct drop *) h->_vd;
if (d->prev) {
sample_put(d->prev);
sample_decref(d->prev);
d->prev = NULL;
}

View file

@ -49,7 +49,7 @@ static int restart_stop(struct hook *h)
struct restart *r = (struct restart *) h->_vd;
if (r->prev)
sample_put(r->prev);
sample_decref(r->prev);
return 0;
}
@ -90,9 +90,9 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
}
if (cur)
sample_get(cur);
sample_incref(cur);
if (r->prev)
sample_put(r->prev);
sample_decref(r->prev);
r->prev = cur;

View file

@ -178,10 +178,10 @@ static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *c
}
if (p->last)
sample_put(p->last);
sample_decref(p->last);
if (previous)
sample_get(previous);
sample_incref(previous);
p->last = previous;

View file

@ -356,7 +356,7 @@ int iec61850_sv_read(struct node *n, struct sample *smps[], unsigned cnt, unsign
pulled = queue_signalled_pull_many(&i->subscriber.queue, (void **) smpt, cnt);
sample_copy_many(smps, smpt, pulled);
sample_put_many(smpt, pulled);
sample_decref_many(smpt, pulled);
return pulled;
}

View file

@ -48,14 +48,14 @@ static int ib_disconnect(struct node *n)
ib->conn.available_recv_wrs -= wcs;
for (int j = 0; j < wcs; j++)
sample_put((struct sample *) (wc[j].wr_id));
sample_decref((struct sample *) (wc[j].wr_id));
}
// Send Queue
while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc)))
for (int j = 0; j < wcs; j++)
if (wc[j].wr_id > 0)
sample_put((struct sample *) (wc[j].wr_id));
sample_decref((struct sample *) (wc[j].wr_id));
// Send Queue Stack
@ -65,7 +65,7 @@ static int ib_disconnect(struct node *n)
// to double check return of queue_pull.
queue_pull(&ib->conn.send_wc_buffer, (void **) &smp);
sample_put(smp);
sample_decref(smp);
}
// Destroy QP

View file

@ -80,7 +80,7 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned
for (int i = 0; i < avail; i++) {
sample_copy(smps[i], cpys[i]);
sample_put(cpys[i]);
sample_decref(cpys[i]);
}
return avail;

View file

@ -102,7 +102,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct
}
if (ret == 0) {
debug(4, "MQTT: skip empty message for node %s", node_name(n));
sample_put_many(smps, n->in.vectorize);
sample_decref_many(smps, n->in.vectorize);
return;
}
@ -373,7 +373,7 @@ int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt);
sample_copy_many(smps, smpt, pulled);
sample_put_many(smpt, pulled);
sample_decref_many(smpt, pulled);
return pulled;
}

View file

@ -129,7 +129,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
}
sample_copy_many(smps, shared_smps, recv);
sample_put_many(shared_smps, recv);
sample_decref_many(shared_smps, recv);
return recv;
}

View file

@ -117,7 +117,7 @@ static int websocket_connection_destroy(struct websocket_connection *c)
int avail;
struct sample *smp;
while ((avail = queue_pull(&c->queue, (void **) &smp)))
sample_put(smp);
sample_decref(smp);
ret = queue_destroy(&c->queue);
if (ret)
@ -154,7 +154,7 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam
if (pushed < cnt)
warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c));
sample_get_many(smps, pushed);
sample_incref_many(smps, pushed);
debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c));
@ -283,7 +283,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT);
sample_put_many(smps, pulled);
sample_decref_many(smps, pulled);
debug(LOG_WEBSOCKET | 10, "Send %d samples to connection: %s, bytes=%d", pulled, websocket_connection_name(c), ret);
}
@ -341,7 +341,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi
/* Release unused samples back to pool */
if (enqueued < avail)
sample_put_many(&smps[enqueued], avail - enqueued);
sample_decref_many(&smps[enqueued], avail - enqueued);
buffer_clear(&c->buffers.recv);
@ -481,7 +481,7 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned
return avail;
sample_copy_many(smps, cpys, avail);
sample_put_many(cpys, avail);
sample_decref_many(cpys, avail);
return avail;
}
@ -507,7 +507,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigne
websocket_connection_write(c, cpys, cnt);
}
sample_put_many(cpys, avail);
sample_decref_many(cpys, avail);
return cnt;
}

View file

@ -147,8 +147,8 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
}
}
sample_put_many(muxed_smps, tomux);
out2: sample_put_many(read_smps, release);
sample_decref_many(muxed_smps, tomux);
out2: sample_decref_many(read_smps, release);
}
static int path_destination_init(struct path_destination *pd, int queuelen)
@ -191,12 +191,12 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi
warn("Queue overrun for path %s", path_name(p));
/* Increase reference counter of these samples as they are now also owned by the queue. */
sample_get_many(clones, cloned);
sample_incref_many(clones, cloned);
debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p));
}
sample_put_many(clones, cloned);
sample_decref_many(clones, cloned);
}
static void path_destination_write(struct path_destination *pd, struct path *p)
@ -227,7 +227,7 @@ static void path_destination_write(struct path_destination *pd, struct path *p)
else if (sent < allocated)
warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated);
released = sample_put_many(smps, release);
released = sample_decref_many(smps, release);
debug(LOG_PATH | 15, "Released %d samples back to memory pool", released);
}

View file

@ -107,32 +107,32 @@ void sample_free_many(struct sample *smps[], int cnt)
sample_free(smps[i]);
}
int sample_put_many(struct sample *smps[], int cnt)
int sample_decref_many(struct sample *smps[], int cnt)
{
int released = 0;
for (int i = 0; i < cnt; i++) {
if (sample_put(smps[i]) == 0)
if (sample_decref(smps[i]) == 0)
released++;
}
return released;
}
int sample_get_many(struct sample *smps[], int cnt)
int sample_incref_many(struct sample *smps[], int cnt)
{
for (int i = 0; i < cnt; i++)
sample_get(smps[i]);
sample_incref(smps[i]);
return cnt;
}
int sample_get(struct sample *s)
int sample_incref(struct sample *s)
{
return atomic_fetch_add(&s->refcnt, 1) + 1;
}
int sample_put(struct sample *s)
int sample_decref(struct sample *s)
{
int prev = atomic_fetch_sub(&s->refcnt, 1);

View file

@ -99,12 +99,12 @@ int signal_destroy(struct signal *s)
return 0;
}
int signal_get(struct signal *s)
int signal_incref(struct signal *s)
{
return atomic_fetch_add(&s->refcnt, 1) + 1;
}
int signal_put(struct signal *s)
int signal_decref(struct signal *s)
{
int prev = atomic_fetch_sub(&s->refcnt, 1);

View file

@ -171,7 +171,7 @@ static void * send_loop(void *ctx)
else if (sent < scanned)
warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node));
sample_put_many(smps, release);
sample_decref_many(smps, release);
cnt += sent;
if (sendd.limit > 0 && cnt >= sendd.limit)
@ -226,7 +226,7 @@ static void * recv_loop(void *ctx)
io_print(&io, smps, recv);
sample_put_many(smps, release);
sample_decref_many(smps, release);
cnt += recv;
if (recvv.limit > 0 && cnt >= recvv.limit)

View file

@ -182,7 +182,7 @@ int main(int argc, char *argv[])
node_read(&n, &t, 1, &release);
io_print(&io, &t, 1);
sample_put(t);
sample_decref(t);
}
return 0;

View file

@ -200,7 +200,7 @@ out: for (int i = 0; i < n; i++) {
if (ret)
error("Failed to destroy IO");
sample_put(s[i].sample);
sample_decref(s[i].sample);
}
ret = pool_destroy(&pool);

View file

@ -103,7 +103,7 @@ int main(int argc, char* argv[])
}
for (int i = 0; i < readcnt; i++)
sample_put(insmps[i]);
sample_decref(insmps[i]);
writecnt = shmem_int_write(&shm, outsmps, avail);
if (writecnt < avail)