diff --git a/include/villas/shmem.h b/include/villas/shmem.h index 141e9c103..413ac661c 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -41,25 +41,19 @@ extern "C" { #define DEFAULT_SHMEM_QUEUELEN 512 #define DEFAULT_SHMEM_SAMPLELEN 64 -/** A signalled queue or a regular (polling) queue, depending on the polling setting. */ -union shmem_queue { - struct queue q; - struct queue_signalled qs; -}; - /** Struct containing all parameters that need to be known when creating a new * shared memory object. */ struct shmem_conf { int polling; /**< Whether to use polling instead of POSIX CVs */ int queuelen; /**< Size of the queues (in elements) */ - int samplelen; /**< Maximum number of data entries in a single sample */ + int samplelen; /**< Maximum number of data entries in a single sample */ }; /** The structure that actually resides in the shared memory. */ struct shmem_shared { int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ - union shmem_queue queue; /**< Queues for samples passed in both directions. */ - struct pool pool; /**< Pool for the samples in the queues. */ + struct queue_signalled queue; /**< Queue for samples passed in both directions. */ + struct pool pool; /**< Pool for the samples in the queues. */ }; /** Relevant information for one direction of the interface. */ diff --git a/lib/shmem.c b/lib/shmem.c index f0675d2d7..74b93bc68 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -93,9 +93,14 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, memset(shared, 0, sizeof(struct shmem_shared)); shared->polling = conf->polling; + + int flags = QUEUE_SIGNALLED_PROCESS_SHARED; + if (conf->polling) + flags |= QUEUE_SIGNALLED_POLLING; + else + flags |= QUEUE_SIGNALLED_PTHREAD; - ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager) - : queue_signalled_init(&shared->queue.qs, conf->queuelen, manager); + ret = queue_signalled_init(&shared->queue, conf->queuelen, manager, flags); if (ret) { errno = ENOMEM; return -1; @@ -151,10 +156,8 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm, int shmem_int_close(struct shmem_int *shm) { atomic_store(&shm->closed, 1); - if (shm->write.shared->polling) - queue_close(&shm->write.shared->queue.q); - else - queue_signalled_close(&shm->write.shared->queue.qs); + + queue_signalled_close(&shm->write.shared->queue); shm_unlink(shm->write.name); if (atomic_load(&shm->readers) == 0) @@ -171,10 +174,7 @@ int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt) atomic_fetch_add(&shm->readers, 1); - if (shm->read.shared->polling) - ret = queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt); - else - ret = queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt); + ret = queue_signalled_pull_many(&shm->read.shared->queue, (void **) smps, cnt); if (atomic_fetch_sub(&shm->readers, 1) == 1 && atomic_load(&shm->closed) == 1) munmap(shm->read.base, shm->read.len); @@ -188,10 +188,7 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt) atomic_fetch_add(&shm->writers, 1); - if (shm->write.shared->polling) - ret = queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt); - else - ret = queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt); + ret = queue_signalled_push_many(&shm->write.shared->queue, (void **) smps, cnt); if (atomic_fetch_sub(&shm->writers, 1) == 1 && atomic_load(&shm->closed) == 1) munmap(shm->write.base, shm->write.len);