diff --git a/include/villas/sample.h b/include/villas/sample.h index 07c3ca4a5..17fac2f90 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -111,17 +111,17 @@ void sample_free(struct sample *s); /** Request \p cnt samples from memory pool \p p and initialize them. * The reference count will already be set to 1. - * Use the sample_get() function to increase it. */ + * Use the sample_incref() function to increase it. */ int sample_alloc_many(struct pool *p, struct sample *smps[], int cnt); /** Release an array of samples back to their pools */ void sample_free_many(struct sample *smps[], int cnt); /** Increase reference count of sample */ -int sample_get(struct sample *s); +int sample_incref(struct sample *s); /** Decrease reference count and release memory if last reference was held. */ -int sample_put(struct sample *s); +int sample_decref(struct sample *s); int sample_copy(struct sample *dst, struct sample *src); @@ -130,8 +130,8 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags); int sample_clone_many(struct sample *clones[], struct sample *origs[], int cnt); int sample_copy_many(struct sample *dsts[], struct sample *srcs[], int cnt); -int sample_get_many(struct sample *smps[], int cnt); -int sample_put_many(struct sample *smps[], int cnt); +int sample_incref_many(struct sample *smps[], int cnt); +int sample_decref_many(struct sample *smps[], int cnt); enum signal_format sample_format(const struct sample *s, unsigned idx); diff --git a/include/villas/shmem.h b/include/villas/shmem.h index a8f5022a8..5ee575b8c 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -95,7 +95,7 @@ int shmem_int_close(struct shmem_int *shm); * * @param shm The shared memory interface. * @param smps An array where the pointers to the samples will be written. The samples - * must be freed with sample_put after use. + * must be freed with sample_decref after use. * @param cnt Number of samples to be read. * @retval >=0 Number of samples that were read. Can be less than cnt (including 0) in case not enough samples were available. * @retval -1 The other process closed the interface; no samples can be read anymore. @@ -114,7 +114,7 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt); /** Allocate samples to be written to the interface. * - * The writing process must not free the samples; only the receiving process should free them using sample_put after use. + * The writing process must not free the samples; only the receiving process should free them using sample_decref after use. * @param shm The shared memory interface. * @param smps Array where pointers to newly allocated samples will be returned. * @param cnt Number of samples to allocate. diff --git a/include/villas/signal.h b/include/villas/signal.h index 8769803db..55ec24004 100644 --- a/include/villas/signal.h +++ b/include/villas/signal.h @@ -62,10 +62,10 @@ struct signal { int signal_init(struct signal *s); /** Increase reference counter. */ -int signal_get(struct signal *s); +int signal_incref(struct signal *s); /** Decrease reference counter. */ -int signal_put(struct signal *s); +int signal_decref(struct signal *s); int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, unsigned index); diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 0d4a8cbd2..70e63ff90 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -50,7 +50,7 @@ static int drop_stop(struct hook *h) struct drop *d = (struct drop *) h->_vd; if (d->prev) - sample_put(d->prev); + sample_decref(d->prev); return 0; } @@ -91,9 +91,9 @@ ok: /* To discard the first X samples in 'smps[]' we must } if (cur) - sample_get(cur); + sample_incref(cur); if (d->prev) - sample_put(d->prev); + sample_decref(d->prev); d->prev = cur; @@ -107,7 +107,7 @@ static int drop_restart(struct hook *h) struct drop *d = (struct drop *) h->_vd; if (d->prev) { - sample_put(d->prev); + sample_decref(d->prev); d->prev = NULL; } diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index 1e8c73902..0aff2eb11 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -49,7 +49,7 @@ static int restart_stop(struct hook *h) struct restart *r = (struct restart *) h->_vd; if (r->prev) - sample_put(r->prev); + sample_decref(r->prev); return 0; } @@ -90,9 +90,9 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) } if (cur) - sample_get(cur); + sample_incref(cur); if (r->prev) - sample_put(r->prev); + sample_decref(r->prev); r->prev = cur; diff --git a/lib/hooks/stats.c b/lib/hooks/stats.c index 76704373f..b3f5c87a8 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats.c @@ -178,10 +178,10 @@ static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *c } if (p->last) - sample_put(p->last); + sample_decref(p->last); if (previous) - sample_get(previous); + sample_incref(previous); p->last = previous; diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.c index 1ed940e56..5ceb8a26d 100644 --- a/lib/nodes/iec61850_sv.c +++ b/lib/nodes/iec61850_sv.c @@ -356,7 +356,7 @@ int iec61850_sv_read(struct node *n, struct sample *smps[], unsigned cnt, unsign pulled = queue_signalled_pull_many(&i->subscriber.queue, (void **) smpt, cnt); sample_copy_many(smps, smpt, pulled); - sample_put_many(smpt, pulled); + sample_decref_many(smpt, pulled); return pulled; } diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index ed597c395..2356a3e2e 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -48,14 +48,14 @@ static int ib_disconnect(struct node *n) ib->conn.available_recv_wrs -= wcs; for (int j = 0; j < wcs; j++) - sample_put((struct sample *) (wc[j].wr_id)); + sample_decref((struct sample *) (wc[j].wr_id)); } // Send Queue while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc))) for (int j = 0; j < wcs; j++) if (wc[j].wr_id > 0) - sample_put((struct sample *) (wc[j].wr_id)); + sample_decref((struct sample *) (wc[j].wr_id)); // Send Queue Stack @@ -65,7 +65,7 @@ static int ib_disconnect(struct node *n) // to double check return of queue_pull. queue_pull(&ib->conn.send_wc_buffer, (void **) &smp); - sample_put(smp); + sample_decref(smp); } // Destroy QP diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 40ab4773a..5f61ea8ef 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -80,7 +80,7 @@ int loopback_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned for (int i = 0; i < avail; i++) { sample_copy(smps[i], cpys[i]); - sample_put(cpys[i]); + sample_decref(cpys[i]); } return avail; diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c index 86fd1e78e..4e105e7ad 100644 --- a/lib/nodes/mqtt.c +++ b/lib/nodes/mqtt.c @@ -102,7 +102,7 @@ static void mqtt_message_cb(struct mosquitto *mosq, void *userdata, const struct } if (ret == 0) { debug(4, "MQTT: skip empty message for node %s", node_name(n)); - sample_put_many(smps, n->in.vectorize); + sample_decref_many(smps, n->in.vectorize); return; } @@ -373,7 +373,7 @@ int mqtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel pulled = queue_signalled_pull_many(&m->queue, (void **) smpt, cnt); sample_copy_many(smps, smpt, pulled); - sample_put_many(smpt, pulled); + sample_decref_many(smpt, pulled); return pulled; } diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 38bfbc249..23666cfb9 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -129,7 +129,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re } sample_copy_many(smps, shared_smps, recv); - sample_put_many(shared_smps, recv); + sample_decref_many(shared_smps, recv); return recv; } diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 12aba03fa..4fa3849c3 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -117,7 +117,7 @@ static int websocket_connection_destroy(struct websocket_connection *c) int avail; struct sample *smp; while ((avail = queue_pull(&c->queue, (void **) &smp))) - sample_put(smp); + sample_decref(smp); ret = queue_destroy(&c->queue); if (ret) @@ -154,7 +154,7 @@ static int websocket_connection_write(struct websocket_connection *c, struct sam if (pushed < cnt) warn("Queue overrun in WebSocket connection: %s", websocket_connection_name(c)); - sample_get_many(smps, pushed); + sample_incref_many(smps, pushed); debug(LOG_WEBSOCKET | 10, "Enqueued %u samples to %s", pushed, websocket_connection_name(c)); @@ -283,7 +283,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi ret = lws_write(wsi, (unsigned char *) c->buffers.send.buf + LWS_PRE, wbytes, c->io.flags & FORMAT_TYPE_BINARY ? LWS_WRITE_BINARY : LWS_WRITE_TEXT); - sample_put_many(smps, pulled); + sample_decref_many(smps, pulled); debug(LOG_WEBSOCKET | 10, "Send %d samples to connection: %s, bytes=%d", pulled, websocket_connection_name(c), ret); } @@ -341,7 +341,7 @@ int websocket_protocol_cb(struct lws *wsi, enum lws_callback_reasons reason, voi /* Release unused samples back to pool */ if (enqueued < avail) - sample_put_many(&smps[enqueued], avail - enqueued); + sample_decref_many(&smps[enqueued], avail - enqueued); buffer_clear(&c->buffers.recv); @@ -481,7 +481,7 @@ int websocket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned return avail; sample_copy_many(smps, cpys, avail); - sample_put_many(cpys, avail); + sample_decref_many(cpys, avail); return avail; } @@ -507,7 +507,7 @@ int websocket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigne websocket_connection_write(c, cpys, cnt); } - sample_put_many(cpys, avail); + sample_decref_many(cpys, avail); return cnt; } diff --git a/lib/path.c b/lib/path.c index 523f96cb8..f0f32a445 100644 --- a/lib/path.c +++ b/lib/path.c @@ -147,8 +147,8 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) } } - sample_put_many(muxed_smps, tomux); -out2: sample_put_many(read_smps, release); + sample_decref_many(muxed_smps, tomux); +out2: sample_decref_many(read_smps, release); } static int path_destination_init(struct path_destination *pd, int queuelen) @@ -191,12 +191,12 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi warn("Queue overrun for path %s", path_name(p)); /* Increase reference counter of these samples as they are now also owned by the queue. */ - sample_get_many(clones, cloned); + sample_incref_many(clones, cloned); debug(LOG_PATH | 15, "Enqueued %u samples to destination %s of path %s", enqueued, node_name(pd->node), path_name(p)); } - sample_put_many(clones, cloned); + sample_decref_many(clones, cloned); } static void path_destination_write(struct path_destination *pd, struct path *p) @@ -227,7 +227,7 @@ static void path_destination_write(struct path_destination *pd, struct path *p) else if (sent < allocated) warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, allocated); - released = sample_put_many(smps, release); + released = sample_decref_many(smps, release); debug(LOG_PATH | 15, "Released %d samples back to memory pool", released); } diff --git a/lib/sample.c b/lib/sample.c index 997eb4722..9bae7afd8 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -107,32 +107,32 @@ void sample_free_many(struct sample *smps[], int cnt) sample_free(smps[i]); } -int sample_put_many(struct sample *smps[], int cnt) +int sample_decref_many(struct sample *smps[], int cnt) { int released = 0; for (int i = 0; i < cnt; i++) { - if (sample_put(smps[i]) == 0) + if (sample_decref(smps[i]) == 0) released++; } return released; } -int sample_get_many(struct sample *smps[], int cnt) +int sample_incref_many(struct sample *smps[], int cnt) { for (int i = 0; i < cnt; i++) - sample_get(smps[i]); + sample_incref(smps[i]); return cnt; } -int sample_get(struct sample *s) +int sample_incref(struct sample *s) { return atomic_fetch_add(&s->refcnt, 1) + 1; } -int sample_put(struct sample *s) +int sample_decref(struct sample *s) { int prev = atomic_fetch_sub(&s->refcnt, 1); diff --git a/lib/signal.c b/lib/signal.c index 92a690ce0..4728f9040 100644 --- a/lib/signal.c +++ b/lib/signal.c @@ -99,12 +99,12 @@ int signal_destroy(struct signal *s) return 0; } -int signal_get(struct signal *s) +int signal_incref(struct signal *s) { return atomic_fetch_add(&s->refcnt, 1) + 1; } -int signal_put(struct signal *s) +int signal_decref(struct signal *s) { int prev = atomic_fetch_sub(&s->refcnt, 1); diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp index fb909d99c..5743ec584 100644 --- a/src/villas-pipe.cpp +++ b/src/villas-pipe.cpp @@ -171,7 +171,7 @@ static void * send_loop(void *ctx) else if (sent < scanned) warn("Failed to sent %d out of %d samples to node %s", scanned-sent, scanned, node_name(node)); - sample_put_many(smps, release); + sample_decref_many(smps, release); cnt += sent; if (sendd.limit > 0 && cnt >= sendd.limit) @@ -226,7 +226,7 @@ static void * recv_loop(void *ctx) io_print(&io, smps, recv); - sample_put_many(smps, release); + sample_decref_many(smps, release); cnt += recv; if (recvv.limit > 0 && cnt >= recvv.limit) diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp index c397eb970..2754d18c9 100644 --- a/src/villas-signal.cpp +++ b/src/villas-signal.cpp @@ -182,7 +182,7 @@ int main(int argc, char *argv[]) node_read(&n, &t, 1, &release); io_print(&io, &t, 1); - sample_put(t); + sample_decref(t); } return 0; diff --git a/src/villas-test-cmp.cpp b/src/villas-test-cmp.cpp index b5fa6eadf..9301cae9a 100644 --- a/src/villas-test-cmp.cpp +++ b/src/villas-test-cmp.cpp @@ -200,7 +200,7 @@ out: for (int i = 0; i < n; i++) { if (ret) error("Failed to destroy IO"); - sample_put(s[i].sample); + sample_decref(s[i].sample); } ret = pool_destroy(&pool); diff --git a/src/villas-test-shmem.cpp b/src/villas-test-shmem.cpp index 8deaafb6a..5e645f325 100644 --- a/src/villas-test-shmem.cpp +++ b/src/villas-test-shmem.cpp @@ -103,7 +103,7 @@ int main(int argc, char* argv[]) } for (int i = 0; i < readcnt; i++) - sample_put(insmps[i]); + sample_decref(insmps[i]); writecnt = shmem_int_write(&shm, outsmps, avail); if (writecnt < avail)