1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add support for node_fd() to more node types

This commit is contained in:
Steffen Vogel 2017-08-30 13:30:31 +02:00
parent d13b617167
commit 8fdcaa3c4e
5 changed files with 63 additions and 25 deletions

View file

@ -45,7 +45,8 @@ struct cbuilder {
* The simulation step is triggerd by a call to cbuilder_write().
*/
pthread_mutex_t mtx;
pthread_cond_t cv;
int eventfd; /**< Eventfd for synchronizing cbuilder_read() / cbuilder_write() access. */
};
/** @} */

View file

@ -4,6 +4,8 @@
* @copyright 2017, Steffen Vogel
**********************************************************************************/
#include <sys/eventfd.h>
#include "node.h"
#include "log.h"
#include "plugin.h"
@ -60,7 +62,10 @@ int cbuilder_start(struct node *n)
/* Initialize mutex and cv */
pthread_mutex_init(&cb->mtx, NULL);
pthread_cond_init(&cb->cv, NULL);
cb->eventfd = eventfd(0, 0);
if (cb->eventfd < 0)
return -1;
/* Currently only a single timestep per model / instance is supported */
cb->step = 0;
@ -77,10 +82,14 @@ int cbuilder_start(struct node *n)
int cbuilder_stop(struct node *n)
{
int ret;
struct cbuilder *cb = n->_vd;
ret = close(cb->eventfd);
if (ret)
return ret;
pthread_mutex_destroy(&cb->mtx);
pthread_cond_destroy(&cb->cv);
return 0;
}
@ -89,11 +98,13 @@ int cbuilder_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct cbuilder *cb = n->_vd;
struct sample *smp = smps[0];
uint64_t cntr;
read(cb->eventfd, &cntr, sizeof(cntr));
/* Wait for completion of step */
pthread_mutex_lock(&cb->mtx);
while (cb->read >= cb->step)
pthread_cond_wait(&cb->cv, &cb->mtx);
float data[smp->capacity];
@ -125,13 +136,22 @@ int cbuilder_write(struct node *n, struct sample *smps[], unsigned cnt)
cb->model->code();
cb->step++;
uint64_t incr = 1;
write(cb->eventfd, &incr, sizeof(incr));
pthread_cond_signal(&cb->cv);
pthread_mutex_unlock(&cb->mtx);
return 1;
}
int cbuilder_fd(struct node *n)
{
struct cbuilder *cb = n->_vd;
return cb->eventfd;
}
static struct plugin p = {
.name = "cbuilder",
.description = "RTDS CBuilder model",
@ -143,7 +163,8 @@ static struct plugin p = {
.start = cbuilder_start,
.stop = cbuilder_stop,
.read = cbuilder_read,
.write = cbuilder_write
.write = cbuilder_write,
.fd = cbuilder_fd
}
};

View file

@ -57,7 +57,7 @@ int loopback_open(struct node *n)
if (ret)
return ret;
return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage);
return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage, QUEUE_SIGNALLED_EVENTFD);
}
int loopback_close(struct node *n)
@ -116,19 +116,27 @@ char * loopback_print(struct node *n)
return buf;
};
int loopback_fd(struct node *n)
{
struct loopback *l = n->_vd;
return queue_signalled_fd(&l->queue);
}
static struct plugin p = {
.name = "loopback",
.description = "Loopback to connect multiple paths",
.type = PLUGIN_TYPE_NODE,
.node = {
.vectorize = 0,
.size = sizeof(struct loopback),
.parse = loopback_parse,
.print = loopback_print,
.start = loopback_open,
.stop = loopback_close,
.read = loopback_read,
.write = loopback_write
.size = sizeof(struct loopback),
.parse = loopback_parse,
.print = loopback_print,
.start = loopback_open,
.stop = loopback_close,
.read = loopback_read,
.write = loopback_write,
.fd = loopback_fd
}
};

View file

@ -186,7 +186,7 @@ char * shmem_print(struct node *n)
}
return buf;
};
}
static struct plugin p = {
.name = "shmem",
@ -194,13 +194,13 @@ static struct plugin p = {
.type = PLUGIN_TYPE_NODE,
.node = {
.vectorize = 0,
.size = sizeof(struct shmem),
.parse = shmem_parse,
.print = shmem_print,
.start = shmem_open,
.stop = shmem_close,
.read = shmem_read,
.write = shmem_write
.size = sizeof(struct shmem),
.parse = shmem_parse,
.print = shmem_print,
.start = shmem_open,
.stop = shmem_close,
.read = shmem_read,
.write = shmem_write
}
};

View file

@ -346,7 +346,7 @@ int websocket_start(struct node *n)
if (ret)
return ret;
ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage);
ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage, 0);
if (ret)
return ret;
@ -558,6 +558,13 @@ char * websocket_print(struct node *n)
return buf;
}
int websocket_fd(struct node *n)
{
struct websocket *w = n->_vd;
return queue_signalled_fd(&w->queue);
}
static struct plugin p = {
.name = "websocket",
.description = "Send and receive samples of a WebSocket connection (libwebsockets)",
@ -573,7 +580,8 @@ static struct plugin p = {
.read = websocket_read,
.write = websocket_write,
.print = websocket_print,
.parse = websocket_parse
.parse = websocket_parse,
.fd = websocket_fd
}
};