diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index bf6913855..d36ae30a3 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -18,10 +18,16 @@ struct queue_signalled { pthread_mutex_t mutex; /**< Mutex for ready. */ }; +#define queue_signalled_available(q) queue_available(&((q)->queue)) + int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem); int queue_signalled_destroy(struct queue_signalled *qs); +int queue_signalled_push(struct queue_signalled *qs, void *ptr); + +int queue_signalled_pull(struct queue_signalled *qs, void **ptr); + int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt); int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt); diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index 0bbbcc7e8..3af163f5c 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -47,6 +47,21 @@ int queue_signalled_destroy(struct queue_signalled *qs) return 0; } +int queue_signalled_push(struct queue_signalled *qs, void *ptr) +{ + int ret; + + ret = queue_push(&qs->queue, ptr); + if (ret < 0) + return ret; + + pthread_mutex_lock(&qs->mutex); + pthread_cond_broadcast(&qs->ready); + pthread_mutex_unlock(&qs->mutex); + + return ret; +} + int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { int ret; @@ -62,6 +77,18 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn return ret; } +int queue_signalled_pull(struct queue_signalled *qs, void **ptr) +{ + /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ + pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); + pthread_mutex_lock(&qs->mutex); + pthread_cond_wait(&qs->ready, &qs->mutex); + pthread_mutex_unlock(&qs->mutex); + pthread_cleanup_pop(0); + + return queue_pull(&qs->queue, ptr); +} + int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */