diff --git a/include/villas/hook.h b/include/villas/hook.h index 2d81af6d4..4451ad2c0 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -26,6 +26,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ + /** * @addtogroup hooks User-defined hook functions * @ingroup path @@ -35,6 +36,7 @@ #pragma once #include +#include #include #ifdef __cplusplus @@ -44,7 +46,6 @@ extern "C" { /* Forward declarations */ struct path; struct sample; -struct vlist; /** Descriptor for user defined hooks. See hooks[]. */ struct hook { @@ -56,6 +57,8 @@ struct hook { struct path *path; struct node *node; + struct vlist signals; + struct hook_type *_vt; /**< C++ like Vtable pointer. */ void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ @@ -63,10 +66,13 @@ struct hook { }; int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n); -int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n); + +int hook_init_signals(struct hook *h, struct vlist *signals); int hook_parse(struct hook *h, json_t *cfg); +int hook_prepare(struct hook *h, struct vlist *signals); + int hook_destroy(struct hook *h); int hook_start(struct hook *h); @@ -78,7 +84,6 @@ int hook_periodic(struct hook *h); int hook_restart(struct hook *h); int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt); -int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt); /** Compare two hook functions with their priority. Used by vlist_sort() */ int hook_cmp_priority(const void *a, const void *b); @@ -89,6 +94,10 @@ struct hook_type * hook_type(struct hook *h) return h->_vt; } +int hook_list_init(struct vlist *hs); + +int hook_list_destroy(struct vlist *hs); + /** Parses an object of hooks * * Example: @@ -103,7 +112,15 @@ struct hook_type * hook_type(struct hook *h) * hooks = [ "print" ] * } */ -int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *p, struct node *n); +int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *p, struct node *n); + +int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int mask, struct path *p, struct node *n); + +int hook_list_prepare_signals(struct vlist *hs, struct vlist *signals); + +int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n); + +int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt); #ifdef __cplusplus } diff --git a/include/villas/hook_type.h b/include/villas/hook_type.h index a7089af19..632b8705d 100644 --- a/include/villas/hook_type.h +++ b/include/villas/hook_type.h @@ -65,6 +65,8 @@ struct hook_type { int (*init)(struct hook *h); /**< Called before hook is started to parsed. */ int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ + int (*init_signals)(struct hook *h); + int (*start)(struct hook *h); /**< Called whenever a hook is started; before threads are created. */ int (*stop)(struct hook *h); /**< Called whenever a hook is stopped; after threads are destoyed. */ diff --git a/include/villas/mapping.h b/include/villas/mapping.h index 6c9522af3..9515da8f0 100644 --- a/include/villas/mapping.h +++ b/include/villas/mapping.h @@ -88,18 +88,20 @@ struct mapping_entry { }; }; -int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s); - -int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original, const struct stats *s); +int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original); int mapping_parse(struct mapping_entry *e, json_t *cfg, struct vlist *nodes); int mapping_parse_str(struct mapping_entry *e, const char *str, struct vlist *nodes); -int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes); - int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str); +int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes); + +int mapping_list_prepare(struct vlist *ml); + +int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original); + #ifdef __cplusplus } #endif diff --git a/lib/hook.c b/lib/hook.c index 85bd71b2f..751f32729 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -42,6 +42,12 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node h->path = p; h->node = n; + h->signals.state = STATE_DESTROYED; + + ret = signal_list_init(&h->signals); + if (ret) + return ret; + h->_vt = vt; h->_vd = alloc(vt->size); @@ -49,7 +55,30 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node if (ret) return ret; - h->state = STATE_INITIALIZED; + // We dont need to parse builtin hooks + h->state = hook_type(h)->flags & HOOK_BUILTIN ? STATE_PARSED : STATE_INITIALIZED; + + return 0; +} + +int hook_prepare(struct hook *h, struct vlist *signals) +{ + int ret; + + assert(h->state == STATE_PARSED); + + if (!h->enabled) + return 0; + + ret = signal_list_copy(&h->signals, signals); + if (ret) + return -1; + + ret = hook_type(h)->init_signals ? hook_type(h)->init_signals(h) : 0; + if (ret) + return ret; + + h->state = STATE_PREPARED; return 0; } @@ -82,7 +111,11 @@ int hook_destroy(struct hook *h) { int ret; - assert(h->state != STATE_DESTROYED); + assert(h->state != STATE_DESTROYED && h->state != STATE_STARTED); + + ret = signal_list_destroy(&h->signals); + if (ret) + return ret; ret = hook_type(h)->destroy ? hook_type(h)->destroy(h) : 0; if (ret) @@ -98,34 +131,46 @@ int hook_destroy(struct hook *h) int hook_start(struct hook *h) { + int ret; + assert(h->state == STATE_PREPARED); + if (!h->enabled) return 0; - if (hook_type(h)->start) { - debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); + debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); - return hook_type(h)->start(h); - } - else - return 0; + ret = hook_type(h)->start ? hook_type(h)->start(h) : 0; + if (ret) + return ret; + + h->state = STATE_STARTED; + + return 0; } int hook_stop(struct hook *h) { + int ret; + assert(h->state == STATE_STARTED); + if (!h->enabled) return 0; - if (hook_type(h)->stop) { - debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); + debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); - return hook_type(h)->stop(h); - } - else - return 0; + ret = hook_type(h)->stop ? hook_type(h)->stop(h) : 0; + if (ret) + return ret; + + h->state = STATE_STOPPED; + + return 0; } int hook_periodic(struct hook *h) { + assert(h->state == STATE_STARTED); + if (!h->enabled) return 0; @@ -140,47 +185,36 @@ int hook_periodic(struct hook *h) int hook_restart(struct hook *h) { + int ret; + assert(h->state == STATE_STARTED); + if (!h->enabled) return 0; - if (hook_type(h)->restart) { - debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); + debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority); - return hook_type(h)->restart(h); - } - else - return 0; + ret = hook_type(h)->restart ? hook_type(h)->restart(h) : 0; + if (ret) + return ret; + + return 0; } int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt) { + int ret; + assert(h->state == STATE_STARTED); + if (!h->enabled) return 0; - if (hook_type(h)->process) { - debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt); + debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt); - return hook_type(h)->process(h, smps, cnt); - } - else - return 0; -} + ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0; + if (ret) + return ret; -int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt) -{ - unsigned ret; - - for (size_t i = 0; i < vlist_length(hs); i++) { - struct hook *h = (struct hook *) vlist_at(hs, i); - - ret = hook_process(h, smps, &cnt); - if (ret || !cnt) - /* Abort hook processing if earlier hooks removed all samples - * or they returned something non-zero */ - break; - } - - return cnt; + return 0; } int hook_cmp_priority(const void *a, const void *b) @@ -191,7 +225,29 @@ int hook_cmp_priority(const void *a, const void *b) return ha->priority - hb->priority; } -int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, struct node *n) +int hook_list_init(struct vlist *hs) +{ + int ret; + + ret = vlist_init(hs); + if (ret) + return ret; + + return 0; +} + +int hook_list_destroy(struct vlist *hs) +{ + int ret; + + ret = vlist_destroy(hs, (dtor_cb_t) hook_destroy, true); + if (ret) + return ret; + + return 0; +} + +int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *o, struct node *n) { if (!json_is_array(cfg)) error("Hooks must be configured as a list of objects"); @@ -225,17 +281,42 @@ int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, s if (ret) jerror(&err, "Failed to parse hook configuration"); - vlist_push(list, h); + vlist_push(hs, h); } return 0; } -int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n) +int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int m, struct path *p, struct node *n) { int ret; - assert(l->state == STATE_INITIALIZED); + /* Add internal hooks if they are not already in the list */ + ret = hook_list_add(hs, m, p, n); + if (ret) + return ret; + + /* We sort the hooks according to their priority */ + vlist_sort(hs, hook_cmp_priority); + + for (size_t i = 0; i < vlist_length(hs); i++) { + struct hook *h = (struct hook *) vlist_at(hs, i); + + ret = hook_prepare(h, sigs); + if (ret) + return ret; + + sigs = &h->signals; + } + + return 0; +} + +int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n) +{ + int ret; + + assert(hs->state == STATE_INITIALIZED); for (size_t i = 0; i < vlist_length(&plugins); i++) { struct plugin *q = (struct plugin *) vlist_at(&plugins, i); @@ -246,11 +327,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path if (q->type != PLUGIN_TYPE_HOOK) continue; - if (builtin && - vt->flags & HOOK_BUILTIN && - vt->flags & mask) - { - + if ((vt->flags & mask) == mask) { h = (struct hook *) alloc(sizeof(struct hook)); if (!h) return -1; @@ -259,7 +336,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path if (ret) return ret; - vlist_push(l, h); + vlist_push(hs, h); } } @@ -270,3 +347,20 @@ const char * hook_type_name(struct hook_type *vt) { return plugin_name(vt); } + +int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt) +{ + unsigned ret; + + for (size_t i = 0; i < vlist_length(hs); i++) { + struct hook *h = (struct hook *) vlist_at(hs, i); + + ret = hook_process(h, smps, &cnt); + if (ret || !cnt) + /* Abort hook processing if earlier hooks removed all samples + * or they returned something non-zero */ + break; + } + + return cnt; +} diff --git a/lib/mapping.c b/lib/mapping.c index 7069d013b..5108471e7 100644 --- a/lib/mapping.c +++ b/lib/mapping.c @@ -144,7 +144,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n else { /* Map all signals */ me->data.offset = 0; - me->length = me->node ? vlist_length(&me->node->in.signals) : 0; + me->length = -1; goto end; } @@ -201,9 +201,9 @@ int mapping_parse(struct mapping_entry *me, json_t *cfg, struct vlist *nodes) return mapping_parse_str(me, str, nodes); } -int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes) +int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes) { - int ret, off; + int ret; size_t i; json_t *json_entry; @@ -218,7 +218,6 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes) else return -1; - off = 0; json_array_foreach(json_mapping, i, json_entry) { struct mapping_entry *me = (struct mapping_entry *) alloc(sizeof(struct mapping_entry)); @@ -226,10 +225,7 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes) if (ret) goto out; - me->offset = off; - off += me->length; - - vlist_push(l, me); + vlist_push(ml, me); } ret = 0; @@ -239,22 +235,16 @@ out: json_decref(json_mapping); return ret; } -int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original, const struct stats *s) +int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original) { - int len = me->length; - int off = me->offset; - - /* me->length == 0 means that we want to take all values */ - if (!len) - len = original->length; - - if (len + off > remapped->capacity) + if (me->length + me->offset > remapped->capacity) return -1; switch (me->type) { - case MAPPING_TYPE_STATS: - remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type); + case MAPPING_TYPE_STATS: { + remapped->data[me->offset] = stats_get_value(me->node->stats, me->stats.metric, me->stats.type); break; + } case MAPPING_TYPE_TIMESTAMP: { const struct timespec *ts; @@ -270,8 +260,8 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons return -1; } - remapped->data[off++].i = ts->tv_sec; - remapped->data[off++].i = ts->tv_nsec; + remapped->data[me->offset + 0].i = ts->tv_sec; + remapped->data[me->offset + 1].i = ts->tv_nsec; break; } @@ -279,11 +269,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons case MAPPING_TYPE_HEADER: switch (me->header.type) { case MAPPING_HEADER_TYPE_LENGTH: - remapped->data[off++].i = original->length; + remapped->data[me->offset].i = original->length; break; + case MAPPING_HEADER_TYPE_SEQUENCE: - remapped->data[off++].i = original->sequence; + remapped->data[me->offset].i = original->sequence; break; + default: return -1; } @@ -291,11 +283,11 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons break; case MAPPING_TYPE_DATA: - for (int j = me->data.offset; j < len + me->data.offset; j++) { + for (int j = me->data.offset, i = me->offset; j < me->length + me->data.offset; j++, i++) { if (j >= original->length) - remapped->data[off++].f = 0; + remapped->data[i].f = -1; else - remapped->data[off++] = original->data[j]; + remapped->data[i] = original->data[j]; } break; @@ -304,14 +296,14 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons return 0; } -int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s) +int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original) { int ret; - for (size_t i = 0; i < vlist_length(m); i++) { - struct mapping_entry *me = (struct mapping_entry *) vlist_at(m, i); + for (size_t i = 0; i < vlist_length(ml); i++) { + struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i); - ret = mapping_update(me, remapped, original, s); + ret = mapping_update(me, remapped, original); if (ret) return ret; } @@ -319,6 +311,24 @@ int mapping_remap(const struct vlist *m, struct sample *remapped, const struct s return 0; } +int mapping_list_prepare(struct vlist *ml) +{ + for (size_t i = 0, off = 0; i < vlist_length(ml); i++) { + struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i); + + if (me->length < 0) { + struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN); + + me->length = vlist_length(sigs); + } + + me->offset = off; + off += me->length; + } + + return 0; +} + int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str) { const char *type; diff --git a/lib/node.c b/lib/node.c index 8a38ff267..b1f94f6ec 100644 --- a/lib/node.c +++ b/lib/node.c @@ -65,11 +65,11 @@ int node_init(struct node *n, struct node_type *vt) #endif /* WITH_NETEM */ /* Default values */ - ret = node_direction_init(&n->in, n); + ret = node_direction_init(&n->in, NODE_DIR_IN, n); if (ret) return ret; - ret = node_direction_init(&n->out, n); + ret = node_direction_init(&n->out, NODE_DIR_OUT, n); if (ret) return ret; @@ -94,7 +94,7 @@ int node_prepare(struct node *n) if (ret) return ret; - ret = node_direction_prepare(&n->in, n); + ret = node_direction_prepare(&n->out, n); if (ret) return ret; @@ -418,7 +418,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel #ifdef WITH_HOOKS /* Run read hooks */ - int rread = hook_process_list(&n->in.hooks, smps, nread); + int rread = hook_list_process(&n->in.hooks, smps, nread); int skipped = nread - rread; if (skipped > 0 && n->stats != NULL) { @@ -448,7 +448,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re #ifdef WITH_HOOKS /* Run write hooks */ - cnt = hook_process_list(&n->out.hooks, smps, cnt); + cnt = hook_list_process(&n->out.hooks, smps, cnt); if (cnt <= 0) return cnt; #endif /* WITH_HOOKS */ @@ -479,7 +479,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re char * node_name(struct node *n) { if (!n->_name) - strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(n->_vt)); + strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(node_type(n))); return n->_name; } diff --git a/lib/node_direction.c b/lib/node_direction.c index 7029f7549..6d9361021 100644 --- a/lib/node_direction.c +++ b/lib/node_direction.c @@ -226,7 +226,7 @@ struct vlist * node_direction_get_signals(struct node_direction *nd) struct hook *h = vlist_last(&nd->hooks); - return h->signals; + return &h->signals; #else return &nd->signals; #endif diff --git a/lib/path.c b/lib/path.c index 177573529..767fd2abf 100644 --- a/lib/path.c +++ b/lib/path.c @@ -241,19 +241,21 @@ int path_prepare(struct path *p) if (ps->masked) bitset_set(&p->mask, i); + ret = mapping_list_prepare(&ps->mappings); + if (ret) + return ret; + for (size_t i = 0; i < vlist_length(&ps->mappings); i++) { struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i); + struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN); - int off = me->offset; - int len = me->length; - - for (int j = 0; j < len; j++) { + for (int j = 0; j < me->length; j++) { struct signal *sig; /* For data mappings we simple refer to the existing * signal descriptors of the source node. */ if (me->type == MAPPING_TYPE_DATA) { - sig = (struct signal *) vlist_at_safe(&me->node->in.signals, me->data.offset + j); + sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j); if (!sig) { warning("Failed to create signal description for path %s", path_name(p)); continue; @@ -270,8 +272,8 @@ int path_prepare(struct path *p) return -1; } - vlist_extend(&p->signals, off + j + 1, NULL); - vlist_set(&p->signals, off + j, sig); + vlist_extend(&p->signals, me->offset + j + 1, NULL); + vlist_set(&p->signals, me->offset + j, sig); } } } @@ -328,7 +330,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes) jerror(&err, "Failed to parse path configuration"); /* Input node(s) */ - ret = mapping_parse_list(&sources, json_in, nodes); + ret = mapping_list_parse(&sources, json_in, nodes); if (ret) error("Failed to parse input mapping of path %s", path_name(p)); diff --git a/lib/path_source.c b/lib/path_source.c index ae8225588..21bd21f36 100644 --- a/lib/path_source.c +++ b/lib/path_source.c @@ -121,7 +121,7 @@ int path_source_read(struct path_source *ps, struct path *p, int i) muxed_smps[i]->ts = tomux_smps[i]->ts; muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED); - mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL); + mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]); } sample_copy(p->last_sample, muxed_smps[tomux-1]); diff --git a/lib/super_node.cpp b/lib/super_node.cpp index f4780ec1e..78c241e04 100644 --- a/lib/super_node.cpp +++ b/lib/super_node.cpp @@ -420,12 +420,6 @@ void SuperNode::preparePaths() } void SuperNode::prepare() -{ - prepareNodes(); - preparePaths(); -} - -void SuperNode::start() { int ret; @@ -437,6 +431,18 @@ void SuperNode::start() kernel::rt::init(priority, affinity); + prepareNodes(); + preparePaths(); + + state = STATE_PREPARED; +} + +void SuperNode::start() +{ + int ret; + + assert(state == STATE_PREPARED); + #ifdef WITH_API api.start(); #endif diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp index 2d30b86e9..f4fee472b 100644 --- a/src/villas-hook.cpp +++ b/src/villas-hook.cpp @@ -194,6 +194,10 @@ check: if (optarg == endptr) if (ret) throw RuntimeError("Failed to parse hook config"); + ret = hook_prepare(&h, io.signals); + if (ret) + throw RuntimeError("Failed to prepare hook"); + ret = hook_start(&h); if (ret) throw RuntimeError("Failed to start hook");