diff --git a/include/villas/queue.h b/include/villas/queue.h index 62b88d268..50e850260 100644 --- a/include/villas/queue.h +++ b/include/villas/queue.h @@ -56,7 +56,7 @@ struct queue_cell { struct queue { cacheline_pad_t _pad0; /**< Shared area: all threads read */ - enum state state; + _Atomic enum state state; struct memtype *mem; size_t buffer_mask; @@ -104,3 +104,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt); * \p cnt elements. */ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt); + +/** Closes the queue, causing following writes to fail and following reads (after + * the queue is empty) to fail. + * + * @return 0 on success. + * @return -1 on failure. + */ +int queue_close(struct queue *q); diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index 6974e00cd..345eab59b 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -47,3 +47,5 @@ int queue_signalled_pull(struct queue_signalled *qs, void **ptr); int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt); int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt); + +int queue_signalled_close(struct queue_signalled *qs); diff --git a/include/villas/sample.h b/include/villas/sample.h index f7fbffe56..707058739 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -78,7 +78,7 @@ struct sample { }; /** Request \p cnt samples from memory pool \p p and initialize them. - * This will leave the reference count of the sample to zero. + * The reference count will already be set to 1. * Use the sample_get() function to increase it. */ int sample_alloc(struct pool *p, struct sample *smps[], int cnt); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 4156f13ec..5f7c38c02 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -33,6 +33,7 @@ #include "nodes/shmem.h" #include "plugin.h" #include "shmem.h" +#include "timing.h" #include "utils.h" int shmem_parse(struct node *n, config_setting_t *cfg) @@ -135,13 +136,10 @@ int shmem_close(struct node *n) struct shmem* shm = n->_vd; int ret; - atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed); - - if (!shm->polling) { - pthread_mutex_lock(&shm->shared->out.qs.mutex); - pthread_cond_broadcast(&shm->shared->out.qs.ready); - pthread_mutex_unlock(&shm->shared->out.qs.mutex); - } + if (shm->polling) + queue_close(&shm->shared->out.q); + else + queue_signalled_close(&shm->shared->out.qs); /* Don't destroy the data structures yet, since the other process might * still be using them. Once both processes are done and have unmapped the @@ -157,19 +155,20 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; - int ret, recv; + int recv; - recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt) - : queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); + struct sample *shared_smps[cnt]; + recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt) + : queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt); if (recv <= 0) return recv; - /* Check if remote process is still running */ - ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed); - if (ret) - return ret; - + sample_copy_many(smps, shared_smps, recv); + sample_put_many(shared_smps, recv); + struct timespec ts_recv = time_now(); + for (int i = 0; i < recv; i++) + smps[i]->ts.received = ts_recv; return recv; } @@ -196,15 +195,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len)); shared_smps[i]->length = len; - - sample_get(shared_smps[i]); - } - - if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) { - for (int i = 0; i < avail; i++) - sample_put(shared_smps[i]); - - return -1; } pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail) diff --git a/lib/queue.c b/lib/queue.c index 3d61d0dd1..2ca6c86c1 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -82,11 +82,6 @@ int queue_destroy(struct queue *q) return ret; } -/** Return estimation of current queue usage. - * - * Note: This is only an estimation and not accurate as long other - * threads are performing operations. - */ size_t queue_available(struct queue *q) { return atomic_load_explicit(&q->tail, memory_order_relaxed) - @@ -99,6 +94,9 @@ int queue_push(struct queue *q, void *ptr) size_t pos, seq; intptr_t diff; + if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) + return -1; + buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->tail, memory_order_relaxed); for (;;) { @@ -128,6 +126,9 @@ int queue_pull(struct queue *q, void **ptr) size_t pos, seq; intptr_t diff; + if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) + return -1; + buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->head, memory_order_relaxed); for (;;) { @@ -158,10 +159,13 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt) for (i = 0; i < cnt; i++) { ret = queue_push(q, ptr[i]); - if (!ret) + if (ret <= 0) break; } + if (ret == -1 && i == 0) + return -1; + return i; } @@ -172,9 +176,21 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt) for (i = 0; i < cnt; i++) { ret = queue_pull(q, &ptr[i]); - if (!ret) + if (ret <= 0) break; } + if (ret == -1 && i == 0) + return -1; + return i; } + +int queue_close(struct queue *q) +{ + enum state expected = STATE_INITIALIZED; + if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed)) + return 0; + + return -1; +} diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index 38f372e49..cd9058788 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -95,24 +95,59 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn int queue_signalled_pull(struct queue_signalled *qs, void **ptr) { + int ret = 0; + /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); - pthread_cond_wait(&qs->ready, &qs->mutex); + + while (!ret) { + ret = queue_pull(&qs->queue, ptr); + if (ret == -1) + break; + + if (!ret) + pthread_cond_wait(&qs->ready, &qs->mutex); + } + pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); - return queue_pull(&qs->queue, ptr); + return ret; } int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { + int ret = 0; + /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); - pthread_cond_wait(&qs->ready, &qs->mutex); + + while (!ret) { + ret = queue_pull_many(&qs->queue, ptr, cnt); + if (ret == -1) + break; + + if (!ret) + pthread_cond_wait(&qs->ready, &qs->mutex); + } + pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); - return queue_pull_many(&qs->queue, ptr, cnt); + return ret; +} + +int queue_signalled_close(struct queue_signalled *qs) { + int ret; + + pthread_mutex_lock(&qs->mutex); + + ret = queue_close(&qs->queue); + + pthread_cond_broadcast(&qs->ready); + pthread_mutex_unlock(&qs->mutex); + + return ret; } diff --git a/lib/shmem.c b/lib/shmem.c index c90617595..79a765abc 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -95,38 +95,22 @@ struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr) int shmem_shared_close(struct shmem_shared *shm, void *base) { - atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); - - if (!shm->polling) { - pthread_mutex_lock(&shm->in.qs.mutex); - pthread_cond_broadcast(&shm->in.qs.ready); - pthread_mutex_unlock(&shm->in.qs.mutex); - } + if (shm->polling) + queue_close(&shm->in.q); + else + queue_signalled_close(&shm->in.qs); return munmap(base, shm->len); } int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - int ret; - - ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) + return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) : queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); - - if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) - return -1; - - return ret; } int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt) { - int ret; - - ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed); - if (ret) - return -1; - return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt) : queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); } diff --git a/lib/web.c b/lib/web.c index 11f87c5a3..c6d1b235b 100644 --- a/lib/web.c +++ b/lib/web.c @@ -143,8 +143,6 @@ static void * worker(void *ctx) { struct web *w = ctx; - assert(w->state == STATE_STARTED); - for (;;) lws_service(w->context, 100);