From e95b562c712b0fc6ea62d6b8190a774cc81efa88 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 14 May 2017 11:35:49 +0200 Subject: [PATCH] improve readability --- lib/queue.c | 11 +++++------ lib/queue_signalled.c | 12 ++++++++++++ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/lib/queue.c b/lib/queue.c index 50c9df6d2..a10d9c360 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -82,11 +82,6 @@ int queue_destroy(struct queue *q) return ret; } -/** Return estimation of current queue usage. - * - * Note: This is only an estimation and not accurate as long other - * threads are performing operations. - */ size_t queue_available(struct queue *q) { return atomic_load_explicit(&q->tail, memory_order_relaxed) - @@ -101,6 +96,7 @@ int queue_push(struct queue *q, void *ptr) if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) return -1; + buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->tail, memory_order_relaxed); for (;;) { @@ -132,6 +128,7 @@ int queue_pull(struct queue *q, void **ptr) if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED) return -1; + buffer = (struct queue_cell *) ((char *) q + q->buffer_off); pos = atomic_load_explicit(&q->head, memory_order_relaxed); for (;;) { @@ -165,6 +162,7 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt) if (ret <= 0) break; } + if (ret == -1 && i == 0) return -1; @@ -181,6 +179,7 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt) if (ret <= 0) break; } + if (ret == -1 && i == 0) return -1; @@ -192,6 +191,6 @@ int queue_close(struct queue *q) size_t expected = STATE_INITIALIZED; if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed)) { return 0; - } + return -1; } diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index 610c60898..cd9058788 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -96,18 +96,23 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn int queue_signalled_pull(struct queue_signalled *qs, void **ptr) { int ret = 0; + /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); + while (!ret) { ret = queue_pull(&qs->queue, ptr); if (ret == -1) break; + if (!ret) pthread_cond_wait(&qs->ready, &qs->mutex); } + pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); + return ret; } @@ -118,15 +123,19 @@ int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cn /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); pthread_mutex_lock(&qs->mutex); + while (!ret) { ret = queue_pull_many(&qs->queue, ptr, cnt); if (ret == -1) break; + if (!ret) pthread_cond_wait(&qs->ready, &qs->mutex); } + pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); + return ret; } @@ -134,8 +143,11 @@ int queue_signalled_close(struct queue_signalled *qs) { int ret; pthread_mutex_lock(&qs->mutex); + ret = queue_close(&qs->queue); + pthread_cond_broadcast(&qs->ready); pthread_mutex_unlock(&qs->mutex); + return ret; }