1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

shmem: simplify code with new QUEUE_SIGNALLED_POLLING mode

This commit is contained in:
Steffen Vogel 2017-08-30 12:37:09 +02:00
parent d7dd7240db
commit d13b617167
2 changed files with 14 additions and 23 deletions

View file

@ -41,25 +41,19 @@ extern "C" {
#define DEFAULT_SHMEM_QUEUELEN 512
#define DEFAULT_SHMEM_SAMPLELEN 64
/** A signalled queue or a regular (polling) queue, depending on the polling setting. */
union shmem_queue {
struct queue q;
struct queue_signalled qs;
};
/** Struct containing all parameters that need to be known when creating a new
* shared memory object. */
struct shmem_conf {
int polling; /**< Whether to use polling instead of POSIX CVs */
int queuelen; /**< Size of the queues (in elements) */
int samplelen; /**< Maximum number of data entries in a single sample */
int samplelen; /**< Maximum number of data entries in a single sample */
};
/** The structure that actually resides in the shared memory. */
struct shmem_shared {
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue queue; /**< Queues for samples passed in both directions. */
struct pool pool; /**< Pool for the samples in the queues. */
struct queue_signalled queue; /**< Queue for samples passed in both directions. */
struct pool pool; /**< Pool for the samples in the queues. */
};
/** Relevant information for one direction of the interface. */

View file

@ -93,9 +93,14 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm,
memset(shared, 0, sizeof(struct shmem_shared));
shared->polling = conf->polling;
int flags = QUEUE_SIGNALLED_PROCESS_SHARED;
if (conf->polling)
flags |= QUEUE_SIGNALLED_POLLING;
else
flags |= QUEUE_SIGNALLED_PTHREAD;
ret = shared->polling ? queue_init(&shared->queue.q, conf->queuelen, manager)
: queue_signalled_init(&shared->queue.qs, conf->queuelen, manager);
ret = queue_signalled_init(&shared->queue, conf->queuelen, manager, flags);
if (ret) {
errno = ENOMEM;
return -1;
@ -151,10 +156,8 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm,
int shmem_int_close(struct shmem_int *shm)
{
atomic_store(&shm->closed, 1);
if (shm->write.shared->polling)
queue_close(&shm->write.shared->queue.q);
else
queue_signalled_close(&shm->write.shared->queue.qs);
queue_signalled_close(&shm->write.shared->queue);
shm_unlink(shm->write.name);
if (atomic_load(&shm->readers) == 0)
@ -171,10 +174,7 @@ int shmem_int_read(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
atomic_fetch_add(&shm->readers, 1);
if (shm->read.shared->polling)
ret = queue_pull_many(&shm->read.shared->queue.q, (void **) smps, cnt);
else
ret = queue_signalled_pull_many(&shm->read.shared->queue.qs, (void **) smps, cnt);
ret = queue_signalled_pull_many(&shm->read.shared->queue, (void **) smps, cnt);
if (atomic_fetch_sub(&shm->readers, 1) == 1 && atomic_load(&shm->closed) == 1)
munmap(shm->read.base, shm->read.len);
@ -188,10 +188,7 @@ int shmem_int_write(struct shmem_int *shm, struct sample *smps[], unsigned cnt)
atomic_fetch_add(&shm->writers, 1);
if (shm->write.shared->polling)
ret = queue_push_many(&shm->write.shared->queue.q, (void **) smps, cnt);
else
ret = queue_signalled_push_many(&shm->write.shared->queue.qs, (void **) smps, cnt);
ret = queue_signalled_push_many(&shm->write.shared->queue, (void **) smps, cnt);
if (atomic_fetch_sub(&shm->writers, 1) == 1 && atomic_load(&shm->closed) == 1)
munmap(shm->write.base, shm->write.len);