1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

new wrapper struct around queue with POSIX CV's

This commit is contained in:
Georg Reinke 2017-04-07 12:18:08 +02:00
parent ee809bd5c9
commit c7e241cb2b
6 changed files with 114 additions and 55 deletions

View file

@ -5,22 +5,19 @@
#include "memory.h"
#include "pool.h"
#include "queue.h"
#include "queue_signalled.h"
#define DEFAULT_SHMEM_QUEUESIZE 512
/** Per-direction shared datastructure for a shmem node. */
struct shmem_queue {
struct queue queue; /**< Actual queue where the samples are passed */
pthread_cond_t ready; /**< Condition variable to signal writes to the queue */
pthread_condattr_t readyattr;
pthread_mutex_t mt; /**< Mutex for ready */
pthread_mutexattr_t mtattr;
union shmem_queue {
struct queue q;
struct queue_signalled qs;
};
/** The structure that actually resides in the shared memory. TODO better name?*/
struct shmem_shared {
struct shmem_queue in; /**< Queue for samples passed from external program to node.*/
struct shmem_queue out; /**< Queue for samples passed from node to external program.*/
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
union shmem_queue out; /**< Queue for samples passed from node to external program.*/
struct pool pool; /**< Pool for the samples in the queues. */
};

View file

@ -0,0 +1,22 @@
#pragma once
#include <pthread.h>
#include "queue.h"
/** Wrapper around queue that uses POSIX CV's for signalling writes. */
struct queue_signalled {
struct queue q; /**< Actual underlying queue. */
pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */
pthread_condattr_t readyattr;
pthread_mutex_t mt; /**< Mutex for ready. */
pthread_mutexattr_t mtattr;
};
int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem);
int queue_signalled_destroy(struct queue_signalled *qs);
int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt);
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt);

View file

@ -8,7 +8,7 @@ LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c shmem.c) \
$(addprefix lib/kernel/, kernel.c rt.c) \
$(addprefix lib/, sample.c path.c node.c hook.c \
log.c utils.c super_node.c hist.c timing.c pool.c \
list.c queue.c memory.c advio.c web.c api.c \
list.c queue.c queue_signalled.c memory.c advio.c web.c api.c \
plugin.c node_type.c stats.c mapping.c sample_io.c\
)

View file

