diff --git a/include/villas/sample.h b/include/villas/sample.h index f7fbffe56..707058739 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -78,7 +78,7 @@ struct sample { }; /** Request \p cnt samples from memory pool \p p and initialize them. - * This will leave the reference count of the sample to zero. + * The reference count will already be set to 1. * Use the sample_get() function to increase it. */ int sample_alloc(struct pool *p, struct sample *smps[], int cnt); diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 4156f13ec..0541b2db8 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -33,6 +33,7 @@ #include "nodes/shmem.h" #include "plugin.h" #include "shmem.h" +#include "timing.h" #include "utils.h" int shmem_parse(struct node *n, config_setting_t *cfg) @@ -159,8 +160,9 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) int ret, recv; - recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt) - : queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); + struct sample *shared_smps[cnt]; + recv = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) shared_smps, cnt) + : queue_signalled_pull_many(&shm->shared->in.qs, (void**) shared_smps, cnt); if (recv <= 0) return recv; @@ -168,8 +170,13 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) /* Check if remote process is still running */ ret = atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed); if (ret) - return ret; + return -1; + sample_copy_many(smps, shared_smps, recv); + sample_put_many(shared_smps, recv); + struct timespec ts_recv = time_now(); + for (int i = 0; i < recv; i++) + smps[i]->ts.received = ts_recv; return recv; } @@ -196,8 +203,6 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) memcpy(shared_smps[i]->data, smps[i]->data, SAMPLE_DATA_LEN(len)); shared_smps[i]->length = len; - - sample_get(shared_smps[i]); } if (atomic_load_explicit(&shm->shared->ext_stopped, memory_order_relaxed)) { diff --git a/lib/web.c b/lib/web.c index 11f87c5a3..b7f6bec7f 100644 --- a/lib/web.c +++ b/lib/web.c @@ -223,11 +223,11 @@ int web_start(struct web *w) error("WebSocket: failed to initialize server"); } + w->state = STATE_STARTED; ret = pthread_create(&w->thread, NULL, worker, w); if (ret) error("Failed to start Web worker"); - w->state = STATE_STARTED; return ret; }