diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h index d8dfabad6..bf6913855 100644 --- a/include/villas/queue_signalled.h +++ b/include/villas/queue_signalled.h @@ -13,9 +13,9 @@ /** Wrapper around queue that uses POSIX CV's for signalling writes. */ struct queue_signalled { - struct queue q; /**< Actual underlying queue. */ + struct queue queue; /**< Actual underlying queue. */ pthread_cond_t ready; /**< Condition variable to signal writes to the queue. */ - pthread_mutex_t mt; /**< Mutex for ready. */ + pthread_mutex_t mutex; /**< Mutex for ready. */ }; int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem); diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c index bebf6a84e..0bbbcc7e8 100644 --- a/lib/queue_signalled.c +++ b/lib/queue_signalled.c @@ -10,11 +10,11 @@ int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem) { int ret; - - ret = queue_init(&qs->q, size, mem); pthread_condattr_t cvattr; pthread_mutexattr_t mtattr; + + ret = queue_init(&qs->queue, size, mem); if (ret < 0) return ret; @@ -37,12 +37,12 @@ int queue_signalled_destroy(struct queue_signalled *qs) { int ret; - ret = queue_destroy(&qs->q); + ret = queue_destroy(&qs->queue); if (ret < 0) return ret; pthread_cond_destroy(&qs->ready); - pthread_mutex_destroy(&qs->mt); + pthread_mutex_destroy(&qs->mutex); return 0; } @@ -51,25 +51,25 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn { int ret; - ret = queue_push_many(&qs->q, ptr, cnt); + ret = queue_push_many(&qs->queue, ptr, cnt); if (ret < 0) return ret; - pthread_mutex_lock(&qs->mt); + pthread_mutex_lock(&qs->mutex); pthread_cond_broadcast(&qs->ready); - pthread_mutex_unlock(&qs->mt); + pthread_mutex_unlock(&qs->mutex); return ret; } int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt) { - /* Make sure that qs->mt is unlocked if this thread gets cancelled. */ - pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mt); - pthread_mutex_lock(&qs->mt); - pthread_cond_wait(&qs->ready, &qs->mt); - pthread_mutex_unlock(&qs->mt); + /* Make sure that qs->mutex is unlocked if this thread gets cancelled. */ + pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex); + pthread_mutex_lock(&qs->mutex); + pthread_cond_wait(&qs->ready, &qs->mutex); + pthread_mutex_unlock(&qs->mutex); pthread_cleanup_pop(0); - return queue_pull_many(&qs->q, ptr, cnt); + return queue_pull_many(&qs->queue, ptr, cnt); } diff --git a/lib/shmem.c b/lib/shmem.c index 9f6401ac8..31508b981 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -82,9 +82,9 @@ int shmem_shared_close(struct shmem_shared *shm, void *base) atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); if (!shm->polling) { - pthread_mutex_lock(&shm->in.qs.mt); + pthread_mutex_lock(&shm->in.qs.mutex); pthread_cond_broadcast(&shm->in.qs.ready); - pthread_mutex_unlock(&shm->in.qs.mt); + pthread_mutex_unlock(&shm->in.qs.mutex); } return munmap(base, shm->len);