From 5d156e867cc0727af4dc70c7f52e1f7a0a41dce9 Mon Sep 17 00:00:00 2001
From: Georg Reinke
Date: Fri, 28 Jul 2017 12:17:37 +0200
Subject: [PATCH] shmem: unmap regions lazily on closing

---
 include/villas/shmem.h |  1 +
 lib/shmem.c            | 41 +++++++++++++++++++++++++++++++++++------
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/include/villas/shmem.h b/include/villas/shmem.h
index 1f189a0a3..141e9c103 100644
--- a/include/villas/shmem.h
+++ b/include/villas/shmem.h
@@ -73,6 +73,7 @@ struct shmem_dir {
 /** Main structure representing the shared memory interface. */
 struct shmem_int {
 	struct shmem_dir read, write;
+	atomic_int readers, writers, closed;
 };
 
 /** Open the shared memory objects and retrieve / initialize the shared data structures.
diff --git a/lib/shmem.c b/lib/shmem.c
index 72228be6b..f0675d2d7 100644
--- a/lib/shmem.c
+++ b/lib/shmem.c
@@ -138,6 +138,10 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm,
 	shm->read.len = len;
 	shm->read.shared = shared;
 
+	shm->readers = 0;
+	shm->writers = 0;
+	shm->closed = 0;
+
 	/* Unlink the semaphores; we don't need them anymore */
 	sem_unlink(wname);
 
@@ -146,28 +150,53 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm,
 
 int shmem_int_close(struct shmem_int *shm)
 {
+	atomic_store(&shm->closed, 1);
 	if (shm->write.shared->polling)
 		queue_close(&shm->write.shared->queue.q);
 	else
 		queue_signalled_close(&shm->write.shared->queue.qs);
 
-	munmap(shm->read.base, shm->read.len);
-	munmap(shm->write.base, shm->write.len);
 	shm_unlink(shm->write.name);
+	if (atomic_load(&shm->readers) == 0)
+		munmap(shm->read.base, shm->read.len);
+	if (atomic_load(&shm->writers) == 0)
+		munmap(shm->write.base, shm->write.len);
 
 	return 0;
 }
 
 int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
 {
-	return shm->read.shared->polling ? queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt)
-			: queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt);
+	int ret;
+
+	atomic_fetch_add(&shm->readers, 1);
+
+	if (shm->read.shared->polling)
+		ret = queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt);
+	else
+		ret = queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt);
+
+	if (atomic_fetch_sub(&shm->readers, 1) == 1 && atomic_load(&shm->closed) == 1)
+		munmap(shm->read.base, shm->read.len);
+
+	return ret;
 }
 
 int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
 {
-	return shm->write.shared->polling ? queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt)
-			: queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt);
+	int ret;
+
+	atomic_fetch_add(&shm->writers, 1);
+
+	if (shm->write.shared->polling)
+		ret = queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt);
+	else
+		ret = queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt);
+
+	if (atomic_fetch_sub(&shm->writers, 1) == 1 && atomic_load(&shm->closed) == 1)
+		munmap(shm->write.base, shm->write.len);
+
+	return ret;
 }
 
 int shmem_int_alloc(struct shmem_int *shm, struct sample *smps[], unsigned cnt)