diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index de66ff2c8..b56e3afee 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -5,22 +5,19 @@ #include "memory.h" #include "pool.h" #include "queue.h" +#include "queue_signalled.h" #define DEFAULT_SHMEM_QUEUESIZE 512 -/** Per-direction shared datastructure for a shmem node. */ -struct shmem_queue { - struct queue queue; /**< Actual queue where the samples are passed */ - pthread_cond_t ready; /**< Condition variable to signal writes to the queue */ - pthread_condattr_t readyattr; - pthread_mutex_t mt; /**< Mutex for ready */ - pthread_mutexattr_t mtattr; +union shmem_queue { + struct queue q; + struct queue_signalled qs; }; /** The structure that actually resides in the shared memory. TODO better name?*/ struct shmem_shared { - struct shmem_queue in; /**< Queue for samples passed from external program to node.*/ - struct shmem_queue out; /**< Queue for samples passed from node to external program.*/ + union shmem_queue in; /**< Queue for samples passed from external program to node.*/ + union shmem_queue out; /**< Queue for samples passed from node to external program.*/ struct pool pool; /**< Pool for the samples in the queues. */ }; diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h new file mode 100644 index 000000000..84acb8909 --- /dev/null +++ b/include/villas/queue_signalled.h @@ -0,0 +1,22 @@ +#pragma once + +#include + +#include "queue.h" + +/** Wrapper around queue that uses POSIX CV's for signalling writes. */ +struct queue_signalled { + struct queue q; /**< Actual underlying queue. */ + pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ + pthread_condattr_t readyattr; + pthread_mutex_t mt; /**< Mutex for ready. */ + pthread_mutexattr_t mtattr; +}; + +int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem); + +int queue_signalled_destroy(struct queue_signalled *qs); + +int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt); + +int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt); diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 7eeaa9935..b20543b9e 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -8,7 +8,7 @@ LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c shmem.c) \ $(addprefix lib/kernel/, kernel.c rt.c) \ $(addprefix lib/, sample.c path.c node.c hook.c \ log.c utils.c super_node.c hist.c timing.c pool.c \ - list.c queue.c memory.c advio.c web.c api.c \ + list.c queue.c queue_signalled.c memory.c advio.c web.c api.c \ plugin.c node_type.c stats.c mapping.c sample_io.c\ ) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index c3a777ebd..7444b2f02 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -31,16 +31,6 @@ int shmem_parse(struct node *n, config_setting_t *cfg) { return 0; } -/* Helper for initializing condition variables / mutexes. */ -void shmem_cond_init(struct shmem_queue *queue) { - pthread_mutexattr_init(&queue->mtattr); - pthread_mutexattr_setpshared(&queue->mtattr, PTHREAD_PROCESS_SHARED); - pthread_condattr_init(&queue->readyattr); - pthread_condattr_setpshared(&queue->readyattr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&queue->mt, &queue->mtattr); - pthread_cond_init(&queue->ready, &queue->readyattr); -} - int shmem_open(struct node *n) { struct shmem *shm = n->_vd; @@ -63,24 +53,36 @@ int shmem_open(struct node *n) { if (!shm->shared) error("Shm shared struct allocation failed (not enough memory?)"); memset(shm->shared, 0, sizeof(struct shmem_shared)); - if (queue_init(&shm->shared->in.queue, shm->insize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); - if (queue_init(&shm->shared->out.queue, shm->outsize, shm->manager) < 0) - error("Shm queue allocation failed (not enough memory?)"); + if (shm->cond_in) { + if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0) + error("Shm queue allocation failed (not enough memory?)"); + } else { + if (queue_init(&shm->shared->in.q, shm->insize, shm->manager) < 0) + error("Shm queue allocation failed (not enough memory?)"); + } + if (shm->cond_out) { + if (queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) < 0) + error("Shm queue allocation failed (not enough memory?)"); + } else { + if (queue_init(&shm->shared->out.q, shm->outsize, shm->manager) < 0) + error("Shm queue allocation failed (not enough memory?)"); + } if (pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager) < 0) error("Shm pool allocation failed (not enough memory?)"); - if (shm->cond_out) - shmem_cond_init(&shm->shared->out); - if (shm->cond_in) - shmem_cond_init(&shm->shared->in); return 0; } int shmem_close(struct node *n) { struct shmem* shm = n->_vd; - queue_destroy(&shm->shared->in.queue); - queue_destroy(&shm->shared->out.queue); + if (shm->cond_in) + queue_signalled_destroy(&shm->shared->in.qs); + else + queue_destroy(&shm->shared->in.q); + if (shm->cond_out) + queue_signalled_destroy(&shm->shared->out.qs); + else + queue_destroy(&shm->shared->out.q); pool_destroy(&shm->shared->pool); int r = munmap(shm->base, shm->len); if (r != 0) @@ -90,13 +92,9 @@ int shmem_close(struct node *n) { int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) { struct shmem *shm = n->_vd; - if (shm->cond_in) { - pthread_mutex_lock(&shm->shared->in.mt); - pthread_cond_wait(&shm->shared->in.ready, &shm->shared->in.mt); - pthread_mutex_unlock(&shm->shared->in.mt); - } - int r = queue_pull_many(&shm->shared->in.queue, (void**) smps, cnt); - return r; + if (shm->cond_in) + return queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); + return queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); } int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { @@ -119,14 +117,13 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) { shared_smps[i]->length = len; sample_get(shared_smps[i]); } - int pushed = queue_push_many(&shm->shared->out.queue, (void**) shared_smps, avail); + int pushed; + if (shm->cond_out) + pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); + else + pushed = queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail); if (pushed != avail) warn("Outqueue overrun for shmem node %s", shm->name); - if (pushed && shm->cond_out) { - pthread_mutex_lock(&shm->shared->out.mt); - pthread_cond_broadcast(&shm->shared->out.ready); - pthread_mutex_unlock(&shm->shared->out.mt); - } return pushed; } diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c new file mode 100644 index 000000000..71ae87d21 --- /dev/null +++ b/lib/queue_signalled.c @@ -0,0 +1,46 @@ +/** Wrapper around queue that uses POSIX CV's for signalling writes. */ + +#include "queue_signalled.h" + +int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem) +{ + int r = queue_init(&qs->q, size, mem); + if (r < 0) + return r; + pthread_mutexattr_init(&qs->mtattr); + pthread_mutexattr_setpshared(&qs->mtattr, PTHREAD_PROCESS_SHARED); + pthread_condattr_init(&qs->readyattr); + pthread_condattr_setpshared(&qs->readyattr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&qs->mt, &qs->mtattr); + pthread_cond_init(&qs->ready, &qs->readyattr); + return 0; +} + +int queue_signalled_destroy(struct queue_signalled *qs) +{ + int r = queue_destroy(&qs->q); + if (r < 0) + return r; + pthread_cond_destroy(&qs->ready); + pthread_mutex_destroy(&qs->mt); + return 0; +} + +int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt) +{ + int r = queue_push_many(&qs->q, ptr, cnt); + if (r > 0) { + pthread_mutex_lock(&qs->mt); + pthread_cond_broadcast(&qs->ready); + pthread_mutex_unlock(&qs->mt); + } + return r; +} + +int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) +{ + pthread_mutex_lock(&qs->mt); + pthread_cond_wait(&qs->ready, &qs->mt); + pthread_mutex_unlock(&qs->mt); + return queue_pull_many(&qs->q, ptr, cnt); +} diff --git a/src/shmem.c b/src/shmem.c index 1f29b2a45..0d55fdc72 100644 --- a/src/shmem.c +++ b/src/shmem.c @@ -45,12 +45,11 @@ int main(int argc, char* argv[]) struct sample *insmps[node->vectorize], *outsmps[node->vectorize]; while (1) { - if (shm->cond_out) { - pthread_mutex_lock(&shmem->out.mt); - pthread_cond_wait(&shmem->out.ready, &shmem->out.mt); - pthread_mutex_unlock(&shmem->out.mt); - } - int r = queue_pull_many(&shmem->out.queue, (void **) insmps, node->vectorize); + int r, w; + if (shm->cond_out) + r = queue_signalled_pull_many(&shmem->out.qs, (void **) insmps, node->vectorize); + else + r = queue_pull_many(&shmem->out.q, (void **) insmps, node->vectorize); int avail = sample_alloc(&shmem->pool, outsmps, r); if (avail < r) warn("pool underrun (%d/%d)\n", avail, r); @@ -65,13 +64,11 @@ int main(int argc, char* argv[]) } for (int i = 0; i < r; i++) sample_put(insmps[i]); - int w = queue_push_many(&shmem->in.queue, (void **) outsmps, avail); + if (shm->cond_in) + w = queue_signalled_push_many(&shmem->in.qs, (void **) outsmps, avail); + else + w = queue_push_many(&shmem->in.q, (void **) outsmps, avail); if (w < avail) warn("short write (%d/%d)\n", w, r); - if (shm->cond_in && w) { - pthread_mutex_lock(&shmem->in.mt); - pthread_cond_broadcast(&shmem->in.ready); - pthread_mutex_unlock(&shmem->in.mt); - } } }