From 39d7c451fba3661e82586e045eade88d3614d8cd Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Wed, 12 Apr 2017 14:38:18 +0200 Subject: [PATCH] shm node: signal the other program if one exits --- include/villas/shmem.h | 6 +++++- lib/nodes/shmem.c | 32 +++++++++++++++++++++----------- lib/shmem.c | 37 +++++++++++++++++++++++++++++-------- src/shmem.c | 19 +++++++++++++++++-- 4 files changed, 72 insertions(+), 22 deletions(-) diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 543936240..5811eb2b0 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -19,9 +19,13 @@ struct shmem_shared { union shmem_queue out; /**< Queue for samples passed from node to external program.*/ int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ struct pool pool; /**< Pool for the samples in the queues. */ + atomic_size_t node_stopped; + atomic_size_t ext_stopped; }; -struct shmem_shared * shmem_shared_open(const char* name); +struct shmem_shared * shmem_shared_open(const char* name, void **base_ptr); + +int shmem_shared_close(struct shmem_shared *shm, void *base); int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 69a1ec689..0c890b6d6 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -79,15 +79,15 @@ int shmem_open(struct node *n) { int shmem_close(struct node *n) { struct shmem* shm = n->_vd; size_t len = shm->shared->len; - if (shm->cond_in) - queue_signalled_destroy(&shm->shared->in.qs); - else - queue_destroy(&shm->shared->in.q); - if (shm->cond_out) - queue_signalled_destroy(&shm->shared->out.qs); - else - queue_destroy(&shm->shared->out.q); - pool_destroy(&shm->shared->pool); + atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed); + if (shm->cond_out) { + pthread_mutex_lock(&shm->shared->out.qs.mt); + pthread_cond_broadcast(&shm->shared->out.qs.ready); + pthread_mutex_unlock(&shm->shared->out.qs.mt); + } + /* Don't destroy the data structures yet, since the other process might + * still be using them. Once both processes are done and have unmapped the + * memory, it will be freed anyway. */ int r = munmap(shm->base, len); if (r != 0) return r; @@ -96,9 +96,14 @@ int shmem_close(struct node *n) { int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; + int r; if (shm->cond_in) - return queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); - return queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); + r = queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); + else + r = queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); + if (!r && atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) + return -1; + return r; } int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { @@ -122,6 +127,11 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { sample_get(shared_smps[i]); } int pushed; + if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) { + for (int i = 0; i < avail; i++) + sample_put(shared_smps[i]); + return -1; + } if (shm->cond_out) pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); else diff --git a/lib/shmem.c b/lib/shmem.c index 401405e53..7feab1ff9 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -24,7 +24,7 @@ size_t shmem_total_size(int insize, int outsize, int sample_size) + 1024; } -struct shmem_shared* shmem_shared_open(const char *name) +struct shmem_shared* shmem_shared_open(const char *name, void **base_ptr) { int fd = shm_open(name, O_RDWR, 0); if (fd < 0) @@ -47,19 +47,40 @@ struct shmem_shared* shmem_shared_open(const char *name) return NULL; /* Adress might have moved */ cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock); + if (base_ptr) + *base_ptr = base; return (struct shmem_shared *) cptr; } -int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - if (shm->cond_out) - return queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); - else - return queue_pull_many(&shm->out.q, (void **) smps, cnt); +int shmem_shared_close(struct shmem_shared *shm, void *base) +{ + atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); + if (shm->cond_in) { + pthread_mutex_lock(&shm->in.qs.mt); + pthread_cond_broadcast(&shm->in.qs.ready); + pthread_mutex_unlock(&shm->in.qs.mt); + } + return munmap(base, shm->len); } -int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { +int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) +{ + int r; + if (shm->cond_out) + r = queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); + else + r = queue_pull_many(&shm->out.q, (void **) smps, cnt); + if (!r && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) + return -1; + return r; +} + +int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) +{ + if (atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) + return -1; if (shm->cond_in) return queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); else - return queue_pull_many(&shm->in.q, (void **) smps, cnt); + return queue_push_many(&shm->in.q, (void **) smps, cnt); } diff --git a/src/shmem.c b/src/shmem.c index 81262f1cc..8949ec95b 100644 --- a/src/shmem.c +++ b/src/shmem.c @@ -7,7 +7,7 @@ #include "node.h" #include "nodes/shmem.h" #include "pool.h" -#include "queue.h" +#include "queue_signalled.h" #include "sample.h" #include "sample_io.h" #include "shmem.h" @@ -17,12 +17,21 @@ #define VECTORIZE 8 +void *base; +struct shmem_shared *shared; + void usage() { printf("Usage: villas-shmem SHM_NAME\n"); printf(" SHMNAME name of the shared memory object\n"); } +void quit(int sig) +{ + shmem_shared_close(shared, base); + exit(1); +} + int main(int argc, char* argv[]) { if (argc != 2) { @@ -30,14 +39,20 @@ int main(int argc, char* argv[]) return 1; } - struct shmem_shared *shared = shmem_shared_open(argv[1]); + shared = shmem_shared_open(argv[1], &base); if (!shared) serror("Failed to open shmem interface"); + signal(SIGINT, quit); + signal(SIGTERM, quit); struct sample *insmps[VECTORIZE], *outsmps[VECTORIZE]; while (1) { int r, w; r = shmem_shared_read(shared, insmps, VECTORIZE); + if (r == -1) { + printf("node stopped, exiting\n"); + break; + } int avail = sample_alloc(&shared->pool, outsmps, r); if (avail < r) warn("pool underrun (%d/%d)\n", avail, r);