From 1b61d55cabf05adf9eec79b4dd262a62ac7d1f6c Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 12 May 2017 13:08:34 +0200 Subject: [PATCH] add close methods for queue and use them for shmem --- include/villas/queue.h | 10 +++++++- include/villas/queue_signalled.h | 2 ++ lib/nodes/shmem.c | 25 ++++-------------- lib/queue.c | 21 +++++++++++++-- lib/queue_signalled.c | 44 ++++++++++++++++++++------------ lib/shmem.c | 26 ++++--------------- 6 files changed, 68 insertions(+), 60 deletions(-) diff --git a/include/villas/queue.h b/include/villas/queue.h index 62b88d268..30e29b64b 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -56,7 +56,7 @@ struct queue_cell { struct queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ - enum state state; + atomic_size_t state; struct memtype *mem; size_t buffer_mask; @@ -104,3 +104,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt); * \p cnt elements. */ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt); + +/** Closes the queue, causing following writes to fail and following reads (after + * the queue is empty) to fail. + * + * @return 0 on success. + * @return -1 on failure. + */ +int queue_close(struct queue *q); diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index 6974e00cd..345eab59b 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -47,3 +47,5 @@ int queue_signalled_pull(struct queue_signalled *qs, void **ptr); int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt); int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt); + +int queue_signalled_close(struct queue_signalled *qs); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 0541b2db8..5f7c38c02 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -136,13 +136,10 @@ int shmem_close(struct node *n) struct shmem* shm = n->_vd; int ret; - atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed); - - if (!shm->polling) { - pthread_mutex_lock(&shm->shared->out.qs.mutex); - pthread_cond_broadcast(&shm->shared->out.qs.ready); - pthread_mutex_unlock(&shm->shared->out.qs.mutex); - } + if (shm->polling) + queue_close(&shm->shared->out.q); + else + queue_signalled_close(&shm->shared->out.qs); /* Don't destroy the data structures yet, since the other process might * still be using them. Once both processes are done and have unmapped the @@ -158,7 +155,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; - int ret, recv; + int recv; struct sample *shared_smps[cnt]; recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt) @@ -167,11 +164,6 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) if (recv <= 0) return recv; - /* Check if remote process is still running */ - ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed); - if (ret) - return -1; - sample_copy_many(smps, shared_smps, recv); sample_put_many(shared_smps, recv); struct timespec ts_recv = time_now(); @@ -205,13 +197,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) shared_smps[i]->length = len; } - if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) { - for (int i = 0; i < avail; i++) - sample_put(shared_smps[i]); - - return -1; - } - pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail) : queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); diff --git a/lib/queue.c b/lib/queue.c index 3d61d0dd1..50c9df6d2 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -99,6 +99,8 @@ int queue_push(struct queue *q, void *ptr) size_t pos, seq; intptr_t diff; + if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) + return -1; buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->tail, memory_order_relaxed); for (;;) { @@ -128,6 +130,8 @@ int queue_pull(struct queue *q, void **ptr) size_t pos, seq; intptr_t diff; + if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) + return -1; buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->head, memory_order_relaxed); for (;;) { @@ -158,9 +162,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt) for (i = 0; i < cnt; i++) { ret = queue_push(q, ptr[i]); - if (!ret) + if (ret <= 0) break; } + if (ret == -1 && i == 0) + return -1; return i; } @@ -172,9 +178,20 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt) for (i = 0; i < cnt; i++) { ret = queue_pull(q, &ptr[i]); - if (!ret) + if (ret <= 0) break; } + if (ret == -1 && i == 0) + return -1; return i; } + +int queue_close(struct queue *q) +{ + size_t expected = STATE_INITIALIZED; + if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed)) { + return 0; + } + return -1; +} diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index fa0cd6f89..610c60898 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -95,35 +95,47 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn int queue_signalled_pull(struct queue_signalled *qs, void **ptr) { - int ret; + int ret = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); - ret = queue_pull(&qs->queue, ptr); - if (!ret) - pthread_cond_wait(&qs->ready, &qs->mutex); + while (!ret) { + ret = queue_pull(&qs->queue, ptr); + if (ret == -1) + break; + if (!ret) + pthread_cond_wait(&qs->ready, &qs->mutex); + } pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); - if (ret) - return ret; - - return queue_pull(&qs->queue, ptr); + return ret; } int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { - int ret; + int ret = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); - ret = queue_pull_many(&qs->queue, ptr, cnt); - if (!ret) - pthread_cond_wait(&qs->ready, &qs->mutex); + while (!ret) { + ret = queue_pull_many(&qs->queue, ptr, cnt); + if (ret == -1) + break; + if (!ret) + pthread_cond_wait(&qs->ready, &qs->mutex); + } pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); - if (ret) - return ret; - - return queue_pull_many(&qs->queue, ptr, cnt); + return ret; +} + +int queue_signalled_close(struct queue_signalled *qs) { + int ret; + + pthread_mutex_lock(&qs->mutex); + ret = queue_close(&qs->queue); + pthread_cond_broadcast(&qs->ready); + pthread_mutex_unlock(&qs->mutex); + return ret; } diff --git a/lib/shmem.c b/lib/shmem.c index c90617595..79a765abc 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -95,38 +95,22 @@ struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr) int shmem_shared_close(struct shmem_shared *shm, void *base) { - atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); - - if (!shm->polling) { - pthread_mutex_lock(&shm->in.qs.mutex); - pthread_cond_broadcast(&shm->in.qs.ready); - pthread_mutex_unlock(&shm->in.qs.mutex); - } + if (shm->polling) + queue_close(&shm->in.q); + else + queue_signalled_close(&shm->in.qs); return munmap(base, shm->len); } int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - int ret; - - ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) + return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) : queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); - - if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) - return -1; - - return ret; } int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - int ret; - - ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed); - if (ret) - return -1; - return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt) : queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); }