1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

improve readability

This commit is contained in:
Steffen Vogel 2017-05-14 11:35:49 +02:00
parent 1b61d55cab
commit e95b562c71
2 changed files with 17 additions and 6 deletions

View file

@ -82,11 +82,6 @@ int queue_destroy(struct queue *q)
return ret;
}
/** Return estimation of current queue usage.
*
* Note: This is only an estimation and not accurate as long other
* threads are performing operations.
*/
size_t queue_available(struct queue *q)
{
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
@ -101,6 +96,7 @@ int queue_push(struct queue *q, void *ptr)
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
return -1;
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
for (;;) {
@ -132,6 +128,7 @@ int queue_pull(struct queue *q, void **ptr)
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
return -1;
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
for (;;) {
@ -165,6 +162,7 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
if (ret <= 0)
break;
}
if (ret == -1 && i == 0)
return -1;
@ -181,6 +179,7 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
if (ret <= 0)
break;
}
if (ret == -1 && i == 0)
return -1;
@ -192,6 +191,6 @@ int queue_close(struct queue *q)
size_t expected = STATE_INITIALIZED;
if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed)) {
return 0;
}
return -1;
}

View file

@ -96,18 +96,23 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn
int queue_signalled_pull(struct queue_signalled *qs, void **ptr)
{
int ret = 0;
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
pthread_mutex_lock(&qs->mutex);
while (!ret) {
ret = queue_pull(&qs->queue, ptr);
if (ret == -1)
break;
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
}
pthread_mutex_unlock(&qs->mutex);
pthread_cleanup_pop(0);
return ret;
}
@ -118,15 +123,19 @@ int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cn
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
pthread_mutex_lock(&qs->mutex);
while (!ret) {
ret = queue_pull_many(&qs->queue, ptr, cnt);
if (ret == -1)
break;
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
}
pthread_mutex_unlock(&qs->mutex);
pthread_cleanup_pop(0);
return ret;
}
@ -134,8 +143,11 @@ int queue_signalled_close(struct queue_signalled *qs) {
int ret;
pthread_mutex_lock(&qs->mutex);
ret = queue_close(&qs->queue);
pthread_cond_broadcast(&qs->ready);
pthread_mutex_unlock(&qs->mutex);
return ret;
}