1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

node: separated node configuratio for send / receive side

This commit is contained in:
Steffen Vogel 2018-05-24 09:04:41 +02:00
parent e3ee361c07
commit a535ec5abc
10 changed files with 238 additions and 114 deletions

View file

@ -38,6 +38,17 @@
#include "queue.h"
#include "common.h"
struct node_direction {
int enabled;
int builtin; /**< This node should use built-in hooks by default. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
struct list hooks; /**< List of write hooks (struct hook). */
struct list signals; /**< List of signal meta data such as signal names */
json_t *cfg; /**< A JSON object containing the configuration of the node. */
};
/** The data structure for a node.
*
* Every entity which exchanges messages is represented by a node.
@ -49,19 +60,14 @@ struct node
char *_name; /**< Singleton: A string used to print to screen. */
char *_name_long; /**< Singleton: A string used to print to screen. */
int builtin; /**< This node should use built-in hooks by default. */
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
int affinity; /**< CPU Affinity of this node */
int samplelen; /**< The maximum number of values this node can receive. */
int id; /**< An id of this node which is only unique in the scope of it's super-node (VILLASnode instance). */
unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */
struct list hooks; /**< List of write hooks (struct hook). */
struct list signals; /**< List of signal meta data such as signal names */
struct node_direction in, out;
enum state state;

View file

@ -37,12 +37,15 @@ static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct a
for (size_t i = 0; i < list_length(&s->api->super_node->nodes); i++) {
struct node *n = (struct node *) list_at(&s->api->super_node->nodes, i);
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }",
json_t *json_node = json_pack("{ s: s, s: i, s: i, s: { s: i }, s: { s: i } }",
"name", node_name_short(n),
"state", n->state,
"vectorize", n->vectorize,
"affinity", n->affinity,
"id", i
"in",
"vectorize", n->in.vectorize,
"out",
"vectorize", n->out.vectorize
);
if (n->stats)

View file

