diff --git a/include/villas/node.h b/include/villas/node.h index ff890362c..06c0dcc09 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -38,6 +38,17 @@ #include "queue.h" #include "common.h" +struct node_direction { + int enabled; + int builtin; /**< This node should use built-in hooks by default. */ + int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ + + struct list hooks; /**< List of write hooks (struct hook). */ + struct list signals; /**< List of signal meta data such as signal names */ + + json_t *cfg; /**< A JSON object containing the configuration of the node. */ +}; + /** The data structure for a node. * * Every entity which exchanges messages is represented by a node. @@ -49,19 +60,14 @@ struct node char *_name; /**< Singleton: A string used to print to screen. */ char *_name_long; /**< Singleton: A string used to print to screen. */ - int builtin; /**< This node should use built-in hooks by default. */ - int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ int affinity; /**< CPU Affinity of this node */ int samplelen; /**< The maximum number of values this node can receive. */ - int id; /**< An id of this node which is only unique in the scope of it's super-node (VILLASnode instance). */ - unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ - struct list hooks; /**< List of write hooks (struct hook). */ - struct list signals; /**< List of signal meta data such as signal names */ + struct node_direction in, out; enum state state; diff --git a/lib/api/actions/nodes.c b/lib/api/actions/nodes.c index d78caa86f..66bd292bf 100644 --- a/lib/api/actions/nodes.c +++ b/lib/api/actions/nodes.c @@ -37,12 +37,15 @@ static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct a for (size_t i = 0; i < list_length(&s->api->super_node->nodes); i++) { struct node *n = (struct node *) list_at(&s->api->super_node->nodes, i); - json_t *json_node = json_pack("{ s: s, s: i, s: i, s: i, s: i }", + json_t *json_node = json_pack("{ s: s, s: i, s: i, s: { s: i }, s: { s: i } }", "name", node_name_short(n), "state", n->state, - "vectorize", n->vectorize, "affinity", n->affinity, - "id", i + "in", + "vectorize", n->in.vectorize, + "out", + "vectorize", n->out.vectorize + ); if (n->stats) diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index 01de543c7..6a5e577f8 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -72,8 +72,14 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) cur->flags |= SAMPLE_IS_FIRST; /* Run restart hooks */ - for (size_t i = 0; i < list_length(&h->node->hooks); i++) { - struct hook *k = (struct hook *) list_at(&h->node->hooks, i); + for (size_t i = 0; i < list_length(&h->node->in.hooks); i++) { + struct hook *k = (struct hook *) list_at(&h->node->in.hooks, i); + + hook_restart(k); + } + + for (size_t i = 0; i < list_length(&h->node->out.hooks); i++) { + struct hook *k = (struct hook *) list_at(&h->node->out.hooks, i); hook_restart(k); } diff --git a/lib/io.c b/lib/io.c index 571ab8e22..7461e8643 100644 --- a/lib/io.c +++ b/lib/io.c @@ -101,8 +101,8 @@ int io_init(struct io *io, struct format_type *fmt, struct node *n, int flags) io->output.node = n; if (n) { - io->input.signals = &n->signals; - io->output.signals = &n->signals; + io->input.signals = &n->in.signals; + io->output.signals = &n->out.signals; } else { io->input.signals = NULL; diff --git a/lib/node.c b/lib/node.c index f8f28cb6e..69320ff0b 100644 --- a/lib/node.c +++ b/lib/node.c @@ -33,35 +33,19 @@ #include #include -int node_init(struct node *n, struct node_type *vt) +static int node_direction_init(struct node_direction *nd, struct node *n) { - static int max_id; + nd->enabled = 1; + nd->vectorize = 1; + nd->builtin = 1; - assert(n->state == STATE_DESTROYED); - - n->_vt = vt; - n->_vd = alloc(vt->size); - - n->stats = NULL; - n->name = NULL; - n->_name = NULL; - n->_name_long = NULL; - - n->id = max_id++; - - /* Default values */ - n->vectorize = 1; - n->builtin = 1; - n->samplelen = DEFAULT_SAMPLELEN; - - list_push(&vt->instances, n); - - list_init(&n->signals); + list_init(&nd->signals); #ifdef WITH_HOOKS /* Add internal hooks if they are not already in the list */ - list_init(&n->hooks); - if (n->builtin) { + list_init(&nd->hooks); + + if (nd->builtin) { int ret; for (size_t i = 0; i < list_length(&plugins); i++) { struct plugin *q = (struct plugin *) list_at(&plugins, i); @@ -80,46 +64,163 @@ int node_init(struct node *n, struct node_type *vt) if (ret) return ret; - list_push(&n->hooks, h); + list_push(&nd->hooks, h); } } #endif /* WITH_HOOKS */ - n->state = STATE_INITIALIZED; + return 0; +} + +static int node_direction_destroy(struct node_direction *nd, struct node *n) +{ + int ret; + + ret = list_destroy(&nd->signals, (dtor_cb_t) signal_destroy, true); + if (ret) + return ret; + +#ifdef WITH_HOOKS + ret = list_destroy(&nd->hooks, (dtor_cb_t) hook_destroy, true); + if (ret) + return ret; +#endif return 0; } -int node_init2(struct node *n) +static int node_direction_parse(struct node_direction *nd, struct node *n, json_t *cfg) +{ + int ret; + + json_t *json_hooks = NULL; + json_t *json_signals = NULL; + + json_error_t err; + + ret = json_unpack_ex(cfg, &err, 0, "{ s?: o, s?: o, s?: i, s?: b }", + "hooks", &json_hooks, + "signals", &json_signals, + "vectorize", &nd->vectorize, + "builtin", &nd->builtin + ); + if (ret) + jerror(&err, "Failed to parse node '%s'", node_name(n)); + +#ifdef WITH_HOOKS + if (json_hooks) { + ret = hook_parse_list(&nd->hooks, json_hooks, NULL, n); + if (ret < 0) + return ret; + } +#endif /* WITH_HOOKS */ + + if (json_signals) { + ret = signal_parse_list(&nd->signals, json_signals); + if (ret) + error("Failed to parse signal definition of node '%s'", node_name(n)); + } + + return 0; +} + +static int node_direction_check(struct node_direction *nd, struct node *n) +{ + if (nd->vectorize <= 0) + error("Invalid `vectorize` value %d for node %s. Must be natural number!", nd->vectorize, node_name(n)); + + if (n->_vt->vectorize && n->_vt->vectorize < nd->vectorize) + error("Invalid value for `vectorize`. Node type requires a number smaller than %d!", + n->_vt->vectorize); + + return 0; +} + +static int node_direction_start(struct node_direction *nd, struct node *n) { #ifdef WITH_HOOKS + int ret; + /* We sort the hooks according to their priority before starting the path */ - list_sort(&n->hooks, hook_cmp_priority); + list_sort(&nd->hooks, hook_cmp_priority); + + for (size_t i = 0; i < list_length(&nd->hooks); i++) { + struct hook *h = (struct hook *) list_at(&nd->hooks, i); + + ret = hook_start(h); + if (ret) + return ret; + } #endif /* WITH_HOOKS */ return 0; } +static int node_direction_stop(struct node_direction *nd, struct node *n) +{ +#ifdef WITH_HOOKS + int ret; + + for (size_t i = 0; i < list_length(&nd->hooks); i++) { + struct hook *h = (struct hook *) list_at(&nd->hooks, i); + + ret = hook_stop(h); + if (ret) + return ret; + } +#endif /* WITH_HOOKS */ + + return 0; +} + +int node_init(struct node *n, struct node_type *vt) +{ + int ret; + assert(n->state == STATE_DESTROYED); + + n->_vt = vt; + n->_vd = alloc(vt->size); + + n->stats = NULL; + n->name = NULL; + n->_name = NULL; + n->_name_long = NULL; + + /* Default values */ + n->samplelen = DEFAULT_SAMPLELEN; + + ret = node_direction_init(&n->in, n); + if (ret) + return ret; + + ret = node_direction_init(&n->out, n); + if (ret) + return ret; + + n->state = STATE_INITIALIZED; + + list_push(&vt->instances, n); + + return 0; +} + int node_parse(struct node *n, json_t *cfg, const char *name) { struct node_type *nt; int ret; json_error_t err; - json_t *json_hooks = NULL; - json_t *json_signals = NULL; + json_t *json_in = NULL, *json_out = NULL; const char *type; n->name = strdup(name); - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i, s?: o, s?: b, s?: o }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: o, s?: o }", "type", &type, - "vectorize", &n->vectorize, "samplelen", &n->samplelen, - "hooks", &json_hooks, - "builtin", &n->builtin, - "signals", &json_signals + "in", &json_in, + "out", &json_out ); if (ret) jerror(&err, "Failed to parse node '%s'", node_name(n)); @@ -127,18 +228,16 @@ int node_parse(struct node *n, json_t *cfg, const char *name) nt = node_type_lookup(type); assert(nt == n->_vt); -#ifdef WITH_HOOKS - if (json_hooks) { - ret = hook_parse_list(&n->hooks, json_hooks, NULL, n); - if (ret < 0) - return ret; - } -#endif /* WITH_HOOKS */ - - if (json_signals) { - ret = signal_parse_list(&n->signals, json_signals); + if (json_in) { + ret = node_direction_parse(&n->in, n, json_in); if (ret) - error("Failed to parse signal definition of node '%s'", node_name(n)); + error("Failed to parse input direction of node '%s'", node_name(n)); + } + + if (json_out) { + ret = node_direction_parse(&n->out, n, json_out); + if (ret) + error("Failed to parse output direction of node '%s'", node_name(n)); } ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0; @@ -179,14 +278,16 @@ int node_parse_cli(struct node *n, int argc, char *argv[]) int node_check(struct node *n) { + int ret; assert(n->state != STATE_DESTROYED); - if (n->vectorize <= 0) - error("Invalid `vectorize` value %d for node %s. Must be natural number!", n->vectorize, node_name(n)); + ret = node_direction_check(&n->in, n); + if (ret) + return ret; - if (n->_vt->vectorize && n->_vt->vectorize < n->vectorize) - error("Invalid value for `vectorize`. Node type requires a number smaller than %d!", - n->_vt->vectorize); + ret = node_direction_check(&n->out, n); + if (ret) + return ret; n->state = STATE_CHECKED; @@ -201,15 +302,13 @@ int node_start(struct node *n) info("Starting node %s", node_name_long(n)); { INDENT -#ifdef WITH_HOOKS - for (size_t i = 0; i < list_length(&n->hooks); i++) { - struct hook *h = (struct hook *) list_at(&n->hooks, i); + ret = node_direction_start(&n->in, n); + if (ret) + return ret; - ret = hook_start(h); - if (ret) - return ret; - } -#endif /* WITH_HOOKS */ + ret = node_direction_start(&n->out, n); + if (ret) + return ret; ret = n->_vt->start ? n->_vt->start(n) : 0; if (ret) @@ -232,15 +331,13 @@ int node_stop(struct node *n) info("Stopping node %s", node_name(n)); { INDENT -#ifdef WITH_HOOKS - for (size_t i = 0; i < list_length(&n->hooks); i++) { - struct hook *h = (struct hook *) list_at(&n->hooks, i); + ret = node_direction_stop(&n->in, n); + if (ret) + return ret; - ret = hook_stop(h); - if (ret) - return ret; - } -#endif /* WITH_HOOKS */ + ret = node_direction_stop(&n->out, n); + if (ret) + return ret; ret = n->_vt->stop ? n->_vt->stop(n) : 0; } @@ -253,19 +350,22 @@ int node_stop(struct node *n) int node_destroy(struct node *n) { + int ret; assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED); -#ifdef WITH_HOOKS - list_destroy(&n->hooks, (dtor_cb_t) hook_destroy, true); -#endif + ret = node_direction_destroy(&n->in, n); + if (ret) + return ret; + + ret = node_direction_destroy(&n->out, n); + if (ret) + return ret; if (n->_vt->destroy) n->_vt->destroy(n); list_remove(&n->_vt->instances, n); - list_destroy(&n->signals, (dtor_cb_t) signal_destroy, true); - if (n->_vd) free(n->_vd); @@ -328,7 +428,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) #ifdef WITH_HOOKS /* Run read hooks */ - int rread = hook_read_list(&n->hooks, smps, nread); + int rread = hook_read_list(&n->in.hooks, smps, nread); int skipped = nread - rread; if (skipped > 0 && n->stats != NULL) { @@ -354,7 +454,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) #ifdef WITH_HOOKS /* Run write hooks */ - cnt = hook_write_list(&n->hooks, smps, cnt); + cnt = hook_write_list(&n->out.hooks, smps, cnt); if (cnt <= 0) return cnt; #endif /* WITH_HOOKS */ @@ -395,7 +495,12 @@ char * node_name_long(struct node *n) if (n->_vt->print) { struct node_type *vt = n->_vt; char *name_long = vt->print(n); - strcatf(&n->_name_long, "%s: #hooks=%zu, id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), list_length(&n->hooks), n->id, n->vectorize, n->samplelen, name_long); + strcatf(&n->_name_long, "%s: #in.hooks=%zu, in.vectorize=%d, #out.hooks=%zu, out.vectorize=%d, samplelen=%d, %s", + node_name(n), + list_length(&n->in.hooks), n->in.vectorize, + list_length(&n->out.hooks), n->out.vectorize, + n->samplelen, name_long); + free(name_long); } else diff --git a/lib/nodes/shmem.c b/lib/nodes/shmem.c index f63dce860..9be84cc30 100644 --- a/lib/nodes/shmem.c +++ b/lib/nodes/shmem.c @@ -46,7 +46,7 @@ int shmem_parse(struct node *n, json_t *cfg) json_error_t err; /* Default values */ - shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->vectorize); + shm->conf.queuelen = MAX(DEFAULT_SHMEM_QUEUELEN, n->in.vectorize); shm->conf.samplelen = n->samplelen; shm->conf.polling = false; shm->exec = NULL; diff --git a/lib/path.c b/lib/path.c index 3d2fb86b5..7763e1aa5 100644 --- a/lib/path.c +++ b/lib/path.c @@ -46,7 +46,7 @@ static int path_source_init(struct path_source *ps) { int ret; - ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); + ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->in.vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); if (ret) return ret; @@ -72,7 +72,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) { int recv, tomux, ready, cnt; - cnt = ps->node->vectorize; + cnt = ps->node->in.vectorize; struct sample *read_smps[cnt]; struct sample *muxed_smps[cnt]; @@ -122,10 +122,7 @@ static void path_source_read(struct path_source *ps, struct path *p, int i) if (toenqueue != tomux) { int skipped = tomux - toenqueue; - debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, timux, path_name(p)); - - if (p->stats) - stats_update(p->stats, STATS_SKIPPED, skipped); + debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for path %s", skipped, tomux, path_name(p)); } #else int toenqueue = tomux; @@ -197,7 +194,7 @@ static void path_destination_enqueue(struct path *p, struct sample *smps[], unsi static void path_destination_write(struct path_destination *pd, struct path *p) { - int cnt = pd->node->vectorize; + int cnt = pd->node->out.vectorize; int sent; int available; int released; @@ -611,8 +608,13 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) && node_fd(ps->node) != 1; } - list_destroy(&sources, NULL, false); - list_destroy(&destinations, NULL, false); + ret = list_destroy(&sources, NULL, false); + if (ret) + return ret; + + ret = list_destroy(&destinations, NULL, false); + if (ret) + return ret; p->cfg = cfg; p->state = STATE_PARSED; diff --git a/lib/super_node.c b/lib/super_node.c index 4a6a9ebf9..705b8cab7 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -369,10 +369,6 @@ int super_node_start(struct super_node *sn) int refs = list_count(&sn->paths, (cmp_cb_t) path_uses_node, n); if (refs > 0) { INDENT - ret = node_init2(n); - if (ret) - error("Failed to start node: %s", node_name(n)); - ret = node_start(n); if (ret) error("Failed to start node: %s", node_name(n)); diff --git a/src/node.c b/src/node.c index 0119362ad..f5de8d1c5 100644 --- a/src/node.c +++ b/src/node.c @@ -195,8 +195,14 @@ int main(int argc, char *argv[]) if (n->state != STATE_STARTED) continue; - for (size_t j = 0; j < list_length(&n->hooks); j++) { - struct hook *h = (struct hook *) list_at(&n->hooks, j); + for (size_t j = 0; j < list_length(&n->in.hooks); j++) { + struct hook *h = (struct hook *) list_at(&n->in.hooks, j); + + hook_periodic(h); + } + + for (size_t j = 0; j < list_length(&n->out.hooks); j++) { + struct hook *h = (struct hook *) list_at(&n->out.hooks, j); hook_periodic(h); } diff --git a/src/pipe.c b/src/pipe.c index 76f296c37..4c887d3a0 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -129,18 +129,18 @@ static void * send_loop(void *ctx) { unsigned last_sequenceno = 0; int ret, scanned, sent, ready, cnt = 0; - struct sample *smps[node->vectorize]; + struct sample *smps[node->out.vectorize]; /* Initialize memory */ - ret = pool_init(&sendd.pool, LOG2_CEIL(node->vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); + ret = pool_init(&sendd.pool, LOG2_CEIL(node->out.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); if (ret < 0) error("Failed to allocate memory for receive pool."); while (!io_eof(&io)) { - ready = sample_alloc_many(&sendd.pool, smps, node->vectorize); + ready = sample_alloc_many(&sendd.pool, smps, node->out.vectorize); if (ret < 0) - error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); - else if (ready < node->vectorize) + error("Failed to get %u samples out of send pool (%d).", node->out.vectorize, ret); + else if (ready < node->out.vectorize) warn("Send pool underrun"); scanned = io_scan(&io, smps, ready); @@ -193,18 +193,18 @@ leave: if (io_eof(&io)) { static void * recv_loop(void *ctx) { int recv, ret, cnt = 0, ready = 0; - struct sample *smps[node->vectorize]; + struct sample *smps[node->in.vectorize]; /* Initialize memory */ - ret = pool_init(&recvv.pool, LOG2_CEIL(node->vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); + ret = pool_init(&recvv.pool, LOG2_CEIL(node->in.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage); if (ret < 0) error("Failed to allocate memory for receive pool."); for (;;) { - ready = sample_alloc_many(&recvv.pool, smps, node->vectorize); + ready = sample_alloc_many(&recvv.pool, smps, node->in.vectorize); if (ready < 0) - error("Failed to allocate %u samples from receive pool.", node->vectorize); - else if (ready < node->vectorize) + error("Failed to allocate %u samples from receive pool.", node->in.vectorize); + else if (ready < node->in.vectorize) warn("Receive pool underrun"); recv = node_read(node, smps, ready);