diff --git a/include/villas/hook.h b/include/villas/hook.h index aa1653785..eadd4c281 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -47,6 +47,7 @@ struct hook { enum state state; struct path *path; + struct node *node; struct hook_type *_vt; /**< C++ like Vtable pointer. */ void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ @@ -56,7 +57,7 @@ struct hook { json_t *cfg; /**< A JSON object containing the configuration of the hook. */ }; -int hook_init(struct hook *h, struct hook_type *vt, struct path *p); +int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n); int hook_parse(struct hook *h, json_t *cfg); int hook_parse_cli(struct hook *h, int argc, char *argv[]); @@ -94,4 +95,4 @@ int hook_cmp_priority(const void *a, const void *b); * hooks = [ "print" ] * } */ -int hook_parse_list(struct list *list, json_t *cfg, struct path *p); +int hook_parse_list(struct list *list, json_t *cfg, struct path *p, struct node *n); diff --git a/include/villas/hook_type.h b/include/villas/hook_type.h index d7c10af61..37d02225a 100644 --- a/include/villas/hook_type.h +++ b/include/villas/hook_type.h @@ -43,9 +43,15 @@ struct hook; struct sample; +enum hook_flags { + HOOK_BUILTIN = (1 << 0), /**< Should we add this hook by default to every path?. */ + HOOK_PATH = (1 << 1), /**< This hook type is used by paths. */ + HOOK_NODE = (1 << 2) /**< This hook type is used by nodes. */ +}; + struct hook_type { int priority; /**< Default priority of this hook type. */ - bool builtin; /**< Should we add this hook by default to every path?. */ + int flags; size_t size; /**< Size of allocation for struct hook::_vd */ diff --git a/include/villas/node.h b/include/villas/node.h index 2b186ee57..94e0235de 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -53,7 +53,11 @@ struct node int id; /**< An id of this node which is only unique in the scope of it's super-node (VILLASnode instance). */ - unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ + unsigned sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ + + struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ + + struct list hooks; /**< List of write hooks (struct hook). */ enum state state; diff --git a/include/villas/path.h b/include/villas/path.h index c99e0c575..92a6c7249 100644 --- a/include/villas/path.h +++ b/include/villas/path.h @@ -48,16 +48,13 @@ struct path_source { struct node *node; struct pool pool; - - struct list hooks; /**< Read Hooks. */ - struct list mappings; /**< List of struct mapping_entry */ + struct list mappings; /**< List of mappings (struct mapping_entry). */ }; struct path_destination { struct node *node; - struct queue queue; - struct list hooks; /**< Write Hooks. */ + struct queue queue; }; /** The datastructure for a path. */ @@ -74,7 +71,7 @@ struct path { struct list sources; /**< List of all incoming nodes (struct path_source). */ struct list destinations; /**< List of all outgoing nodes (struct path_destination). */ - struct list hooks; /**< Processing hooks. */ + struct list hooks; /**< List of processing hooks (struct hook). */ int enabled; /**< Is this path enabled. */ int reverse; /**< This path as a matching reverse path. */ @@ -82,19 +79,14 @@ struct path { int samplelen; /**< Will be calculated based on path::sources.mappings */ int sequence; - pthread_t tid; /**< The thread id for this path. */ - char *_name; /**< Singleton: A string which is used to print this path to screen. */ - struct stats *stats; /**< Statistic counters. This is a pointer to the statistic hooks private data. */ - - struct super_node *super_node; /**< The super node this path belongs to. */ - + pthread_t tid; /**< The thread id for this path. */ json_t *cfg; /**< A JSON object containing the configuration of the path. */ }; /** Initialize internal data structures. */ -int path_init(struct path *p, struct super_node *sn); +int path_init(struct path *p); int path_init2(struct path *p); diff --git a/lib/hook.c b/lib/hook.c index ac0c1bb85..574008d22 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -32,7 +32,7 @@ #include "plugin.h" #include "config_helper.h" -int hook_init(struct hook *h, struct hook_type *vt, struct path *p) +int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n) { int ret; @@ -41,6 +41,7 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p) h->priority = vt->priority; h->path = p; + h->node = n; h->_vt = vt; h->_vd = alloc(vt->size); @@ -206,7 +207,7 @@ int hook_cmp_priority(const void *a, const void *b) return ha->priority - hb->priority; } -int hook_parse_list(struct list *list, json_t *cfg, struct path *o) +int hook_parse_list(struct list *list, json_t *cfg, struct path *o, struct node *n) { if (!json_is_array(cfg)) error("Hooks must be configured as a list of objects"); @@ -229,7 +230,7 @@ int hook_parse_list(struct list *list, json_t *cfg, struct path *o) struct hook *h = alloc(sizeof(struct hook)); - ret = hook_init(h, &p->hook, o); + ret = hook_init(h, &p->hook, o, n); if (ret) jerror(&err, "Failed to initialize hook"); diff --git a/lib/hooks/convert.c b/lib/hooks/convert.c index 853e1815e..dbf57d095 100644 --- a/lib/hooks/convert.c +++ b/lib/hooks/convert.c @@ -107,6 +107,7 @@ static struct plugin p = { .description = "Convert message from / to floating-point / integer", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_PATH | HOOK_NODE, .priority = 99, .init = convert_init, .parse = convert_parse, diff --git a/lib/hooks/decimate.c b/lib/hooks/decimate.c index 58a74df80..462d775b9 100644 --- a/lib/hooks/decimate.c +++ b/lib/hooks/decimate.c @@ -82,6 +82,7 @@ static struct plugin p = { .description = "Downsamping by integer factor", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .init = decimate_init, .parse = decimate_parse, diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 531927f63..abf10a96e 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -27,7 +27,7 @@ #include "hook.h" #include "plugin.h" #include "stats.h" -#include "path.h" +#include "node.h" #include "sample.h" struct drop { @@ -67,8 +67,8 @@ static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt) dist = cur->sequence - (int32_t) prev->sequence; if (dist <= 0) { debug(10, "Reordered sample: sequence=%u, distance=%d", cur->sequence, dist); - if (h->path && h->path->stats) - stats_update(h->path->stats->delta, STATS_REORDERED, dist); + if (h->node && h->node->stats) + stats_update(h->node->stats->delta, STATS_REORDERED, dist); } else goto ok; @@ -105,8 +105,8 @@ static struct plugin p = { .description = "Drop messages with reordered sequence numbers", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_BUILTIN | HOOK_NODE, .priority = 3, - .builtin = true, .read = drop_read, .start = drop_start, .stop = drop_stop, diff --git a/lib/hooks/fix_ts.c b/lib/hooks/fix_ts.c index 8c016dd58..4e6d8f6b1 100644 --- a/lib/hooks/fix_ts.c +++ b/lib/hooks/fix_ts.c @@ -59,8 +59,8 @@ static struct plugin p = { .description = "Update timestamps of sample if not set", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_BUILTIN, .priority = 0, - .builtin = true, .read = fix_ts_read, } }; diff --git a/lib/hooks/jitter_calc.c b/lib/hooks/jitter_calc.c index de7f3d947..8479c7963 100644 --- a/lib/hooks/jitter_calc.c +++ b/lib/hooks/jitter_calc.c @@ -129,6 +129,7 @@ static struct plugin p = { .description = "Calc jitter, mean and variance of GPS vs NTP TS", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE, .priority = 0, .init = jitter_calc_init, .destroy= jitter_calc_deinit, diff --git a/lib/hooks/map.c b/lib/hooks/map.c index 81ea8222d..7d6602c4d 100644 --- a/lib/hooks/map.c +++ b/lib/hooks/map.c @@ -72,7 +72,7 @@ static int map_parse(struct hook *h, json_t *cfg) return 0; } -static int map_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int map_process(struct hook *h, struct sample *smps[], unsigned *cnt) { int ret; struct map *p = h->_vd; @@ -104,11 +104,12 @@ static struct plugin p = { .description = "Remap values and / or add header, timestamp values to the sample", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_PATH, .priority = 99, .init = map_init, .destroy= map_destroy, .parse = map_parse, - .read = map_read, + .process= map_process, .size = sizeof(struct map) } }; diff --git a/lib/hooks/print.c b/lib/hooks/print.c index f00338510..70645a4d3 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -101,7 +101,7 @@ static int print_parse(struct hook *h, json_t *cfg) return 0; } -static int print_read(struct hook *h, struct sample *smps[], unsigned *cnt) +static int print_process(struct hook *h, struct sample *smps[], unsigned *cnt) { struct print *p = h->_vd; @@ -115,12 +115,13 @@ static struct plugin p = { .description = "Print the message to stdout", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .init = print_init, .parse = print_parse, .start = print_start, .stop = print_stop, - .read = print_read, + .process= print_process, .size = sizeof(struct print) } }; diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index 76ce49fd8..fc46b7a1f 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -26,7 +26,7 @@ #include "hook.h" #include "plugin.h" -#include "path.h" +#include "node.h" #include "sample.h" struct restart { @@ -58,19 +58,19 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) struct restart *r = h->_vd; struct sample *prev, *cur = NULL; - assert(h->path); + assert(h->node); for (i = 0, prev = r->prev; i < *cnt; i++, prev = cur) { cur = smps[i]; if (prev) { if (cur->sequence == 0 && prev->sequence <= UINT32_MAX - 32) { - warn("Simulation for path %s restarted (previous->seq=%u, current->seq=%u)", - path_name(h->path), prev->sequence, cur->sequence); + warn("Simulation from node %s restarted (previous->seq=%u, current->seq=%u)", + node_name(h->node), prev->sequence, cur->sequence); /* Run restart hooks */ - for (size_t i = 0; i < list_length(&h->path->hooks); i++) { - struct hook *k = list_at(&h->path->hooks, i); + for (size_t i = 0; i < list_length(&h->node->hooks); i++) { + struct hook *k = list_at(&h->node->hooks, i); hook_restart(k); } @@ -90,11 +90,11 @@ static int restart_read(struct hook *h, struct sample *smps[], unsigned *cnt) static struct plugin p = { .name = "restart", - .description = "Call restart hooks for current path", + .description = "Call restart hooks for current node", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_BUILTIN, .priority = 1, - .builtin = true, .read = restart_read, .start = restart_start, .stop = restart_stop, diff --git a/lib/hooks/shift_seq.c b/lib/hooks/shift_seq.c index 459fed9a1..a302d03d0 100644 --- a/lib/hooks/shift_seq.c +++ b/lib/hooks/shift_seq.c @@ -63,6 +63,7 @@ static struct plugin p = { .description = "Shift sequence number of samples", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .parse = shift_seq_parse, .read = shift_seq_read, diff --git a/lib/hooks/shift_ts.c b/lib/hooks/shift_ts.c index 9c815279c..6504f2782 100644 --- a/lib/hooks/shift_ts.c +++ b/lib/hooks/shift_ts.c @@ -104,6 +104,7 @@ static struct plugin p = { .description = "Shift timestamps of samples", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .init = shift_ts_init, .parse = shift_ts_parse, diff --git a/lib/hooks/skip_first.c b/lib/hooks/skip_first.c index c908da62d..6659888aa 100644 --- a/lib/hooks/skip_first.c +++ b/lib/hooks/skip_first.c @@ -149,6 +149,7 @@ static struct plugin p = { .description = "Skip the first samples", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE | HOOK_PATH, .priority = 99, .parse = skip_first_parse, .start = skip_first_restart, diff --git a/lib/hooks/stats_collect.c b/lib/hooks/stats_collect.c index 782c1dc26..4c27ca0ed 100644 --- a/lib/hooks/stats_collect.c +++ b/lib/hooks/stats_collect.c @@ -29,7 +29,7 @@ #include "hook.h" #include "plugin.h" #include "stats.h" -#include "path.h" +#include "node.h" struct stats_collect { struct stats stats; @@ -50,8 +50,8 @@ static int stats_collect_init(struct hook *h) /* Register statistic object to path. * * This allows the path code to update statistics. */ - if (h->path) - h->path->stats = &p->stats; + if (h->node) + h->node->stats = &p->stats; /* Set default values */ p->format = STATS_FORMAT_HUMAN; @@ -165,6 +165,7 @@ static struct plugin p = { .description = "Collect statistics for the current path", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE, .priority = 2, .init = stats_collect_init, .destroy= stats_collect_destroy, diff --git a/lib/hooks/ts.c b/lib/hooks/ts.c index 3f078b4de..0fb7d86e6 100644 --- a/lib/hooks/ts.c +++ b/lib/hooks/ts.c @@ -42,6 +42,7 @@ static struct plugin p = { .description = "Overwrite origin timestamp of samples with receive timestamp", .type = PLUGIN_TYPE_HOOK, .hook = { + .flags = HOOK_NODE, .priority = 99, .read = ts_read, .size = 0 diff --git a/lib/mapping.c b/lib/mapping.c index f81d97bf4..cb1fbf5e4 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -159,7 +159,7 @@ int mapping_parse_str(struct mapping_entry *e, const char *str, struct list *nod } else { e->data.offset = 0; - e->length = e->node->samplelen; + e->length = e->node ? e->node->samplelen : 0; } } else diff --git a/lib/node.c b/lib/node.c index 2cbe0f6aa..747181ee8 100644 --- a/lib/node.c +++ b/lib/node.c @@ -33,12 +33,14 @@ int node_init(struct node *n, struct node_type *vt) { static int max_id; + int ret; assert(n->state == STATE_DESTROYED); n->_vt = vt; n->_vd = alloc(vt->size); - + + n->stats = NULL; n->name = NULL; n->_name = NULL; n->_name_long = NULL; @@ -51,26 +53,58 @@ int node_init(struct node *n, struct node_type *vt) list_push(&vt->instances, n); + list_init(&n->hooks); + + /* Add internal hooks if they are not already in the list */ + for (size_t i = 0; i < list_length(&plugins); i++) { + struct plugin *q = list_at(&plugins, i); + + if (q->type != PLUGIN_TYPE_HOOK) + continue; + + struct hook_type *vt = &q->hook; + + if ((vt->flags & HOOK_NODE) && (vt->flags & HOOK_BUILTIN)) { + struct hook *h = alloc(sizeof(struct hook)); + + ret = hook_init(h, vt, NULL, n); + if (ret) + return ret; + + list_push(&n->hooks, h); + } + } + n->state = STATE_INITIALIZED; return 0; } +int node_init2(struct node *n) +{ + /* We sort the hooks according to their priority before starting the path */ + list_sort(&n->hooks, hook_cmp_priority); + + return 0; +} + int node_parse(struct node *n, json_t *cfg, const char *name) { struct plugin *p; int ret; json_error_t err; + json_t *json_hooks = NULL; const char *type; n->name = strdup(name); - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i }", + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: i, s?: i, s?: o }", "type", &type, "vectorize", &n->vectorize, - "samplelen", &n->samplelen + "samplelen", &n->samplelen, + "hooks", &json_hooks ); if (ret) jerror(&err, "Failed to parse node '%s'", node_name(n)); @@ -78,6 +112,12 @@ int node_parse(struct node *n, json_t *cfg, const char *name) p = plugin_lookup(PLUGIN_TYPE_NODE, type); assert(&p->node == n->_vt); + if (json_hooks) { + ret = hook_parse_list(&n->hooks, json_hooks, NULL, n); + if (ret < 0) + return ret; + } + ret = n->_vt->parse ? n->_vt->parse(n, cfg) : 0; if (ret) error("Failed to parse node '%s'", node_name(n)); @@ -138,6 +178,14 @@ int node_start(struct node *n) info("Starting node %s", node_name_long(n)); { INDENT + for (size_t i = 0; i < list_length(&n->hooks); i++) { + struct hook *h = list_at(&n->hooks, i); + + ret = hook_start(h); + if (ret) + return ret; + } + ret = n->_vt->start ? n->_vt->start(n) : 0; if (ret) return ret; @@ -159,6 +207,14 @@ int node_stop(struct node *n) info("Stopping node %s", node_name(n)); { INDENT + for (size_t i = 0; i < list_length(&n->hooks); i++) { + struct hook *h = list_at(&n->hooks, i); + + ret = hook_stop(h); + if (ret) + return ret; + } + ret = n->_vt->stop ? n->_vt->stop(n) : 0; } @@ -172,6 +228,8 @@ int node_destroy(struct node *n) { assert(n->state != STATE_DESTROYED && n->state != STATE_STARTED); + list_destroy(&n->hooks, (dtor_cb_t) hook_destroy, true); + if (n->_vt->destroy) n->_vt->destroy(n); @@ -196,7 +254,7 @@ int node_destroy(struct node *n) int node_read(struct node *n, struct sample *smps[], unsigned cnt) { - int readd, nread = 0; + int readd, rread, nread = 0; if (!n->_vt->read) return -1; @@ -223,7 +281,17 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) for (int i = 0; i < nread; i++) smps[i]->source = n; - return nread; + rread = hook_read_list(&n->hooks, smps, nread); + if (nread != rread) { + int skipped = nread - rread; + + debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for node %s", skipped, nread, node_name(n)); + + if (n->stats) + stats_update(n->stats->delta, STATS_SKIPPED, skipped); + } + + return rread; } int node_write(struct node *n, struct sample *smps[], unsigned cnt) @@ -233,6 +301,10 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt) if (!n->_vt->write) return -1; + cnt = hook_write_list(&n->hooks, smps, cnt); + if (cnt <= 0) + return cnt; + /* Send in parts if vector not supported */ if (n->_vt->vectorize > 0 && n->_vt->vectorize < cnt) { while (cnt - nsent > 0) { @@ -269,7 +341,7 @@ char * node_name_long(struct node *n) if (n->_vt->print) { struct node_type *vt = n->_vt; char *name_long = vt->print(n); - strcatf(&n->_name_long, "%s: id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), n->id, n->vectorize, n->samplelen, name_long); + strcatf(&n->_name_long, "%s: #hooks=%zu, id=%d, vectorize=%d, samplelen=%d, %s", node_name(n), list_length(&n->hooks), n->id, n->vectorize, n->samplelen, name_long); free(name_long); } else diff --git a/lib/path.c b/lib/path.c index b867f678b..6728705f5 100644 --- a/lib/path.c +++ b/lib/path.c @@ -35,12 +35,33 @@ #include "queue.h" #include "hook.h" #include "plugin.h" -#include "super_node.h" #include "memory.h" #include "stats.h" #include "node.h" -static void path_read_source(struct path *p, struct path_source *ps) +static int path_source_init(struct path_source *ps) +{ + int ret; + + ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); + if (ret) + return ret; + + return 0; +} + +static int path_source_destroy(struct path_source *ps) +{ + int ret; + + ret = pool_destroy(&ps->pool); + if (ret) + return ret; + + return 0; +} + +static void path_source_read(struct path *p, struct path_source *ps) { int ready, recv, mux, enqueue, enqueued; int cnt = ps->node->vectorize; @@ -55,19 +76,12 @@ static void path_read_source(struct path *p, struct path_source *ps) /* Read ready samples and store them to blocks pointed by smps[] */ recv = node_read(ps->node, read_smps, ready); - if (recv < 0) + if (recv == 0) + goto out2; + else if (recv < 0) error("Failed to receive message from node %s", node_name(ps->node)); else if (recv < ready) warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); - - /* Run read hooks */ - enqueue = hook_read_list(&p->hooks, read_smps, recv); - if (enqueue != recv) { - debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", recv - enqueue, recv, path_name(p)); - - if (p->stats) - stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); - } /* Mux samples */ mux = sample_alloc(&p->pool, muxed_smps, recv); @@ -84,12 +98,8 @@ static void path_read_source(struct path *p, struct path_source *ps) /* Run processing hooks */ enqueue = hook_process_list(&p->hooks, muxed_smps, mux); - if (enqueue != mux) { - debug(LOG_PATH | 10, "Hooks skipped %u out of %u samples for path %s", mux - enqueue, recv, path_name(p)); - - if (p->stats) - stats_update(p->stats->delta, STATS_SKIPPED, mux - enqueue); - } + if (enqueue == 0) + goto out1; /* Keep track of the lowest index that wasn't enqueued; * all following samples must be freed here */ @@ -106,7 +116,30 @@ static void path_read_source(struct path *p, struct path_source *ps) debug(LOG_PATH | 15, "Enqueued %u samples from %s to queue of %s", enqueued, node_name(ps->node), node_name(pd->node)); } - sample_put_many(muxed_smps, ready); +out1: sample_put_many(muxed_smps, recv); +out2: sample_put_many(read_smps, ready); +} + +static int path_destination_init(struct path_destination *pd, int queuelen) +{ + int ret; + + ret = queue_init(&pd->queue, queuelen, &memtype_hugepage); + if (ret) + return ret; + + return 0; +} + +static int path_destination_destroy(struct path_destination *pd) +{ + int ret; + + ret = queue_destroy(&pd->queue); + if (ret) + return ret; + + return 0; } static void path_poll(struct path *p) @@ -122,7 +155,7 @@ static void path_poll(struct path *p) struct path_source *ps = list_at(&p->sources, i); if (p->reader.pfds[i].revents & POLLIN) { - path_read_source(p, ps); + path_source_read(p, ps); updates++; } } @@ -135,7 +168,6 @@ static void path_write(struct path *p) int cnt = pd->node->vectorize; int sent; - int tosend; int available; int released; @@ -151,15 +183,11 @@ static void path_write(struct path *p) debug(LOG_PATH | 15, "Dequeued %u samples from queue of node %s which is part of path %s", available, node_name(pd->node), path_name(p)); - tosend = hook_write_list(&p->hooks, smps, available); - if (tosend == 0) - continue; - - sent = node_write(pd->node, smps, tosend); + sent = node_write(pd->node, smps, available); if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); - else if (sent < tosend) - warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, tosend); + else if (sent < available) + warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, available); released = sample_put_many(smps, sent); @@ -174,29 +202,20 @@ static void * path_run(void *arg) struct path *p = arg; for (;;) { - path_poll(p); + /* We only need to poll in case there is more than one source */ + if (list_length(&p->sources) > 1) + path_poll(p); + path_write(p); } return NULL; } -static int path_source_destroy(struct path_source *ps) +int path_init(struct path *p) { - pool_destroy(&ps->pool); + int ret; - return 0; -} - -static int path_destination_destroy(struct path_destination *pd) -{ - queue_destroy(&pd->queue); - - return 0; -} - -int path_init(struct path *p, struct super_node *sn) -{ assert(p->state == STATE_DESTROYED); list_init(&p->hooks); @@ -210,7 +229,25 @@ int path_init(struct path *p, struct super_node *sn) p->enabled = 1; p->queuelen = DEFAULT_QUEUELEN; - p->super_node = sn; + /* Add internal hooks if they are not already in the list */ + for (size_t i = 0; i < list_length(&plugins); i++) { + struct plugin *q = list_at(&plugins, i); + + if (q->type != PLUGIN_TYPE_HOOK) + continue; + + struct hook_type *vt = &q->hook; + + if ((vt->flags & HOOK_PATH) && (vt->flags & HOOK_BUILTIN)) { + struct hook *h = alloc(sizeof(struct hook)); + + ret = hook_init(h, vt, p, NULL); + if (ret) + return ret; + + list_push(&p->hooks, h); + } + } p->state = STATE_INITIALIZED; @@ -223,27 +260,6 @@ int path_init2(struct path *p) assert(p->state == STATE_CHECKED); - /* Add internal hooks if they are not already in the list*/ - for (size_t i = 0; i < list_length(&plugins); i++) { - struct plugin *q = list_at(&plugins, i); - - if (q->type == PLUGIN_TYPE_HOOK) { - struct hook_type *vt = &q->hook; - - if (vt->builtin) { - struct hook *h = alloc(sizeof(struct hook)); - - ret = hook_init(h, vt, p); - if (ret) { - free(h); - return ret; - } - - list_push(&p->hooks, h); - } - } - } - /* We sort the hooks according to their priority before starting the path */ list_sort(&p->hooks, hook_cmp_priority); @@ -251,30 +267,34 @@ int path_init2(struct path *p) for (size_t i = 0; i < list_length(&p->destinations); i++) { struct path_destination *pd = list_at(&p->destinations, i); - ret = queue_init(&pd->queue, p->queuelen, &memtype_hugepage); + ret = path_destination_init(pd, p->queuelen); if (ret) return ret; } /* Initialize sources */ + for (size_t i = 0; i < list_length(&p->sources); i++) { + struct path_source *ps = list_at(&p->sources, i); + + ret = path_source_init(ps); + if (ret) + return ret; + } + + /* Calc sample length of path */ p->samplelen = 0; for (size_t i = 0; i < list_length(&p->sources); i++) { struct path_source *ps = list_at(&p->sources, i); - /** @todo replace p->queuelen with ps->node->vectorize ? */ - ret = pool_init(&ps->pool, p->queuelen, SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage); - if (ret) - error("Failed to allocate memory pool for path"); - for (size_t i = 0; i < list_length(&ps->mappings); i++) { struct mapping_entry *me = list_at(&ps->mappings, i); - + if (me->offset + me->length > p->samplelen) p->samplelen = me->offset + me->length; } } - - ret = pool_init(&p->pool, list_length(&p->destinations) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage); + + ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage); if (ret) return ret; @@ -353,10 +373,11 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) ps = alloc(sizeof(struct path_source)); ps->node = me->node; - + + ps->mappings.state = STATE_DESTROYED; + list_init(&ps->mappings); - list_init(&ps->hooks); - + list_push(&p->sources, ps); } @@ -369,14 +390,12 @@ int path_parse(struct path *p, json_t *cfg, struct list *nodes) struct path_destination *pd = alloc(sizeof(struct path_destination)); pd->node = n; - - list_init(&pd->hooks); list_push(&p->destinations, pd); } if (json_hooks) { - ret = hook_parse_list(&p->hooks, json_hooks, p); + ret = hook_parse_list(&p->hooks, json_hooks, p, NULL); if (ret) return ret; } @@ -571,7 +590,7 @@ int path_reverse(struct path *p, struct path *r) struct hook *h = list_at(&p->hooks, i); struct hook *g = alloc(sizeof(struct hook)); - ret = hook_init(g, h->_vt, r); + ret = hook_init(g, h->_vt, r, NULL); if (ret) return ret; diff --git a/lib/super_node.c b/lib/super_node.c index fac76c93f..8f05377f9 100644 --- a/lib/super_node.c +++ b/lib/super_node.c @@ -285,7 +285,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg) json_array_foreach(cfg_paths, index, cfg_path) { struct path *p = alloc(sizeof(struct path)); - ret = path_init(p, sn); + ret = path_init(p); if (ret) error("Failed to initialize path"); @@ -298,7 +298,7 @@ int super_node_parse_json(struct super_node *sn, json_t *cfg) if (p->reverse) { struct path *r = alloc(sizeof(struct path)); - ret = path_init(r, sn); + ret = path_init(r); if (ret) error("Failed to init path"); diff --git a/src/hook.c b/src/hook.c index 9782ced1c..f82aa4439 100644 --- a/src/hook.c +++ b/src/hook.c @@ -193,7 +193,7 @@ check: if (optarg == endptr) if (!p) error("Unknown hook function '%s'", hook); - ret = hook_init(&h, &p->hook, NULL); + ret = hook_init(&h, &p->hook, NULL, NULL); if (ret) error("Failed to initialize hook"); diff --git a/src/node.c b/src/node.c index e4a18f68e..0d2bb31af 100644 --- a/src/node.c +++ b/src/node.c @@ -170,6 +170,20 @@ int main(int argc, char *argv[]) hook_periodic(h); } } + + for (size_t i = 0; i < list_length(&sn.nodes); i++) { + struct node *n = list_at(&sn.nodes, i); + + if (n->state != STATE_STARTED) + continue; + + for (size_t j = 0; j < list_length(&n->hooks); j++) { + struct hook *h = list_at(&n->hooks, j); + + hook_periodic(h); + } + } + } } else {