1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

shm node: renamed ‚cond_{in,out}‘ settings to ‚polling‘

This commit is contained in:
Steffen Vogel 2017-04-15 22:45:09 +02:00
parent 426712b56f
commit db3d1477d1
6 changed files with 23 additions and 28 deletions

View file

@ -24,8 +24,7 @@ nodes = {
name = "/villas1",
insize = 32,
outsize = 32,
cond_in = true,
cond_out = true,
polling = false,
sample_size = 4,
vectorize = 1
}

View file

@ -24,8 +24,7 @@ struct shmem {
const char* name; /**< Name of the shm object. */
int sample_size; /**< Number of data entries for each sample. */
int insize, outsize; /**< Size of ingoing and outgoing queue, respectively. */
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to outgoing queue. */
char **exec; /**< External program to execute on start. */
struct memtype *manager; /**< Manager for the shared memory region. */

View file

@ -20,8 +20,7 @@ union shmem_queue {
struct shmem_shared {
size_t len; /**< Total size of the shared memory region.*/
int cond_in; /**< Whether to use a pthread_cond_t to signal if new samples are written to inqueue. */
int cond_out; /**< Whether to use a pthread_cond_t to signal if new samples are written to outqueue. */
int polling; /**< Whether to use a pthread_cond_t to signal if new samples are written to incoming queue. */
union shmem_queue in; /**< Queue for samples passed from external program to node.*/
union shmem_queue out; /**< Queue for samples passed from node to external program.*/

View file

@ -192,7 +192,7 @@ char * node_name_long(struct node *n)
if (n->_vt->print) {
struct node_type *vt = n->_vt;
char *name_long = vt->print(n);
strcatf(&n->_name_long, "%s: %s", node_name(n), name_long);
strcatf(&n->_name_long, "%s: vectorize=%d, %s", node_name(n), n->vectorize, name_long);
free(name_long);
}
else

View file

@ -30,10 +30,8 @@ int shmem_parse(struct node *n, config_setting_t *cfg)
shm->outsize = DEFAULT_SHMEM_QUEUESIZE;
if (!config_setting_lookup_int(cfg, "sample_size", &shm->sample_size))
cerror(cfg, "Missing sample size setting");
if (!config_setting_lookup_bool(cfg, "cond_out", &shm->cond_out))
shm->cond_out = false;
if (!config_setting_lookup_bool(cfg, "cond_in", &shm->cond_in))
shm->cond_in = false;
if (!config_setting_lookup_bool(cfg, "polling", &shm->polling))
shm->polling = false;
config_setting_t *exec_cfg = config_setting_lookup(cfg, "exec");
if (!exec_cfg)
@ -86,16 +84,15 @@ int shmem_open(struct node *n)
memset(shm->shared, 0, sizeof(struct shmem_shared));
shm->shared->len = len;
shm->shared->cond_in = shm->cond_in;
shm->shared->cond_out = shm->cond_out;
shm->shared->polling = shm->polling;
ret = shm->cond_in ? queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager)
: queue_init(&shm->shared->in.q, shm->insize, shm->manager);
ret = shm->polling ? queue_init(&shm->shared->in.q, shm->insize, shm->manager)
: queue_signalled_init(&shm->shared->in.qs, shm->insize, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
ret = shm->cond_out ? queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager)
: queue_init(&shm->shared->out.q, shm->outsize, shm->manager);
ret = shm->polling ? queue_init(&shm->shared->out.q, shm->outsize, shm->manager)
: queue_signalled_init(&shm->shared->out.qs, shm->outsize, shm->manager);
if (ret)
error("Shared memory queue allocation failed (not enough memory?)");
@ -125,7 +122,7 @@ int shmem_close(struct node *n)
atomic_store_explicit(&shm->shared->node_stopped, 1, memory_order_relaxed);
if (shm->cond_out) {
if (!shm->polling) {
pthread_mutex_lock(&shm->shared->out.qs.mt);
pthread_cond_broadcast(&shm->shared->out.qs.ready);
pthread_mutex_unlock(&shm->shared->out.qs.mt);
@ -147,8 +144,8 @@ int shmem_read(struct node *n, struct sample *smps[], unsigned cnt)
int ret;
ret = shm->cond_in ? queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt)
: queue_pull_many(&shm->shared->in.q, (void**) smps, cnt);
ret = shm->polling ? queue_pull_many(&shm->shared->in.q, (void**) smps, cnt)
: queue_signalled_pull_many(&shm->shared->in.qs, (void**) smps, cnt);
if (ret <= 0)
return ret;
@ -195,8 +192,8 @@ int shmem_write(struct node *n, struct sample *smps[], unsigned cnt)
return -1;
}
pushed = shm->cond_out ? queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail)
: queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail);
pushed = shm->polling ? queue_push_many(&shm->shared->out.q, (void**) shared_smps, avail)
: queue_signalled_push_many(&shm->shared->out.qs, (void**) shared_smps, avail);
if (pushed != avail)
warn("Outgoing queue overrun for node %s", node_name(n));
@ -209,7 +206,8 @@ char * shmem_print(struct node *n)
struct shmem *shm = n->_vd;
char *buf = NULL;
strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d", shm->name, shm->insize, shm->outsize, shm->sample_size);
strcatf(&buf, "name=%s, insize=%d, outsize=%d, sample_size=%d, polling=%s",
shm->name, shm->insize, shm->outsize, shm->sample_size, shm->polling ? "yes" : "no");
if (shm->exec) {
strcatf(&buf, ", exec='");

View file

@ -81,7 +81,7 @@ int shmem_shared_close(struct shmem_shared *shm, void *base)
{
atomic_store_explicit(&shm->ext_stopped, 1, memory_order_relaxed);
if (shm->cond_in) {
if (!shm->polling) {
pthread_mutex_lock(&shm->in.qs.mt);
pthread_cond_broadcast(&shm->in.qs.ready);
pthread_mutex_unlock(&shm->in.qs.mt);
@ -94,8 +94,8 @@ int shmem_shared_read(struct shmem_shared *shm, struct sample *smps[], unsigned
{
int ret;
ret = shm->cond_out ? queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt)
: queue_pull_many(&shm->out.q, (void **) smps, cnt);
ret = shm->polling ? queue_pull_many(&shm->out.q, (void **) smps, cnt)
: queue_signalled_pull_many(&shm->out.qs, (void **) smps, cnt);
if (!ret && atomic_load_explicit(&shm->node_stopped, memory_order_relaxed))
return -1;
@ -111,6 +111,6 @@ int shmem_shared_write(struct shmem_shared *shm, struct sample *smps[], unsigned
if (ret)
return -1;
return shm->cond_in ? queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt)
: queue_push_many(&shm->in.q, (void **) smps, cnt);
return shm->polling ? queue_push_many(&shm->in.q, (void **) smps, cnt)
: queue_signalled_push_many(&shm->in.qs, (void **) smps, cnt);
}