1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

bunch of bugfixes and refactoring for preperation of hooks

This commit is contained in:
Steffen Vogel 2019-03-09 00:32:22 +01:00
parent c906143338
commit 4942d8ee74
11 changed files with 250 additions and 113 deletions

View file

@ -26,6 +26,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @addtogroup hooks User-defined hook functions
* @ingroup path
@ -35,6 +36,7 @@
#pragma once
#include <villas/hook_type.h>
#include <villas/list.h>
#include <villas/common.h>
#ifdef __cplusplus
@ -44,7 +46,6 @@ extern "C" {
/* Forward declarations */
struct path;
struct sample;
struct vlist;
/** Descriptor for user defined hooks. See hooks[]. */
struct hook {
@ -56,6 +57,8 @@ struct hook {
struct path *path;
struct node *node;
struct vlist signals;
struct hook_type *_vt; /**< C++ like Vtable pointer. */
void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
@ -63,10 +66,13 @@ struct hook {
};
int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node *n);
int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n);
int hook_init_signals(struct hook *h, struct vlist *signals);
int hook_parse(struct hook *h, json_t *cfg);
int hook_prepare(struct hook *h, struct vlist *signals);
int hook_destroy(struct hook *h);
int hook_start(struct hook *h);
@ -78,7 +84,6 @@ int hook_periodic(struct hook *h);
int hook_restart(struct hook *h);
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt);
int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt);
/** Compare two hook functions with their priority. Used by vlist_sort() */
int hook_cmp_priority(const void *a, const void *b);
@ -89,6 +94,10 @@ struct hook_type * hook_type(struct hook *h)
return h->_vt;
}
int hook_list_init(struct vlist *hs);
int hook_list_destroy(struct vlist *hs);
/** Parses an object of hooks
*
* Example:
@ -103,7 +112,15 @@ struct hook_type * hook_type(struct hook *h)
* hooks = [ "print" ]
* }
*/
int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *p, struct node *n);
int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *p, struct node *n);
int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int mask, struct path *p, struct node *n);
int hook_list_prepare_signals(struct vlist *hs, struct vlist *signals);
int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n);
int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt);
#ifdef __cplusplus
}

View file

@ -65,6 +65,8 @@ struct hook_type {
int (*init)(struct hook *h); /**< Called before hook is started to parsed. */
int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
int (*init_signals)(struct hook *h);
int (*start)(struct hook *h); /**< Called whenever a hook is started; before threads are created. */
int (*stop)(struct hook *h); /**< Called whenever a hook is stopped; after threads are destoyed. */

View file

@ -88,18 +88,20 @@ struct mapping_entry {
};
};
int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s);
int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original, const struct stats *s);
int mapping_update(const struct mapping_entry *e, struct sample *remapped, const struct sample *original);
int mapping_parse(struct mapping_entry *e, json_t *cfg, struct vlist *nodes);
int mapping_parse_str(struct mapping_entry *e, const char *str, struct vlist *nodes);
int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes);
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str);
int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes);
int mapping_list_prepare(struct vlist *ml);
int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original);
#ifdef __cplusplus
}
#endif

View file

