From 1067c66ab8f23401431a4f5aedba5864e6c6bd7b Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 12 May 2017 11:36:23 +0200 Subject: [PATCH 1/3] shmem node: copy samples when reading --- lib/nodes/shmem.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 4156f13ec..0ea7b4a9c 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -33,6 +33,7 @@ #include "nodes/shmem.h" #include "plugin.h" #include "shmem.h" +#include "timing.h" #include "utils.h" int shmem_parse(struct node *n, config_setting_t *cfg) @@ -159,8 +160,9 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) int ret, recv; - recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt) - : queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); + struct sample *shared_smps[cnt]; + recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt) + : queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt); if (recv <= 0) return recv; @@ -168,8 +170,13 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) /* Check if remote process is still running */ ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed); if (ret) - return ret; + return -1; + sample_copy_many(smps, shared_smps, recv); + sample_put_many(shared_smps, recv); + struct timespec ts_recv = time_now(); + for (int i = 0; i < recv; i++) + smps[i]->ts.received = ts_recv; return recv; } From 959d8aa796cd6082dc950f7a18e35159664e657f Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 12 May 2017 11:35:57 +0200 Subject: [PATCH 2/3] fix race condition when starting web service --- lib/web.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/web.c b/lib/web.c index 11f87c5a3..b7f6bec7f 100644 --- a/lib/web.c +++ b/lib/web.c @@ -223,11 +223,11 @@ int web_start(struct web *w) error("WebSocket: failed to initialize server"); } + w->state = STATE_STARTED; ret = pthread_create(&w->thread, NULL, worker, w); if (ret) error("Failed to start Web worker"); - w->state = STATE_STARTED; return ret; } From d17300e2760e5528558aeba655fd4de2b15ba9fc Mon Sep 17 00:00:00 2001 From: Georg Reinke Date: Fri, 12 May 2017 11:35:21 +0200 Subject: [PATCH 3/3] remove unnecessary sample_get in shmem node --- include/villas/sample.h | 2 +- lib/nodes/shmem.c | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/include/villas/sample.h b/include/villas/sample.h index f7fbffe56..707058739 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -78,7 +78,7 @@ struct sample { }; /** Request \p cnt samples from memory pool \p p and initialize them. - * This will leave the reference count of the sample to zero. + * The reference count will already be set to 1. * Use the sample_get() function to increase it. */ int sample_alloc(struct pool *p, struct sample *smps[], int cnt); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 0ea7b4a9c..0541b2db8 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -203,8 +203,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len)); shared_smps[i]->length = len; - - sample_get(shared_smps[i]); } if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) {