@ -72,8 +72,14 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt)
cur->flags |= SAMPLE_IS_FIRST;
/* Run restart hooks */
for (size_t i = 0; i < list_length(&h->node->hooks); i++) {
struct hook *k = (struct hook *) list_at(&h->node->hooks, i);
for (size_t i = 0; i < list_length(&h->node->in.hooks); i++) {
struct hook *k = (struct hook *) list_at(&h->node->in.hooks, i);
hook_restart(k);
}
for (size_t i = 0; i < list_length(&h->node->out.hooks); i++) {
struct hook *k = (struct hook *) list_at(&h->node->out.hooks, i);
hook_restart(k);
}

View file

@ -101,8 +101,8 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags)
io->output.node = n;
if (n) {
io->input.signals = &n->signals;
io->output.signals = &n->signals;
io->input.signals = &n->in.signals;
io->output.signals = &n->out.signals;
}
else {
io->input.signals = NULL;

View file

@ -33,35 +33,19 @@
#include <villas/timing.h>
#include <villas/signal.h>
int node_init(struct node *n, struct node_type *vt)
static int node_direction_init(struct node_direction *nd, struct node *n)
{
static int max_id;
nd->enabled = 1;
nd->vectorize = 1;
nd->builtin = 1;
assert(n->state == STATE_DESTROYED);
n->_vt = vt;
n->_vd = alloc(vt->size);
n->stats = NULL;
n->name = NULL;
n->_name = NULL;
n->_name_long = NULL;
n->id = max_id++;
/* Default values */
n->vectorize = 1;
n->builtin = 1;
n->samplelen = DEFAULT_SAMPLELEN;
list_push(&vt->instances, n);
list_init(&n->signals);
list_init(&nd->signals);
#ifdef WITH_HOOKS
/* Add internal hooks if they are not already in the list */
list_init(&n->hooks);
if (n->builtin) {
list_init(&nd->hooks);
if (nd->builtin) {
int ret;
for (size_t i = 0; i < list_length(&plugins); i++) {
struct plugin *q = (struct plugin *) list_at(&plugins, i);
@ -80,46 +64,163 @@ int node_init(struct node *n, struct node_type *vt)
if (ret)
return ret;
list_push(&n->hooks, h);
list_push(&nd->hooks, h);
}
}
#endif /* WITH_HOOKS */
n->state = STATE_INITIALIZED;
return 0;
}
static int node_direction_destroy(struct node_direction *nd, struct node *n)
{
int ret;
ret = list_destroy(&nd->signals, (dtor_cb_t) signal_destroy, true);
if (ret)
return ret;
#ifdef WITH_HOOKS
ret = list_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true);
if (ret)
return ret;
#endif
return 0;
}
int node_init2(struct node *n)
static int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg)
{
int ret;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
json_error_t err;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b }",
"hooks", &json_hooks,
"signals", &json_signals,
"vectorize", &nd->vectorize,
"builtin", &nd->builtin
);
if (ret)
jerror(&err, "Failed to parse node '%s'", node_name(n));
#ifdef WITH_HOOKS
if (json_hooks) {
ret = hook_parse_list(&nd->hooks, json_hooks, NULL, n);
if (ret < 0)
return ret;
}
#endif /* WITH_HOOKS */
if (json_signals) {
ret = signal_parse_list(&nd->signals, json_signals);
if (ret)
error("Failed to parse signal definition of node '%s'", node_name(n));
}
return 0;
}
static int node_direction_check(struct node_direction *nd, struct node *n)
{
if (nd->vectorize <= 0)
error("Invalid `vectorize` value %d for node %s. Must be natural number!", nd->vectorize, node_name(n));
if (n->_vt->vectorize && n->_vt->vectorize < nd->vectorize)
error("Invalid value for `vectorize`. Node type requires a number smaller than %d!",
n->_vt->vectorize);
return 0;
}
static int node_direction_start(struct node_direction *nd, struct node *n)
{
#ifdef WITH_HOOKS
int ret;
/* We sort the hooks according to their priority before starting the path */
list_sort(&n->hooks, hook_cmp_priority);
list_sort(&nd->hooks, hook_cmp_priority);
for (size_t i = 0; i < list_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) list_at(&nd->hooks, i);
ret = hook_start(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
return 0;
}
static int node_direction_stop(struct node_direction *nd, struct node *n)
{
#ifdef WITH_HOOKS
int ret;
for (size_t i = 0; i < list_length(&nd->hooks); i++) {
struct hook *h = (struct hook *) list_at(&nd->hooks, i);
ret = hook_stop(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
return 0;
}
int node_init(struct node *n, struct node_type *vt)
{
int ret;
assert(n->state == STATE_DESTROYED);
n->_vt = vt;
n->_vd = alloc(vt->size);
n->stats = NULL;
n->name = NULL;
n->_name = NULL;
n->_name_long = NULL;
/* Default values */
n->samplelen = DEFAULT_SAMPLELEN;
ret = node_direction_init(&n->in, n);
if (ret)
return ret;
ret = node_direction_init(&n->out, n);
if (ret)
return ret;
n->state = STATE_INITIALIZED;
list_push(&vt->instances, n);
return 0;
}
int node_parse(struct node *n, json_t *cfg, const char *name)
{
struct node_type *nt;
int ret;
json_error_t err;
json_t *json_hooks = NULL;
json_t *json_signals = NULL;
json_t *json_in = NULL, *json_out = NULL;
const char *type;
n->name = strdup(name);
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i, s?: o, s?: b, s?: o }",
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: o, s?: o }",
"type", &type,
"vectorize", &n->vectorize,
"samplelen", &n->samplelen,
"hooks", &json_hooks,
"builtin", &n->builtin,
"signals", &json_signals
"in", &json_in,
"out", &json_out
);
if (ret)
jerror(&err, "Failed to parse node '%s'", node_name(n));
@ -127,18 +228,16 @@ int node_parse(struct node *n, json_t *cfg, const char *name)
nt = node_type_lookup(type);
assert(nt == n->_vt);
#ifdef WITH_HOOKS
if (json_hooks) {
ret = hook_parse_list(&n->hooks, json_hooks, NULL, n);
if (ret < 0)
return ret;
}
#endif /* WITH_HOOKS */
if (json_signals) {
ret = signal_parse_list(&n->signals, json_signals);
if (json_in) {
ret = node_direction_parse(&n->in, n, json_in);
if (ret)
error("Failed to parse signal definition of node '%s'", node_name(n));
error("Failed to parse input direction of node '%s'", node_name(n));
}
if (json_out) {
ret = node_direction_parse(&n->out, n, json_out);
if (ret)
error("Failed to parse output direction of node '%s'", node_name(n));
}
ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0;
@ -179,14 +278,16 @@ int node_parse_cli(struct node *n, int argc, char *argv[])
int node_check(struct node *n)
{
int ret;
assert(n->state != STATE_DESTROYED);
if (n->vectorize <= 0)
error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n));
ret = node_direction_check(&n->in, n);
if (ret)
return ret;
if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize)
error("Invalid value for `vectorize`. Node type requires a number smaller than %d!",
n->_vt->vectorize);
ret = node_direction_check(&n->out, n);
if (ret)
return ret;
n->state = STATE_CHECKED;
@ -201,15 +302,13 @@ int node_start(struct node *n)
info("Starting node %s", node_name_long(n));
{ INDENT
#ifdef WITH_HOOKS
for (size_t i = 0; i < list_length(&n->hooks); i++) {
struct hook *h = (struct hook *) list_at(&n->hooks, i);
ret = node_direction_start(&n->in, n);
if (ret)
return ret;
ret = hook_start(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
ret = node_direction_start(&n->out, n);
if (ret)
return ret;
ret = n->_vt->start ? n->_vt->start(n) : 0;
if (ret)
@ -232,15 +331,13 @@ int node_stop(struct node *n)
info("Stopping node %s", node_name(n));
{ INDENT
#ifdef WITH_HOOKS
for (size_t i = 0; i < list_length(&n->hooks); i++) {
struct hook *h = (struct hook *) list_at(&n->hooks, i);
ret = node_direction_stop(&n->in, n);
if (ret)
return ret;
ret = hook_stop(h);
if (ret)
return ret;
}
#endif /* WITH_HOOKS */
ret = node_direction_stop(&n->out, n);
if (ret)
return ret;
ret = n->_vt->stop ? n->_vt->stop(n) : 0;
}
@ -253,19 +350,22 @@ int node_stop(struct node *n)
int node_destroy(struct node *n)
{
int ret;
assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED);
#ifdef WITH_HOOKS
list_destroy(&n->hooks, (dtor_cb_t) hook_destroy, true);
#endif
ret = node_direction_destroy(&n->in, n);
if (ret)
return ret;
ret = node_direction_destroy(&n->out, n);
if (ret)
return ret;
if (n->_vt->destroy)
n->_vt->destroy(n);
list_remove(&n->_vt->instances, n);
list_destroy(&n->signals, (dtor_cb_t) signal_destroy, true);
if (n->_vd)
free(n->_vd);
@ -328,7 +428,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt)
#ifdef WITH_HOOKS
/* Run read hooks */
int rread = hook_read_list(&n->hooks, smps, nread);
int rread = hook_read_list(&n->in.hooks, smps, nread);
int skipped = nread - rread;
if (skipped > 0 && n->stats != NULL) {
@ -354,7 +454,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt)
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_write_list(&n->hooks, smps, cnt);
cnt = hook_write_list(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */
@ -395,7 +495,12 @@ char * node_name_long(struct node *n)
if (n->_vt->print) {
struct node_type *vt = n->_vt;
char *name_long = vt->print(n);
strcatf(&n->_name_long, "%s: #hooks=%zu, id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), list_length(&n->hooks), n->id, n->vectorize, n->samplelen, name_long);
strcatf(&n->_name_long, "%s: #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, samplelen=%d, %s",
node_name(n),
list_length(&n->in.hooks), n->in.vectorize,
list_length(&n->out.hooks), n->out.vectorize,
n->samplelen, name_long);
free(name_long);
}
else

View file

@ -46,7 +46,7 @@ int shmem_parse(struct node *n, json_t *cfg)
json_error_t err;
/* Default values */
shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->vectorize);
shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->in.vectorize);
shm->conf.samplelen = n->samplelen;
shm->conf.polling = false;
shm->exec = NULL;

View file

@ -46,7 +46,7 @@ static int path_source_init(struct path_source *ps)
{
int ret;
ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage);
ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->in.vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage);
if (ret)
return ret;
@ -72,7 +72,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
{
int recv, tomux, ready, cnt;
cnt = ps->node->vectorize;
cnt = ps->node->in.vectorize;
struct sample *read_smps[cnt];
struct sample *muxed_smps[cnt];
@ -122,10 +122,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i)
if (toenqueue != tomux) {
int skipped = tomux - toenqueue;
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, timux, path_name(p));
if (p->stats)
stats_update(p->stats, STATS_SKIPPED, skipped);
debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p));
}
#else
int toenqueue = tomux;
@ -197,7 +194,7 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi
static void path_destination_write(struct path_destination *pd, struct path *p)
{
int cnt = pd->node->vectorize;
int cnt = pd->node->out.vectorize;
int sent;
int available;
int released;
@ -611,8 +608,13 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes)
&& node_fd(ps->node) != 1;
}
list_destroy(&sources, NULL, false);
list_destroy(&destinations, NULL, false);
ret = list_destroy(&sources, NULL, false);
if (ret)
return ret;
ret = list_destroy(&destinations, NULL, false);
if (ret)
return ret;
p->cfg = cfg;
p->state = STATE_PARSED;

