diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index 345eab59b..5bc7c12d4 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -27,16 +27,40 @@ #include "queue.h" +enum queue_signalled_flags { + /* Mode */ + QUEUE_SIGNALLED_AUTO = (0 << 0), /**< We will choose the best method available on the platform */ + QUEUE_SIGNALLED_PTHREAD = (1 << 0), + QUEUE_SIGNALLED_POLLING = (2 << 0), +#ifdef __linux__ + QUEUE_SIGNALLED_EVENTFD = (3 << 0), +#endif + QUEUE_SIGNALLED_MASK = 0xf, + + /* Other flags */ + QUEUE_SIGNALLED_PROCESS_SHARED = (1 << 4) +} mode; + /** Wrapper around queue that uses POSIX CV's for signalling writes. */ struct queue_signalled { struct queue queue; /**< Actual underlying queue. */ - pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ - pthread_mutex_t mutex; /**< Mutex for ready. */ + + enum queue_signalled_flags mode; + + union { + struct { + pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ + pthread_mutex_t mutex; /**< Mutex for ready. */ + } pthread; +#ifdef __linux__ + int eventfd; +#endif + }; }; #define queue_signalled_available(q) queue_available(&((q)->queue)) -int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem); +int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem, int flags); int queue_signalled_destroy(struct queue_signalled *qs); @@ -49,3 +73,6 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt); int queue_signalled_close(struct queue_signalled *qs); + +/** Returns a file descriptor which can be used with poll / select to wait for new data */ +int queue_signalled_fd(struct queue_signalled *qs); diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index cd9058788..2d22927b3 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -22,29 +22,71 @@ *********************************************************************************/ #include "queue_signalled.h" +#include "log.h" -int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem) +#ifdef __linux__ + #include +#endif + +static void queue_signalled_cleanup(void *p) +{ + struct queue_signalled *qs = p; + + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_unlock(&qs->pthread.mutex); +} + +int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem, int flags) { int ret; - - pthread_condattr_t cvattr; - pthread_mutexattr_t mtattr; + + qs->mode = flags & QUEUE_SIGNALLED_MASK; + + if (qs->mode == 0) { +#ifdef __linux__ + if (flags & QUEUE_SIGNALLED_PROCESS_SHARED) + qs->mode = QUEUE_SIGNALLED_PTHREAD; + else + qs->mode = QUEUE_SIGNALLED_EVENTFD; +#else + qs->mode = QUEUE_SIGNALLED_PTHREAD; +#endif + } ret = queue_init(&qs->queue, size, mem); if (ret < 0) return ret; - pthread_mutexattr_init(&mtattr); - pthread_condattr_init(&cvattr); + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) { + pthread_condattr_t cvattr; + pthread_mutexattr_t mtattr; + + pthread_mutexattr_init(&mtattr); + pthread_condattr_init(&cvattr); - pthread_mutexattr_setpshared(&mtattr, PTHREAD_PROCESS_SHARED); - pthread_condattr_setpshared(&cvattr, PTHREAD_PROCESS_SHARED); + if (flags & QUEUE_SIGNALLED_PROCESS_SHARED) { + pthread_mutexattr_setpshared(&mtattr, PTHREAD_PROCESS_SHARED); + pthread_condattr_setpshared(&cvattr, PTHREAD_PROCESS_SHARED); + } - pthread_mutex_init(&qs->mutex, &mtattr); - pthread_cond_init(&qs->ready, &cvattr); + pthread_mutex_init(&qs->pthread.mutex, &mtattr); + pthread_cond_init(&qs->pthread.ready, &cvattr); - pthread_mutexattr_destroy(&mtattr); - pthread_condattr_destroy(&cvattr); + pthread_mutexattr_destroy(&mtattr); + pthread_condattr_destroy(&cvattr); + } + else if (qs->mode == QUEUE_SIGNALLED_POLLING) { + /* Nothing todo */ + } +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + qs->eventfd = eventfd(0, 0); + if (qs->eventfd < 0) + return -2; + } +#endif + else + return -1; return 0; } @@ -56,98 +98,208 @@ int queue_signalled_destroy(struct queue_signalled *qs) ret = queue_destroy(&qs->queue); if (ret < 0) return ret; - - pthread_cond_destroy(&qs->ready); - pthread_mutex_destroy(&qs->mutex); + + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) { + pthread_cond_destroy(&qs->pthread.ready); + pthread_mutex_destroy(&qs->pthread.mutex); + } + else if (qs->mode == QUEUE_SIGNALLED_POLLING) { + /* Nothing todo */ + } +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + ret = close(qs->eventfd); + if (ret) + return ret; + } +#endif + else + return -1; return 0; } int queue_signalled_push(struct queue_signalled *qs, void *ptr) { - int ret; + int ret, pushed; - ret = queue_push(&qs->queue, ptr); - if (ret < 0) - return ret; + pushed = queue_push(&qs->queue, ptr); + if (pushed < 0) + return pushed; + + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) { + pthread_mutex_lock(&qs->pthread.mutex); + pthread_cond_broadcast(&qs->pthread.ready); + pthread_mutex_unlock(&qs->pthread.mutex); + } + else if (qs->mode == QUEUE_SIGNALLED_POLLING) { + /* Nothing todo */ + } +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + uint64_t incr = 1; + ret = write(qs->eventfd, &incr, sizeof(incr)); + if (ret < 0) + return ret; + } +#endif + else + return -1; - pthread_mutex_lock(&qs->mutex); - pthread_cond_broadcast(&qs->ready); - pthread_mutex_unlock(&qs->mutex); - - return ret; + return pushed; } int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { - int ret; + int ret, pushed; - ret = queue_push_many(&qs->queue, ptr, cnt); - if (ret < 0) - return ret; + pushed = queue_push_many(&qs->queue, ptr, cnt); + if (pushed < 0) + return pushed; - pthread_mutex_lock(&qs->mutex); - pthread_cond_broadcast(&qs->ready); - pthread_mutex_unlock(&qs->mutex); + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) { + pthread_mutex_lock(&qs->pthread.mutex); + pthread_cond_broadcast(&qs->pthread.ready); + pthread_mutex_unlock(&qs->pthread.mutex); + } + else if (qs->mode == QUEUE_SIGNALLED_POLLING) { + /* Nothing todo */ + } +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + uint64_t incr = 1; + ret = write(qs->eventfd, &incr, sizeof(incr)); + if (ret < 0) + return ret; + } +#endif + else + return -1; - return ret; + return pushed; } int queue_signalled_pull(struct queue_signalled *qs, void **ptr) { - int ret = 0; + int ret, pulled = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ - pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); - pthread_mutex_lock(&qs->mutex); + pthread_cleanup_push(queue_signalled_cleanup, qs); - while (!ret) { - ret = queue_pull(&qs->queue, ptr); - if (ret == -1) + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_lock(&qs->pthread.mutex); + + while (!pulled) { + pulled = queue_pull(&qs->queue, ptr); + if (pulled < 0) break; - - if (!ret) - pthread_cond_wait(&qs->ready, &qs->mutex); + else if (pulled == 0) { + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex); + else if (qs->mode == QUEUE_SIGNALLED_POLLING) + continue; /* Try again */ +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + uint64_t cntr; + ret = read(qs->eventfd, &cntr, sizeof(cntr)); + if (ret < 0) + break; + } +#endif + else + break; + } } - pthread_mutex_unlock(&qs->mutex); + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_unlock(&qs->pthread.mutex); + pthread_cleanup_pop(0); - return ret; + return pulled; } int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { - int ret = 0; + int ret, pulled = 0; /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ - pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); - pthread_mutex_lock(&qs->mutex); + pthread_cleanup_push(queue_signalled_cleanup, qs); - while (!ret) { - ret = queue_pull_many(&qs->queue, ptr, cnt); - if (ret == -1) + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_lock(&qs->pthread.mutex); + + while (!pulled) { + pulled = queue_pull_many(&qs->queue, ptr, cnt); + if (pulled < 0) break; - - if (!ret) - pthread_cond_wait(&qs->ready, &qs->mutex); + else if (pulled == 0) { + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_cond_wait(&qs->pthread.ready, &qs->pthread.mutex); + else if (qs->mode == QUEUE_SIGNALLED_POLLING) + continue; /* Try again */ +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + uint64_t cntr; + ret = read(qs->eventfd, &cntr, sizeof(cntr)); + if (ret < 0) + break; + } +#endif + else + break; + } } - pthread_mutex_unlock(&qs->mutex); + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_unlock(&qs->pthread.mutex); + pthread_cleanup_pop(0); - return ret; + return pulled; } -int queue_signalled_close(struct queue_signalled *qs) { +int queue_signalled_close(struct queue_signalled *qs) +{ int ret; - pthread_mutex_lock(&qs->mutex); + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) + pthread_mutex_lock(&qs->pthread.mutex); ret = queue_close(&qs->queue); + + if (qs->mode == QUEUE_SIGNALLED_PTHREAD) { + pthread_cond_broadcast(&qs->pthread.ready); + pthread_mutex_unlock(&qs->pthread.mutex); + } + else if (qs->mode == QUEUE_SIGNALLED_POLLING) { + /* Nothing todo */ + } +#ifdef __linux__ + else if (qs->mode == QUEUE_SIGNALLED_EVENTFD) { + int ret; + uint64_t incr = 1; - pthread_cond_broadcast(&qs->ready); - pthread_mutex_unlock(&qs->mutex); + ret = write(qs->eventfd, &incr, sizeof(incr)); + if (ret < 0) + return ret; + } +#endif + else + return -1; return ret; } + +int queue_signalled_fd(struct queue_signalled *qs) +{ + switch (qs->mode) { +#ifdef __linux__ + case QUEUE_SIGNALLED_EVENTFD: + return qs->eventfd; +#endif + default: { } + } + + return -1; +}