From 37fc5df25417b632fab33d81e376b0c59ec2c976 Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 28 Jul 2017 12:15:56 +0200 Subject: [PATCH 1/3] add polling loop inside shmem_read --- lib/nodes/shmem.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index f98649bfd..1fcbc6054 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -110,14 +110,15 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) int recv; struct sample *shared_smps[cnt]; - recv = shmem_int_read(&shm->intf, shared_smps, cnt); - if (recv < 0) + do { + recv = shmem_int_read(&shm->intf, shared_smps, cnt); + } while (recv == 0); + if (recv < 0) { /* This can only really mean that the other process has exited, so close * the interface to make sure the shared memory object is unlinked */ shmem_int_close(&shm->intf); - - if (recv <= 0) return recv; + } sample_copy_many(smps, shared_smps, recv); sample_put_many(shared_smps, recv); From 29e4facc17fcdb3ef85eda1bdd6117d58eb22f13 Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 28 Jul 2017 12:17:24 +0200 Subject: [PATCH 2/3] adjust shmem size definitions --- include/villas/shmem.h | 4 +--- lib/shmem.c | 16 ++++++++-------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/include/villas/shmem.h b/include/villas/shmem.h index f60659e60..1f189a0a3 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -47,8 +47,6 @@ union shmem_queue { struct queue_signalled qs; }; -#define SHMEM_MIN_SIZE (sizeof(struct memtype) + sizeof(struct memblock) + sizeof(pthread_barrier_t) + sizeof(pthread_barrierattr_t)) - /** Struct containing all parameters that need to be known when creating a new * shared memory object. */ struct shmem_conf { @@ -132,7 +130,7 @@ int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt); /** Returns the total size of the shared memory region with the given size of * the input/output queues (in elements) and the given number of data elements * per struct sample. */ -size_t shmem_total_size(int insize, int outsize, int sample_size); +size_t shmem_total_size(int queuelen, int samplelen); /** @} */ diff --git a/lib/shmem.c b/lib/shmem.c index e4fda425f..72228be6b 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -33,18 +33,18 @@ #include "sample.h" #include "shmem.h" -size_t shmem_total_size(int insize, int outsize, int sample_size) +size_t shmem_total_size(int queuelen, int samplelen) { /* We have the constant const of the memtype header */ return sizeof(struct memtype) /* and the shared struct itself */ + sizeof(struct shmem_shared) - /* the size of the 2 queues and the queue for the pool */ - + (insize + outsize) * (2 * sizeof(struct queue_cell)) + /* the size of the actual queue and the queue for the pool */ + + queuelen * (2 * sizeof(struct queue_cell)) /* the size of the pool */ - + (insize + outsize) * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(sample_size), kernel_get_cacheline_size()) - /* a memblock for each allocation (1 shmem_shared, 3 queues, 1 pool) */ - + 5 * sizeof(struct memblock) + + queuelen * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(samplelen), kernel_get_cacheline_size()) + /* a memblock for each allocation (1 shmem_shared, 2 queues, 1 pool) */ + + 4 * sizeof(struct memblock) /* and some extra buffer for alignment */ + 1024; } @@ -52,7 +52,7 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, struct shmem_conf *conf) { char *cptr; - int fd, ret; + int fd, ret; size_t len; void *base; struct memtype *manager; @@ -74,7 +74,7 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, if (fd < 0) return -1; - len = shmem_total_size(conf->queuelen, conf->queuelen, conf->samplelen); + len = shmem_total_size(conf->queuelen, conf->samplelen); if (ftruncate(fd, len) < 0) return -1; From 5d156e867cc0727af4dc70c7f52e1f7a0a41dce9 Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 28 Jul 2017 12:17:37 +0200 Subject: [PATCH 3/3] shmem: unmap regions lazily on closing --- include/villas/shmem.h | 1 + lib/shmem.c | 41 +++++++++++++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 1f189a0a3..141e9c103 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -73,6 +73,7 @@ struct shmem_dir { /** Main structure representing the shared memory interface. */ struct shmem_int { struct shmem_dir read, write; + atomic_int readers, writers, closed; }; /** Open the shared memory objects and retrieve / initialize the shared data structures. diff --git a/lib/shmem.c b/lib/shmem.c index 72228be6b..f0675d2d7 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -138,6 +138,10 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, shm->read.len = len; shm->read.shared = shared; + shm->readers = 0; + shm->writers = 0; + shm->closed = 0; + /* Unlink the semaphores; we don't need them anymore */ sem_unlink(wname); @@ -146,28 +150,53 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, int shmem_int_close(struct shmem_int *shm) { + atomic_store(&shm->closed, 1); if (shm->write.shared->polling) queue_close(&shm->write.shared->queue.q); else queue_signalled_close(&shm->write.shared->queue.qs); - munmap(shm->read.base, shm->read.len); - munmap(shm->write.base, shm->write.len); shm_unlink(shm->write.name); + if (atomic_load(&shm->readers) == 0) + munmap(shm->read.base, shm->read.len); + if (atomic_load(&shm->writers) == 0) + munmap(shm->write.base, shm->write.len); return 0; } int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt) - : queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt); + int ret; + + atomic_fetch_add(&shm->readers, 1); + + if (shm->read.shared->polling) + ret = queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt); + else + ret = queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt); + + if (atomic_fetch_sub(&shm->readers, 1) == 1 && atomic_load(&shm->closed) == 1) + munmap(shm->read.base, shm->read.len); + + return ret; } int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt) { - return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt) - : queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt); + int ret; + + atomic_fetch_add(&shm->writers, 1); + + if (shm->write.shared->polling) + ret = queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt); + else + ret = queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt); + + if (atomic_fetch_sub(&shm->writers, 1) == 1 && atomic_load(&shm->closed) == 1) + munmap(shm->write.base, shm->write.len); + + return ret; } int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt)