diff --git a/include/villas/nodes/cbuilder.h b/include/villas/nodes/cbuilder.h index 06aab7861..169500230 100644 --- a/include/villas/nodes/cbuilder.h +++ b/include/villas/nodes/cbuilder.h @@ -45,7 +45,8 @@ struct cbuilder { * The simulation step is triggerd by a call to cbuilder_write(). */ pthread_mutex_t mtx; - pthread_cond_t cv; + + int eventfd; /**< Eventfd for synchronizing cbuilder_read() / cbuilder_write() access. */ }; /** @} */ diff --git a/lib/nodes/cbuilder.c b/lib/nodes/cbuilder.c index 759851d28..0b6e64977 100644 --- a/lib/nodes/cbuilder.c +++ b/lib/nodes/cbuilder.c @@ -4,6 +4,8 @@ * @copyright 2017, Steffen Vogel **********************************************************************************/ +#include + #include "node.h" #include "log.h" #include "plugin.h" @@ -60,7 +62,10 @@ int cbuilder_start(struct node *n) /* Initialize mutex and cv */ pthread_mutex_init(&cb->mtx, NULL); - pthread_cond_init(&cb->cv, NULL); + + cb->eventfd = eventfd(0, 0); + if (cb->eventfd < 0) + return -1; /* Currently only a single timestep per model / instance is supported */ cb->step = 0; @@ -77,10 +82,14 @@ int cbuilder_start(struct node *n) int cbuilder_stop(struct node *n) { + int ret; struct cbuilder *cb = n->_vd; + + ret = close(cb->eventfd); + if (ret) + return ret; pthread_mutex_destroy(&cb->mtx); - pthread_cond_destroy(&cb->cv); return 0; } @@ -89,11 +98,13 @@ int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt) { struct cbuilder *cb = n->_vd; struct sample *smp = smps[0]; + + uint64_t cntr; + + read(cb->eventfd, &cntr, sizeof(cntr)); /* Wait for completion of step */ pthread_mutex_lock(&cb->mtx); - while (cb->read >= cb->step) - pthread_cond_wait(&cb->cv, &cb->mtx); float data[smp->capacity]; @@ -125,13 +136,22 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt) cb->model->code(); cb->step++; + + uint64_t incr = 1; + write(cb->eventfd, &incr, sizeof(incr)); - pthread_cond_signal(&cb->cv); pthread_mutex_unlock(&cb->mtx); return 1; } +int cbuilder_fd(struct node *n) +{ + struct cbuilder *cb = n->_vd; + + return cb->eventfd; +} + static struct plugin p = { .name = "cbuilder", .description = "RTDS CBuilder model", @@ -143,7 +163,8 @@ static struct plugin p = { .start = cbuilder_start, .stop = cbuilder_stop, .read = cbuilder_read, - .write = cbuilder_write + .write = cbuilder_write, + .fd = cbuilder_fd } }; diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c index 995a8d055..cb866a5aa 100644 --- a/lib/nodes/loopback.c +++ b/lib/nodes/loopback.c @@ -57,7 +57,7 @@ int loopback_open(struct node *n) if (ret) return ret; - return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage); + return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage, QUEUE_SIGNALLED_EVENTFD); } int loopback_close(struct node *n) @@ -116,19 +116,27 @@ char * loopback_print(struct node *n) return buf; }; +int loopback_fd(struct node *n) +{ + struct loopback *l = n->_vd; + + return queue_signalled_fd(&l->queue); +} + static struct plugin p = { .name = "loopback", .description = "Loopback to connect multiple paths", .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 0, - .size = sizeof(struct loopback), - .parse = loopback_parse, - .print = loopback_print, - .start = loopback_open, - .stop = loopback_close, - .read = loopback_read, - .write = loopback_write + .size = sizeof(struct loopback), + .parse = loopback_parse, + .print = loopback_print, + .start = loopback_open, + .stop = loopback_close, + .read = loopback_read, + .write = loopback_write, + .fd = loopback_fd } }; diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index 6b16a843f..688dcb22b 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -186,7 +186,7 @@ char * shmem_print(struct node *n) } return buf; -}; +} static struct plugin p = { .name = "shmem", @@ -194,13 +194,13 @@ static struct plugin p = { .type = PLUGIN_TYPE_NODE, .node = { .vectorize = 0, - .size = sizeof(struct shmem), - .parse = shmem_parse, - .print = shmem_print, - .start = shmem_open, - .stop = shmem_close, - .read = shmem_read, - .write = shmem_write + .size = sizeof(struct shmem), + .parse = shmem_parse, + .print = shmem_print, + .start = shmem_open, + .stop = shmem_close, + .read = shmem_read, + .write = shmem_write } }; diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c index 95f99a641..52ebb1e72 100644 --- a/lib/nodes/websocket.c +++ b/lib/nodes/websocket.c @@ -346,7 +346,7 @@ int websocket_start(struct node *n) if (ret) return ret; - ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage); + ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage, 0); if (ret) return ret; @@ -558,6 +558,13 @@ char * websocket_print(struct node *n) return buf; } +int websocket_fd(struct node *n) +{ + struct websocket *w = n->_vd; + + return queue_signalled_fd(&w->queue); +} + static struct plugin p = { .name = "websocket", .description = "Send and receive samples of a WebSocket connection (libwebsockets)", @@ -573,7 +580,8 @@ static struct plugin p = { .read = websocket_read, .write = websocket_write, .print = websocket_print, - .parse = websocket_parse + .parse = websocket_parse, + .fd = websocket_fd } };