@ -31,16 +31,6 @@ int shmem_parse(struct node *n, config_setting_t *cfg) {
return 0;
}
/* Helper for initializing condition variables / mutexes. */
void shmem_cond_init(struct shmem_queue *queue) {
pthread_mutexattr_init(&queue->mtattr);
pthread_mutexattr_setpshared(&queue->mtattr, PTHREAD_PROCESS_SHARED);
pthread_condattr_init(&queue->readyattr);
pthread_condattr_setpshared(&queue->readyattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&queue->mt, &queue->mtattr);
pthread_cond_init(&queue->ready, &queue->readyattr);
}
int shmem_open(struct node *n) {
struct shmem *shm = n->_vd;
@ -63,24 +53,36 @@ int shmem_open(struct node *n) {
if (!shm->shared)
error("Shm shared struct allocation failed (not enough memory?)");
memset(shm->shared, 0, sizeof(struct shmem_shared));
if (queue_init(&shm->shared->in.queue, shm->insize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
if (queue_init(&shm->shared->out.queue, shm->outsize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
if (shm->cond_in) {
if (queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
} else {
if (queue_init(&shm->shared->in.q, shm->insize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
}
if (shm->cond_out) {
if (queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
} else {
if (queue_init(&shm->shared->out.q, shm->outsize, shm->manager) < 0)
error("Shm queue allocation failed (not enough memory?)");
}
if (pool_init(&shm->shared->pool, shm->insize+shm->outsize, SAMPLE_LEN(shm->sample_size), shm->manager) < 0)
error("Shm pool allocation failed (not enough memory?)");
if (shm->cond_out)
shmem_cond_init(&shm->shared->out);
if (shm->cond_in)
shmem_cond_init(&shm->shared->in);
return 0;
}
int shmem_close(struct node *n) {
struct shmem* shm = n->_vd;
queue_destroy(&shm->shared->in.queue);
queue_destroy(&shm->shared->out.queue);
if (shm->cond_in)
queue_signalled_destroy(&shm->shared->in.qs);
else
queue_destroy(&shm->shared->in.q);
if (shm->cond_out)
queue_signalled_destroy(&shm->shared->out.qs);
else
queue_destroy(&shm->shared->out.q);
pool_destroy(&shm->shared->pool);
int r = munmap(shm->base, shm->len);
if (r != 0)
@ -90,13 +92,9 @@ int shmem_close(struct node *n) {
int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) {
struct shmem *shm = n->_vd;
if (shm->cond_in) {
pthread_mutex_lock(&shm->shared->in.mt);
pthread_cond_wait(&shm->shared->in.ready, &shm->shared->in.mt);
pthread_mutex_unlock(&shm->shared->in.mt);
}
int r = queue_pull_many(&shm->shared->in.queue, (void**) smps, cnt);
return r;
if (shm->cond_in)
return queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
return queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
}
int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) {
@ -119,14 +117,13 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) {
shared_smps[i]->length = len;
sample_get(shared_smps[i]);
}
int pushed = queue_push_many(&shm->shared->out.queue, (void**) shared_smps, avail);
int pushed;
if (shm->cond_out)
pushed = queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
else
pushed = queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail);
if (pushed != avail)
warn("Outqueue overrun for shmem node %s", shm->name);
if (pushed && shm->cond_out) {
pthread_mutex_lock(&shm->shared->out.mt);
pthread_cond_broadcast(&shm->shared->out.ready);
pthread_mutex_unlock(&shm->shared->out.mt);
}
return pushed;
}

46
lib/queue_signalled.c Normal file
View file

@ -0,0 +1,46 @@
/** Wrapper around queue that uses POSIX CV's for signalling writes. */
#include "queue_signalled.h"
int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem)
{
int r = queue_init(&qs->q, size, mem);
if (r < 0)
return r;
pthread_mutexattr_init(&qs->mtattr);
pthread_mutexattr_setpshared(&qs->mtattr, PTHREAD_PROCESS_SHARED);
pthread_condattr_init(&qs->readyattr);
pthread_condattr_setpshared(&qs->readyattr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&qs->mt, &qs->mtattr);
pthread_cond_init(&qs->ready, &qs->readyattr);
return 0;
}
int queue_signalled_destroy(struct queue_signalled *qs)
{
int r = queue_destroy(&qs->q);
if (r < 0)
return r;
pthread_cond_destroy(&qs->ready);
pthread_mutex_destroy(&qs->mt);
return 0;
}
int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
{
int r = queue_push_many(&qs->q, ptr, cnt);
if (r > 0) {
pthread_mutex_lock(&qs->mt);
pthread_cond_broadcast(&qs->ready);
pthread_mutex_unlock(&qs->mt);
}
return r;
}
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
{
pthread_mutex_lock(&qs->mt);
pthread_cond_wait(&qs->ready, &qs->mt);
pthread_mutex_unlock(&qs->mt);
return queue_pull_many(&qs->q, ptr, cnt);
}

View file

@ -45,12 +45,11 @@ int main(int argc, char* argv[])
struct sample *insmps[node->vectorize], *outsmps[node->vectorize];
while (1) {
if (shm->cond_out) {
pthread_mutex_lock(&shmem->out.mt);
pthread_cond_wait(&shmem->out.ready, &shmem->out.mt);
pthread_mutex_unlock(&shmem->out.mt);
}
int r = queue_pull_many(&shmem->out.queue, (void **) insmps, node->vectorize);
int r, w;
if (shm->cond_out)
r = queue_signalled_pull_many(&shmem->out.qs, (void **) insmps, node->vectorize);
else
r = queue_pull_many(&shmem->out.q, (void **) insmps, node->vectorize);
int avail = sample_alloc(&shmem->pool, outsmps, r);
if (avail < r)
warn("pool underrun (%d/%d)\n", avail, r);
@ -65,13 +64,11 @@ int main(int argc, char* argv[])
}
for (int i = 0; i < r; i++)
sample_put(insmps[i]);
int w = queue_push_many(&shmem->in.queue, (void **) outsmps, avail);
if (shm->cond_in)
w = queue_signalled_push_many(&shmem->in.qs, (void **) outsmps, avail);
else
w = queue_push_many(&shmem->in.q, (void **) outsmps, avail);
if (w < avail)
warn("short write (%d/%d)\n", w, r);
if (shm->cond_in && w) {
pthread_mutex_lock(&shmem->in.mt);
pthread_cond_broadcast(&shmem->in.ready);
pthread_mutex_unlock(&shmem->in.mt);
}
}
}