mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
Merge branch 'fix-queue-signalled' into develop
This commit is contained in:
commit
867fb90f7f
8 changed files with 93 additions and 60 deletions
|
@ -56,7 +56,7 @@ struct queue_cell {
|
|||
struct queue {
|
||||
cacheline_pad_t _pad0; /**< Shared area: all threads read */
|
||||
|
||||
enum state state;
|
||||
_Atomic enum state state;
|
||||
|
||||
struct memtype *mem;
|
||||
size_t buffer_mask;
|
||||
|
@ -104,3 +104,11 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt);
|
|||
* \p cnt elements.
|
||||
*/
|
||||
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt);
|
||||
|
||||
/** Closes the queue, causing following writes to fail and following reads (after
|
||||
* the queue is empty) to fail.
|
||||
*
|
||||
* @return 0 on success.
|
||||
* @return -1 on failure.
|
||||
*/
|
||||
int queue_close(struct queue *q);
|
||||
|
|
|
@ -47,3 +47,5 @@ int queue_signalled_pull(struct queue_signalled *qs, void **ptr);
|
|||
int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cnt);
|
||||
|
||||
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt);
|
||||
|
||||
int queue_signalled_close(struct queue_signalled *qs);
|
||||
|
|
|
@ -78,7 +78,7 @@ struct sample {
|
|||
};
|
||||
|
||||
/** Request \p cnt samples from memory pool \p p and initialize them.
|
||||
* This will leave the reference count of the sample to zero.
|
||||
* The reference count will already be set to 1.
|
||||
* Use the sample_get() function to increase it. */
|
||||
int sample_alloc(struct pool *p, struct sample *smps[], int cnt);
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@
|
|||
#include "nodes/shmem.h"
|
||||
#include "plugin.h"
|
||||
#include "shmem.h"
|
||||
#include "timing.h"
|
||||
#include "utils.h"
|
||||
|
||||
int shmem_parse(struct node *n, config_setting_t *cfg)
|
||||
|
@ -135,13 +136,10 @@ int shmem_close(struct node *n)
|
|||
struct shmem* shm = n->_vd;
|
||||
int ret;
|
||||
|
||||
atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed);
|
||||
|
||||
if (!shm->polling) {
|
||||
pthread_mutex_lock(&shm->shared->out.qs.mutex);
|
||||
pthread_cond_broadcast(&shm->shared->out.qs.ready);
|
||||
pthread_mutex_unlock(&shm->shared->out.qs.mutex);
|
||||
}
|
||||
if (shm->polling)
|
||||
queue_close(&shm->shared->out.q);
|
||||
else
|
||||
queue_signalled_close(&shm->shared->out.qs);
|
||||
|
||||
/* Don't destroy the data structures yet, since the other process might
|
||||
* still be using them. Once both processes are done and have unmapped the
|
||||
|
@ -157,19 +155,20 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
{
|
||||
struct shmem *shm = n->_vd;
|
||||
|
||||
int ret, recv;
|
||||
int recv;
|
||||
|
||||
recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt)
|
||||
: queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
|
||||
struct sample *shared_smps[cnt];
|
||||
recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt)
|
||||
: queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt);
|
||||
|
||||
if (recv <= 0)
|
||||
return recv;
|
||||
|
||||
/* Check if remote process is still running */
|
||||
ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
sample_copy_many(smps, shared_smps, recv);
|
||||
sample_put_many(shared_smps, recv);
|
||||
struct timespec ts_recv = time_now();
|
||||
for (int i = 0; i < recv; i++)
|
||||
smps[i]->ts.received = ts_recv;
|
||||
return recv;
|
||||
}
|
||||
|
||||
|
@ -196,15 +195,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
|
|||
memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len));
|
||||
|
||||
shared_smps[i]->length = len;
|
||||
|
||||
sample_get(shared_smps[i]);
|
||||
}
|
||||
|
||||
if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) {
|
||||
for (int i = 0; i < avail; i++)
|
||||
sample_put(shared_smps[i]);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
|
||||
|
|
30
lib/queue.c
30
lib/queue.c
|
@ -82,11 +82,6 @@ int queue_destroy(struct queue *q)
|
|||
return ret;
|
||||
}
|
||||
|
||||
/** Return estimation of current queue usage.
|
||||
*
|
||||
* Note: This is only an estimation and not accurate as long other
|
||||
* threads are performing operations.
|
||||
*/
|
||||
size_t queue_available(struct queue *q)
|
||||
{
|
||||
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
|
||||
|
@ -99,6 +94,9 @@ int queue_push(struct queue *q, void *ptr)
|
|||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
|
||||
return -1;
|
||||
|
||||
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
|
||||
pos = atomic_load_explicit(&q->tail, memory_order_relaxed);
|
||||
for (;;) {
|
||||
|
@ -128,6 +126,9 @@ int queue_pull(struct queue *q, void **ptr)
|
|||
size_t pos, seq;
|
||||
intptr_t diff;
|
||||
|
||||
if (atomic_load_explicit(&q->state, memory_order_relaxed) == STATE_STOPPED)
|
||||
return -1;
|
||||
|
||||
buffer = (struct queue_cell *) ((char *) q + q->buffer_off);
|
||||
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
|
||||
for (;;) {
|
||||
|
@ -158,10 +159,13 @@ int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
|
|||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = queue_push(q, ptr[i]);
|
||||
if (!ret)
|
||||
if (ret <= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret == -1 && i == 0)
|
||||
return -1;
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
|
@ -172,9 +176,21 @@ int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
|
|||
|
||||
for (i = 0; i < cnt; i++) {
|
||||
ret = queue_pull(q, &ptr[i]);
|
||||
if (!ret)
|
||||
if (ret <= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
if (ret == -1 && i == 0)
|
||||
return -1;
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
int queue_close(struct queue *q)
|
||||
{
|
||||
enum state expected = STATE_INITIALIZED;
|
||||
if (atomic_compare_exchange_weak_explicit(&q->state, &expected, STATE_STOPPED, memory_order_relaxed, memory_order_relaxed))
|
||||
return 0;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -95,24 +95,59 @@ int queue_signalled_push_many(struct queue_signalled *qs, void *ptr[], size_t cn
|
|||
|
||||
int queue_signalled_pull(struct queue_signalled *qs, void **ptr)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
|
||||
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
|
||||
pthread_mutex_lock(&qs->mutex);
|
||||
pthread_cond_wait(&qs->ready, &qs->mutex);
|
||||
|
||||
while (!ret) {
|
||||
ret = queue_pull(&qs->queue, ptr);
|
||||
if (ret == -1)
|
||||
break;
|
||||
|
||||
if (!ret)
|
||||
pthread_cond_wait(&qs->ready, &qs->mutex);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qs->mutex);
|
||||
pthread_cleanup_pop(0);
|
||||
|
||||
return queue_pull(&qs->queue, ptr);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int queue_signalled_pull_many(struct queue_signalled *qs, void *ptr[], size_t cnt)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
/* Make sure that qs->mutex is unlocked if this thread gets cancelled. */
|
||||
pthread_cleanup_push((void (*)(void*)) pthread_mutex_unlock, &qs->mutex);
|
||||
pthread_mutex_lock(&qs->mutex);
|
||||
pthread_cond_wait(&qs->ready, &qs->mutex);
|
||||
|
||||
while (!ret) {
|
||||
ret = queue_pull_many(&qs->queue, ptr, cnt);
|
||||
if (ret == -1)
|
||||
break;
|
||||
|
||||
if (!ret)
|
||||
pthread_cond_wait(&qs->ready, &qs->mutex);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&qs->mutex);
|
||||
pthread_cleanup_pop(0);
|
||||
|
||||
return queue_pull_many(&qs->queue, ptr, cnt);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int queue_signalled_close(struct queue_signalled *qs) {
|
||||
int ret;
|
||||
|
||||
pthread_mutex_lock(&qs->mutex);
|
||||
|
||||
ret = queue_close(&qs->queue);
|
||||
|
||||
pthread_cond_broadcast(&qs->ready);
|
||||
pthread_mutex_unlock(&qs->mutex);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
|
26
lib/shmem.c
26
lib/shmem.c
|
@ -95,38 +95,22 @@ struct shmem_shared * shmem_shared_open(const char *name, void **base_ptr)
|
|||
|
||||
int shmem_shared_close(struct shmem_shared *shm, void *base)
|
||||
{
|
||||
atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed);
|
||||
|
||||
if (!shm->polling) {
|
||||
pthread_mutex_lock(&shm->in.qs.mutex);
|
||||
pthread_cond_broadcast(&shm->in.qs.ready);
|
||||
pthread_mutex_unlock(&shm->in.qs.mutex);
|
||||
}
|
||||
if (shm->polling)
|
||||
queue_close(&shm->in.q);
|
||||
else
|
||||
queue_signalled_close(&shm->in.qs);
|
||||
|
||||
return munmap(base, shm->len);
|
||||
}
|
||||
|
||||
int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
|
||||
return shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
|
||||
: queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
|
||||
|
||||
if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
|
||||
return -1;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
int ret;
|
||||
|
||||
ret = atomic_load_explicit(&shm->node_stopped, memory_order_relaxed);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
|
||||
: queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
|
||||
}
|
||||
|
|
|
@ -143,8 +143,6 @@ static void * worker(void *ctx)
|
|||
{
|
||||
struct web *w = ctx;
|
||||
|
||||
assert(w->state == STATE_STARTED);
|
||||
|
||||
for (;;)
|
||||
lws_service(w->context, 100);
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue