1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add close methods for queue and use them for shmem

This commit is contained in:
Georg Reinke 2017-05-12 13:08:34 +02:00
parent 434504b788
commit 1b61d55cab
6 changed files with 68 additions and 60 deletions

View file

@ -56,7 +56,7 @@ struct queue_cell {
struct queue {
cacheline_pad_t _pad0; /**< Shared area: all threads read */
enum state state;
atomic_size_t state;
struct memtype *mem;
size_t buffer_mask;
@ -104,3 +104,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt);
* \p cnt elements.
*/
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt);
/** Closes the queue, causing following writes to fail and following reads (after
* the queue is empty) to fail.
*
* @return 0 on success.
* @return -1 on failure.
*/
int queue_close(struct queue *q);

View file

@ -47,3 +47,5 @@ int queue_signalled_pull(struct queue_signalled *qs, void **ptr);
int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt);
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt);
int queue_signalled_close(struct queue_signalled *qs);

View file

@ -136,13 +136,10 @@ int shmem_close(struct node *n)
struct shmem* shm = n->_vd;
int ret;
atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed);
if (!shm->polling) {
pthread_mutex_lock(&shm->shared->out.qs.mutex);
pthread_cond_broadcast(&shm->shared->out.qs.ready);
pthread_mutex_unlock(&shm->shared->out.qs.mutex);
}
if (shm->polling)
queue_close(&shm->shared->out.q);
else
queue_signalled_close(&shm->shared->out.qs);
/* Don't destroy the data structures yet, since the other process might
* still be using them. Once both processes are done and have unmapped the
@ -158,7 +155,7 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct shmem *shm = n->_vd;
int ret, recv;
int recv;
struct sample *shared_smps[cnt];
recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt)
@ -167,11 +164,6 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
if (recv <= 0)
return recv;
/* Check if remote process is still running */
ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed);
if (ret)
return -1;
sample_copy_many(smps, shared_smps, recv);
sample_put_many(shared_smps, recv);
struct timespec ts_recv = time_now();
@ -205,13 +197,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
shared_smps[i]->length = len;
}
if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) {
for (int i = 0; i < avail; i++)
sample_put(shared_smps[i]);
return -1;
}
pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
: queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);

View file

@ -99,6 +99,8 @@ int queue_push(struct queue *q, void *ptr)
size_t pos, seq;
intptr_t diff;
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
return -1;
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
for (;;) {
@ -128,6 +130,8 @@ int queue_pull(struct queue *q, void **ptr)
size_t pos, seq;
intptr_t diff;
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
return -1;
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
for (;;) {
@ -158,9 +162,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
for (i = 0; i < cnt; i++) {
ret = queue_push(q, ptr[i]);
if (!ret)
if (ret <= 0)
break;
}
if (ret == -1 && i == 0)
return -1;
return i;
}
@ -172,9 +178,20 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
for (i = 0; i < cnt; i++) {
ret = queue_pull(q, &ptr[i]);
if (!ret)
if (ret <= 0)
break;
}
if (ret == -1 && i == 0)
return -1;
return i;
}
int queue_close(struct queue *q)
{
size_t expected = STATE_INITIALIZED;
if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed)) {
return 0;
}
return -1;
}

View file

@ -95,35 +95,47 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn
int queue_signalled_pull(struct queue_signalled *qs, void **ptr)
{
int ret;
int ret = 0;
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
pthread_mutex_lock(&qs->mutex);
ret = queue_pull(&qs->queue, ptr);
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
while (!ret) {
ret = queue_pull(&qs->queue, ptr);
if (ret == -1)
break;
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
}
pthread_mutex_unlock(&qs->mutex);
pthread_cleanup_pop(0);
if (ret)
return ret;
return queue_pull(&qs->queue, ptr);
return ret;
}
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
{
int ret;
int ret = 0;
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
pthread_mutex_lock(&qs->mutex);
ret = queue_pull_many(&qs->queue, ptr, cnt);
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
while (!ret) {
ret = queue_pull_many(&qs->queue, ptr, cnt);
if (ret == -1)
break;
if (!ret)
pthread_cond_wait(&qs->ready, &qs->mutex);
}
pthread_mutex_unlock(&qs->mutex);
pthread_cleanup_pop(0);
if (ret)
return ret;
return queue_pull_many(&qs->queue, ptr, cnt);
return ret;
}
int queue_signalled_close(struct queue_signalled *qs) {
int ret;
pthread_mutex_lock(&qs->mutex);
ret = queue_close(&qs->queue);
pthread_cond_broadcast(&qs->ready);
pthread_mutex_unlock(&qs->mutex);
return ret;
}

View file

@ -95,38 +95,22 @@ struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
int shmem_shared_close(struct shmem_shared *shm, void *base)
{
atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed);
if (!shm->polling) {
pthread_mutex_lock(&shm->in.qs.mutex);
pthread_cond_broadcast(&shm->in.qs.ready);
pthread_mutex_unlock(&shm->in.qs.mutex);
}
if (shm->polling)
queue_close(&shm->in.q);
else
queue_signalled_close(&shm->in.qs);
return munmap(base, shm->len);
}
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
{
int ret;
ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
return -1;
return ret;
}
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
{
int ret;
ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed);
if (ret)
return -1;
return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
}