@ -42,6 +42,12 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node
h->path = p;
h->node = n;
h->signals.state = STATE_DESTROYED;
ret = signal_list_init(&h->signals);
if (ret)
return ret;
h->_vt = vt;
h->_vd = alloc(vt->size);
@ -49,7 +55,30 @@ int hook_init(struct hook *h, struct hook_type *vt, struct path *p, struct node
if (ret)
return ret;
h->state = STATE_INITIALIZED;
// We dont need to parse builtin hooks
h->state = hook_type(h)->flags & HOOK_BUILTIN ? STATE_PARSED : STATE_INITIALIZED;
return 0;
}
int hook_prepare(struct hook *h, struct vlist *signals)
{
int ret;
assert(h->state == STATE_PARSED);
if (!h->enabled)
return 0;
ret = signal_list_copy(&h->signals, signals);
if (ret)
return -1;
ret = hook_type(h)->init_signals ? hook_type(h)->init_signals(h) : 0;
if (ret)
return ret;
h->state = STATE_PREPARED;
return 0;
}
@ -82,7 +111,11 @@ int hook_destroy(struct hook *h)
{
int ret;
assert(h->state != STATE_DESTROYED);
assert(h->state != STATE_DESTROYED && h->state != STATE_STARTED);
ret = signal_list_destroy(&h->signals);
if (ret)
return ret;
ret = hook_type(h)->destroy ? hook_type(h)->destroy(h) : 0;
if (ret)
@ -98,34 +131,46 @@ int hook_destroy(struct hook *h)
int hook_start(struct hook *h)
{
int ret;
assert(h->state == STATE_PREPARED);
if (!h->enabled)
return 0;
if (hook_type(h)->start) {
debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
debug(LOG_HOOK | 10, "Start hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return hook_type(h)->start(h);
}
else
return 0;
ret = hook_type(h)->start ? hook_type(h)->start(h) : 0;
if (ret)
return ret;
h->state = STATE_STARTED;
return 0;
}
int hook_stop(struct hook *h)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (hook_type(h)->stop) {
debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
debug(LOG_HOOK | 10, "Stopping hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return hook_type(h)->stop(h);
}
else
return 0;
ret = hook_type(h)->stop ? hook_type(h)->stop(h) : 0;
if (ret)
return ret;
h->state = STATE_STOPPED;
return 0;
}
int hook_periodic(struct hook *h)
{
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
@ -140,47 +185,36 @@ int hook_periodic(struct hook *h)
int hook_restart(struct hook *h)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (hook_type(h)->restart) {
debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
debug(LOG_HOOK | 10, "Restarting hook %s: priority=%d", hook_type_name(hook_type(h)), h->priority);
return hook_type(h)->restart(h);
}
else
return 0;
ret = hook_type(h)->restart ? hook_type(h)->restart(h) : 0;
if (ret)
return ret;
return 0;
}
int hook_process(struct hook *h, struct sample *smps[], unsigned *cnt)
{
int ret;
assert(h->state == STATE_STARTED);
if (!h->enabled)
return 0;
if (hook_type(h)->process) {
debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt);
debug(LOG_HOOK | 10, "Process hook %s: priority=%d, cnt=%d", hook_type_name(hook_type(h)), h->priority, *cnt);
return hook_type(h)->process(h, smps, cnt);
}
else
return 0;
}
ret = hook_type(h)->process ? hook_type(h)->process(h, smps, cnt) : 0;
if (ret)
return ret;
int hook_process_list(struct vlist *hs, struct sample *smps[], unsigned cnt)
{
unsigned ret;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_process(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
return 0;
}
int hook_cmp_priority(const void *a, const void *b)
@ -191,7 +225,29 @@ int hook_cmp_priority(const void *a, const void *b)
return ha->priority - hb->priority;
}
int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, struct node *n)
int hook_list_init(struct vlist *hs)
{
int ret;
ret = vlist_init(hs);
if (ret)
return ret;
return 0;
}
int hook_list_destroy(struct vlist *hs)
{
int ret;
ret = vlist_destroy(hs, (dtor_cb_t) hook_destroy, true);
if (ret)
return ret;
return 0;
}
int hook_list_parse(struct vlist *hs, json_t *cfg, int mask, struct path *o, struct node *n)
{
if (!json_is_array(cfg))
error("Hooks must be configured as a list of objects");
@ -225,17 +281,42 @@ int hook_parse_list(struct vlist *list, json_t *cfg, int mask, struct path *o, s
if (ret)
jerror(&err, "Failed to parse hook configuration");
vlist_push(list, h);
vlist_push(hs, h);
}
return 0;
}
int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path *p, struct node *n)
int hook_list_prepare(struct vlist *hs, struct vlist *sigs, int m, struct path *p, struct node *n)
{
int ret;
assert(l->state == STATE_INITIALIZED);
/* Add internal hooks if they are not already in the list */
ret = hook_list_add(hs, m, p, n);
if (ret)
return ret;
/* We sort the hooks according to their priority */
vlist_sort(hs, hook_cmp_priority);
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_prepare(h, sigs);
if (ret)
return ret;
sigs = &h->signals;
}
return 0;
}
int hook_list_add(struct vlist *hs, int mask, struct path *p, struct node *n)
{
int ret;
assert(hs->state == STATE_INITIALIZED);
for (size_t i = 0; i < vlist_length(&plugins); i++) {
struct plugin *q = (struct plugin *) vlist_at(&plugins, i);
@ -246,11 +327,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (q->type != PLUGIN_TYPE_HOOK)
continue;
if (builtin &&
vt->flags & HOOK_BUILTIN &&
vt->flags & mask)
{
if ((vt->flags & mask) == mask) {
h = (struct hook *) alloc(sizeof(struct hook));
if (!h)
return -1;
@ -259,7 +336,7 @@ int hook_init_builtin_list(struct vlist *l, bool builtin, int mask, struct path
if (ret)
return ret;
vlist_push(l, h);
vlist_push(hs, h);
}
}
@ -270,3 +347,20 @@ const char * hook_type_name(struct hook_type *vt)
{
return plugin_name(vt);
}
int hook_list_process(struct vlist *hs, struct sample *smps[], unsigned cnt)
{
unsigned ret;
for (size_t i = 0; i < vlist_length(hs); i++) {
struct hook *h = (struct hook *) vlist_at(hs, i);
ret = hook_process(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
}

View file

@ -144,7 +144,7 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
else {
/* Map all signals */
me->data.offset = 0;
me->length = me->node ? vlist_length(&me->node->in.signals) : 0;
me->length = -1;
goto end;
}
@ -201,9 +201,9 @@ int mapping_parse(struct mapping_entry *me, json_t *cfg, struct vlist *nodes)
return mapping_parse_str(me, str, nodes);
}
int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
int mapping_list_parse(struct vlist *ml, json_t *cfg, struct vlist *nodes)
{
int ret, off;
int ret;
size_t i;
json_t *json_entry;
@ -218,7 +218,6 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
else
return -1;
off = 0;
json_array_foreach(json_mapping, i, json_entry) {
struct mapping_entry *me = (struct mapping_entry *) alloc(sizeof(struct mapping_entry));
@ -226,10 +225,7 @@ int mapping_parse_list(struct vlist *l, json_t *cfg, struct vlist *nodes)
if (ret)
goto out;
me->offset = off;
off += me->length;
vlist_push(l, me);
vlist_push(ml, me);
}
ret = 0;
@ -239,22 +235,16 @@ out: json_decref(json_mapping);
return ret;
}
int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original, const struct stats *s)
int mapping_update(const struct mapping_entry *me, struct sample *remapped, const struct sample *original)
{
int len = me->length;
int off = me->offset;
/* me->length == 0 means that we want to take all values */
if (!len)
len = original->length;
if (len + off > remapped->capacity)
if (me->length + me->offset > remapped->capacity)
return -1;
switch (me->type) {
case MAPPING_TYPE_STATS:
remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type);
case MAPPING_TYPE_STATS: {
remapped->data[me->offset] = stats_get_value(me->node->stats, me->stats.metric, me->stats.type);
break;
}
case MAPPING_TYPE_TIMESTAMP: {
const struct timespec *ts;
@ -270,8 +260,8 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return -1;
}
remapped->data[off++].i = ts->tv_sec;
remapped->data[off++].i = ts->tv_nsec;
remapped->data[me->offset + 0].i = ts->tv_sec;
remapped->data[me->offset + 1].i = ts->tv_nsec;
break;
}
@ -279,11 +269,13 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
case MAPPING_TYPE_HEADER:
switch (me->header.type) {
case MAPPING_HEADER_TYPE_LENGTH:
remapped->data[off++].i = original->length;
remapped->data[me->offset].i = original->length;
break;
case MAPPING_HEADER_TYPE_SEQUENCE:
remapped->data[off++].i = original->sequence;
remapped->data[me->offset].i = original->sequence;
break;
default:
return -1;
}
@ -291,11 +283,11 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
break;
case MAPPING_TYPE_DATA:
for (int j = me->data.offset; j < len + me->data.offset; j++) {
for (int j = me->data.offset, i = me->offset; j < me->length + me->data.offset; j++, i++) {
if (j >= original->length)
remapped->data[off++].f = 0;
remapped->data[i].f = -1;
else
remapped->data[off++] = original->data[j];
remapped->data[i] = original->data[j];
}
break;
@ -304,14 +296,14 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
return 0;
}
int mapping_remap(const struct vlist *m, struct sample *remapped, const struct sample *original, const struct stats *s)
int mapping_list_remap(const struct vlist *ml, struct sample *remapped, const struct sample *original)
{
int ret;
for (size_t i = 0; i < vlist_length(m); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(m, i);
for (size_t i = 0; i < vlist_length(ml); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
ret = mapping_update(me, remapped, original, s);
ret = mapping_update(me, remapped, original);
if (ret)
return ret;
}
@ -319,6 +311,24 @@ int mapping_remap(const struct vlist *m, struct sample *remapped, const struct s
return 0;
}
int mapping_list_prepare(struct vlist *ml)
{
for (size_t i = 0, off = 0; i < vlist_length(ml); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(ml, i);
if (me->length < 0) {
struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
me->length = vlist_length(sigs);
}
me->offset = off;
off += me->length;
}
return 0;
}
int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
{
const char *type;

View file

@ -65,11 +65,11 @@ int node_init(struct node *n, struct node_type *vt)
#endif /* WITH_NETEM */
/* Default values */
ret = node_direction_init(&n->in, n);
ret = node_direction_init(&n->in, NODE_DIR_IN, n);
if (ret)
return ret;
ret = node_direction_init(&n->out, n);
ret = node_direction_init(&n->out, NODE_DIR_OUT, n);
if (ret)
return ret;
@ -94,7 +94,7 @@ int node_prepare(struct node *n)
if (ret)
return ret;
ret = node_direction_prepare(&n->in, n);
ret = node_direction_prepare(&n->out, n);
if (ret)
return ret;
@ -418,7 +418,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
#ifdef WITH_HOOKS
/* Run read hooks */
int rread = hook_process_list(&n->in.hooks, smps, nread);
int rread = hook_list_process(&n->in.hooks, smps, nread);
int skipped = nread - rread;
if (skipped > 0 && n->stats != NULL) {
@ -448,7 +448,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
#ifdef WITH_HOOKS
/* Run write hooks */
cnt = hook_process_list(&n->out.hooks, smps, cnt);
cnt = hook_list_process(&n->out.hooks, smps, cnt);
if (cnt <= 0)
return cnt;
#endif /* WITH_HOOKS */
@ -479,7 +479,7 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *re
char * node_name(struct node *n)
{
if (!n->_name)
strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(n->_vt));
strcatf(&n->_name, CLR_RED("%s") "(" CLR_YEL("%s") ")", n->name, node_type_name(node_type(n)));
return n->_name;
}

View file

@ -226,7 +226,7 @@ struct vlist * node_direction_get_signals(struct node_direction *nd)
struct hook *h = vlist_last(&nd->hooks);
return h->signals;
return &h->signals;
#else
return &nd->signals;
#endif

View file

@ -241,19 +241,21 @@ int path_prepare(struct path *p)
if (ps->masked)
bitset_set(&p->mask, i);
ret = mapping_list_prepare(&ps->mappings);
if (ret)
return ret;
for (size_t i = 0; i < vlist_length(&ps->mappings); i++) {
struct mapping_entry *me = (struct mapping_entry *) vlist_at(&ps->mappings, i);
struct vlist *sigs = node_get_signals(me->node, NODE_DIR_IN);
int off = me->offset;
int len = me->length;
for (int j = 0; j < len; j++) {
for (int j = 0; j < me->length; j++) {
struct signal *sig;
/* For data mappings we simple refer to the existing
* signal descriptors of the source node. */
if (me->type == MAPPING_TYPE_DATA) {
sig = (struct signal *) vlist_at_safe(&me->node->in.signals, me->data.offset + j);
sig = (struct signal *) vlist_at_safe(sigs, me->data.offset + j);
if (!sig) {
warning("Failed to create signal description for path %s", path_name(p));
continue;
@ -270,8 +272,8 @@ int path_prepare(struct path *p)
return -1;
}
vlist_extend(&p->signals, off + j + 1, NULL);
vlist_set(&p->signals, off + j, sig);
vlist_extend(&p->signals, me->offset + j + 1, NULL);
vlist_set(&p->signals, me->offset + j, sig);
}
}
}
@ -328,7 +330,7 @@ int path_parse(struct path *p, json_t *cfg, struct vlist *nodes)
jerror(&err, "Failed to parse path configuration");
/* Input node(s) */
ret = mapping_parse_list(&sources, json_in, nodes);
ret = mapping_list_parse(&sources, json_in, nodes);
if (ret)
error("Failed to parse input mapping of path %s", path_name(p));

View file

@ -121,7 +121,7 @@ int path_source_read(struct path_source *ps, struct path *p, int i)
muxed_smps[i]->ts = tomux_smps[i]->ts;
muxed_smps[i]->flags |= tomux_smps[i]->flags & (SAMPLE_HAS_TS_ORIGIN | SAMPLE_HAS_TS_RECEIVED);
mapping_remap(&ps->mappings, muxed_smps[i], tomux_smps[i], NULL);
mapping_list_remap(&ps->mappings, muxed_smps[i], tomux_smps[i]);
}
sample_copy(p->last_sample, muxed_smps[tomux-1]);

View file

@ -420,12 +420,6 @@ void SuperNode::preparePaths()
}
void SuperNode::prepare()
{
prepareNodes();
preparePaths();
}
void SuperNode::start()
{
int ret;
@ -437,6 +431,18 @@ void SuperNode::start()
kernel::rt::init(priority, affinity);
prepareNodes();
preparePaths();
state = STATE_PREPARED;
}
void SuperNode::start()
{
int ret;
assert(state == STATE_PREPARED);
#ifdef WITH_API
api.start();
#endif

View file

@ -194,6 +194,10 @@ check: if (optarg == endptr)
if (ret)
throw RuntimeError("Failed to parse hook config");
ret = hook_prepare(&h, io.signals);
if (ret)
throw RuntimeError("Failed to prepare hook");
ret = hook_start(&h);
if (ret)
throw RuntimeError("Failed to start hook");