View file

@ -369,10 +369,6 @@ int super_node_start(struct super_node *sn)
int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n);
if (refs > 0) { INDENT
ret = node_init2(n);
if (ret)
error("Failed to start node: %s", node_name(n));
ret = node_start(n);
if (ret)
error("Failed to start node: %s", node_name(n));

View file

@ -195,8 +195,14 @@ int main(int argc, char *argv[])
if (n->state != STATE_STARTED)
continue;
for (size_t j = 0; j < list_length(&n->hooks); j++) {
struct hook *h = (struct hook *) list_at(&n->hooks, j);
for (size_t j = 0; j < list_length(&n->in.hooks); j++) {
struct hook *h = (struct hook *) list_at(&n->in.hooks, j);
hook_periodic(h);
}
for (size_t j = 0; j < list_length(&n->out.hooks); j++) {
struct hook *h = (struct hook *) list_at(&n->out.hooks, j);
hook_periodic(h);
}

View file

@ -129,18 +129,18 @@ static void * send_loop(void *ctx)
{
unsigned last_sequenceno = 0;
int ret, scanned, sent, ready, cnt = 0;
struct sample *smps[node->vectorize];
struct sample *smps[node->out.vectorize];
/* Initialize memory */
ret = pool_init(&sendd.pool, LOG2_CEIL(node->vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
ret = pool_init(&sendd.pool, LOG2_CEIL(node->out.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
if (ret < 0)
error("Failed to allocate memory for receive pool.");
while (!io_eof(&io)) {
ready = sample_alloc_many(&sendd.pool, smps, node->vectorize);
ready = sample_alloc_many(&sendd.pool, smps, node->out.vectorize);
if (ret < 0)
error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret);
else if (ready < node->vectorize)
error("Failed to get %u samples out of send pool (%d).", node->out.vectorize, ret);
else if (ready < node->out.vectorize)
warn("Send pool underrun");
scanned = io_scan(&io, smps, ready);
@ -193,18 +193,18 @@ leave: if (io_eof(&io)) {
static void * recv_loop(void *ctx)
{
int recv, ret, cnt = 0, ready = 0;
struct sample *smps[node->vectorize];
struct sample *smps[node->in.vectorize];
/* Initialize memory */
ret = pool_init(&recvv.pool, LOG2_CEIL(node->vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
ret = pool_init(&recvv.pool, LOG2_CEIL(node->in.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
if (ret < 0)
error("Failed to allocate memory for receive pool.");
for (;;) {
ready = sample_alloc_many(&recvv.pool, smps, node->vectorize);
ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize);
if (ready < 0)
error("Failed to allocate %u samples from receive pool.", node->vectorize);
else if (ready < node->vectorize)
error("Failed to allocate %u samples from receive pool.", node->in.vectorize);
else if (ready < node->in.vectorize)
warn("Receive pool underrun");
recv = node_read(node, smps, ready);