diff --git a/etc/shmem.conf b/etc/shmem.conf index ecbdacd84..50d0b1665 100644 --- a/etc/shmem.conf +++ b/etc/shmem.conf @@ -24,8 +24,7 @@ nodes = { name = "/villas1", insize = 32, outsize = 32, - cond_in = true, - cond_out = true, + polling = false, sample_size = 4, vectorize = 1 } diff --git a/include/villas/nodes/shmem.h b/include/villas/nodes/shmem.h index e4c5dbfe9..d94d20bda 100644 --- a/include/villas/nodes/shmem.h +++ b/include/villas/nodes/shmem.h @@ -24,8 +24,7 @@ struct shmem { const char* name; /**< Name of the shm object. */ int sample_size; /**< Number of data entries for each sample. */ int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */ - int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ - int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ + int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */ char **exec; /**< External program to execute on start. */ struct memtype *manager; /**< Manager for the shared memory region. */ diff --git a/include/villas/shmem.h b/include/villas/shmem.h index d10149db3..7725cae4d 100644 --- a/include/villas/shmem.h +++ b/include/villas/shmem.h @@ -20,8 +20,7 @@ union shmem_queue { struct shmem_shared { size_t len; /**< Total size of the shared memory region.*/ - int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */ - int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */ + int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */ union shmem_queue in; /**< Queue for samples passed from external program to node.*/ union shmem_queue out; /**< Queue for samples passed from node to external program.*/ diff --git a/lib/node.c b/lib/node.c index 275f02eab..963be02c5 100644 --- a/lib/node.c +++ b/lib/node.c @@ -192,7 +192,7 @@ char * node_name_long(struct node *n) if (n->_vt->print) { struct node_type *vt = n->_vt; char *name_long = vt->print(n); - strcatf(&n->_name_long, "%s: %s", node_name(n), name_long); + strcatf(&n->_name_long, "%s: vectorize=%d, %s", node_name(n), n->vectorize, name_long); free(name_long); } else diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index ae6035c02..2d4a98161 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -30,10 +30,8 @@ int shmem_parse(struct node *n, config_setting_t *cfg) shm->outsize = DEFAULT_SHMEM_QUEUESIZE; if (!config_setting_lookup_int(cfg, "sample_size", &shm->sample_size)) cerror(cfg, "Missing sample size setting"); - if (!config_setting_lookup_bool(cfg, "cond_out", &shm->cond_out)) - shm->cond_out = false; - if (!config_setting_lookup_bool(cfg, "cond_in", &shm->cond_in)) - shm->cond_in = false; + if (!config_setting_lookup_bool(cfg, "polling", &shm->polling)) + shm->polling = false; config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec"); if (!exec_cfg) @@ -86,16 +84,15 @@ int shmem_open(struct node *n) memset(shm->shared, 0, sizeof(struct shmem_shared)); shm->shared->len = len; - shm->shared->cond_in = shm->cond_in; - shm->shared->cond_out = shm->cond_out; + shm->shared->polling = shm->polling; - ret = shm->cond_in ? queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager) - : queue_init(&shm->shared->in.q, shm->insize, shm->manager); + ret = shm->polling ? queue_init(&shm->shared->in.q, shm->insize, shm->manager) + : queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager); if (ret) error("Shared memory queue allocation failed (not enough memory?)"); - ret = shm->cond_out ? queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager) - : queue_init(&shm->shared->out.q, shm->outsize, shm->manager); + ret = shm->polling ? queue_init(&shm->shared->out.q, shm->outsize, shm->manager) + : queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager); if (ret) error("Shared memory queue allocation failed (not enough memory?)"); @@ -125,7 +122,7 @@ int shmem_close(struct node *n) atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed); - if (shm->cond_out) { + if (!shm->polling) { pthread_mutex_lock(&shm->shared->out.qs.mt); pthread_cond_broadcast(&shm->shared->out.qs.ready); pthread_mutex_unlock(&shm->shared->out.qs.mt); @@ -147,8 +144,8 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt) int ret; - ret = shm->cond_in ? queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt) - : queue_pull_many(&shm->shared->in.q, (void**) smps, cnt); + ret = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt) + : queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt); if (ret <= 0) return ret; @@ -195,8 +192,8 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt) return -1; } - pushed = shm->cond_out ? queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail) - : queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail); + pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail) + : queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail); if (pushed != avail) warn("Outgoing queue overrun for node %s", node_name(n)); @@ -209,7 +206,8 @@ char * shmem_print(struct node *n) struct shmem *shm = n->_vd; char *buf = NULL; - strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d", shm->name, shm->insize, shm->outsize, shm->sample_size); + strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d, polling=%s", + shm->name, shm->insize, shm->outsize, shm->sample_size, shm->polling ? "yes" : "no"); if (shm->exec) { strcatf(&buf, ", exec='"); diff --git a/lib/shmem.c b/lib/shmem.c index 3b44eb0f6..9f6401ac8 100644 --- a/lib/shmem.c +++ b/lib/shmem.c @@ -81,7 +81,7 @@ int shmem_shared_close(struct shmem_shared *shm, void *base) { atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed); - if (shm->cond_in) { + if (!shm->polling) { pthread_mutex_lock(&shm->in.qs.mt); pthread_cond_broadcast(&shm->in.qs.ready); pthread_mutex_unlock(&shm->in.qs.mt); @@ -94,8 +94,8 @@ int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned { int ret; - ret = shm->cond_out ? queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt) - : queue_pull_many(&shm->out.q, (void **) smps, cnt); + ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt) + : queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt); if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed)) return -1; @@ -111,6 +111,6 @@ int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned if (ret) return -1; - return shm->cond_in ? queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt) - : queue_push_many(&shm->in.q, (void **) smps, cnt); + return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt) + : queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt); }