diff --git a/include/villas/hook.h b/include/villas/hook.h index ac6132de2..3835069d9 100644 --- a/include/villas/hook.h +++ b/include/villas/hook.h @@ -20,6 +20,7 @@ #include #include +#include #include "queue.h" #include "list.h" @@ -32,72 +33,25 @@ struct hook; struct sample; struct super_node; -/** Optional parameters to hook callbacks */ -struct hook_info { - struct path *path; - - struct list *nodes; - struct list *paths; - - struct sample **samples; - size_t count; -}; - -/** Callback type of hook function - * - * @param h The hook datastructure which contains parameter, name and private context for the hook. - * @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values. - * @param i The hook_info structure contains references to the current node, path or samples. Some fields of this structure can be NULL. - * @retval 0 Success. Continue processing and forwarding the message. - * @retval <0 Error. Drop the message. - */ -typedef int (*hook_cb_t)(struct hook *h, int when, struct hook_info *i); - -/** Destructor callback for hook_storage() - * - * @param data A pointer to the data which should be destroyed. - */ -typedef int (*dtor_cb_t)(void *); - -/** Constructor callback for hook_storage() */ -typedef int (*ctor_cb_t)(void *); - -enum hook_state { - HOOK_DESTROYED, - HOOK_INITIALIZED -}; - -/** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */ -enum hook_when { - HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */ - HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */ - HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ - - HOOK_READ = 1 << 3, /**< Called for every single received samples. */ - HOOK_WRITE = 1 << 4, /**< Called for every single sample which will be sent. */ - - HOOK_ASYNC = 1 << 7, /**< Called asynchronously with fixed rate (see path::rate). */ - HOOK_PERIODIC = 1 << 8, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ - - HOOK_INIT = 1 << 9, /**< Called before path is started to parseHOOK_DESTROYs. */ - HOOK_DESTROY = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ - - HOOK_AUTO = 1 << 11, /**< Internal hooks are added to every path implicitely. */ - HOOK_PARSE = 1 << 12, /**< Called for parsing hook arguments. */ - - /** @{ Classes of hooks */ - /** Hooks which are using private data must allocate and free them propery. */ - HOOK_STORAGE = HOOK_INIT | HOOK_DESTROY, - /** All path related actions */ - HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART - /** @} */ -}; - struct hook_type { - enum hook_when when; /**< The type of the hook as a bitfield */ - hook_cb_t cb; /**< The hook callback function as a function pointer. */ int priority; /**< Default priority of this hook type. */ + bool builtin; /**< Should we add this hook by default to every path?. */ + size_t size; /**< Size of allocation for struct hook::_vd */ + + int (*parse)(struct hook *h, config_setting_t *cfg); + + int (*init)(struct hook *h); /**< Called before path is started to parseHOOK_DESTROYs. */ + int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ + + int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */ + int (*stop)(struct hook *h); /**< Called whenever a path is stopped; after threads are destoyed. */ + + int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */ + int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */ + + int (*read)(struct hook *h, struct sample *smps[], size_t *cnt); /**< Called for every single received samples. */ + int (*write)(struct hook *h, struct sample *smps[], size_t *cnt); /**< Called for every single sample which will be sent. */ }; /** Descriptor for user defined hooks. See hooks[]. */ @@ -105,17 +59,16 @@ struct hook { enum state state; struct sample *prev, *last; + struct path *path; struct hook_type *_vt; /**< C++ like Vtable pointer. */ void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */ int priority; /**< A priority to change the order of execution within one type of hook. */ - - config_setting_t *cfg; }; /** Save references to global nodes, paths and settings */ -int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn); +int hook_init(struct hook *h, struct hook_type *vt, struct path *p); /** Parse a single hook. * @@ -127,10 +80,20 @@ int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn); */ int hook_parse(struct hook *h, config_setting_t *cfg); -int hook_run(struct hook *h, int when, struct hook_info *i); - int hook_destroy(struct hook *h); +int hook_start(struct hook *h); +int hook_stop(struct hook *h); + +int hook_periodic(struct hook *h); +int hook_restart(struct hook *h); + +int hook_read(struct hook *h, struct sample *smps[], size_t *cnt); +int hook_write(struct hook *h, struct sample *smps[], size_t *cnt); + +size_t hook_read_list(struct list *hs, struct sample *smps[], size_t cnt); +size_t hook_write_list(struct list *hs, struct sample *smps[], size_t cnt); + /** Compare two hook functions with their priority. Used by list_sort() */ int hook_cmp_priority(const void *a, const void *b); @@ -148,4 +111,4 @@ int hook_cmp_priority(const void *a, const void *b); * hooks = [ "print" ] * } */ -int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node *sn); \ No newline at end of file +int hook_parse_list(struct list *list, config_setting_t *cfg, struct path *p); \ No newline at end of file diff --git a/lib/hook.c b/lib/hook.c index c63a6e8fe..4e6270130 100644 --- a/lib/hook.c +++ b/lib/hook.c @@ -19,23 +19,20 @@ #include "node.h" #include "plugin.h" -int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn) +int hook_init(struct hook *h, struct hook_type *vt, struct path *p) { int ret; - struct hook_info i = { - .nodes = &sn->nodes, - .paths = &sn->paths - }; - assert(h->state == STATE_DESTROYED); h->priority = vt->priority; + h->path = p; h->_vt = vt; h->_vd = alloc(vt->size); + - ret = hook_run(h, HOOK_INIT, &i); + ret = h->_vt->init ? h->_vt->init(h) : 0; if (ret) return ret; @@ -49,21 +46,13 @@ int hook_parse(struct hook *h, config_setting_t *cfg) int ret; assert(h->state != STATE_DESTROYED); - - h->cfg = cfg; - - config_setting_lookup_int(h->cfg, "priority", &h->priority); - - if (h->_vt->when & HOOK_PARSE) { - if (!h->cfg) - error("Missing configuration for hook: '%s'", plugin_name(h->_vt)); - /* Parse hook arguments */ - ret = hook_run(h, HOOK_PARSE, NULL); - if (ret) - return ret; - } - + config_setting_lookup_int(cfg, "priority", &h->priority); + + ret = h->_vt->parse ? h->_vt->parse(h, cfg) : 0; + if (ret) + return ret; + h->state = STATE_PARSED; return 0; @@ -75,18 +64,82 @@ int hook_destroy(struct hook *h) assert(h->state != STATE_DESTROYED); - ret = hook_run(h, HOOK_DESTROY, NULL); + ret = h->_vt->destroy ? h->_vt->destroy(h) : 0; if (ret) return ret; if (h->_vd) free(h->_vd); - h->state = HOOK_DESTROYED; + h->state = STATE_DESTROYED; return 0; } +int hook_start(struct hook *h) +{ + return h->_vt->start ? h->_vt->start(h) : 0; +} + +int hook_stop(struct hook *h) +{ + return h->_vt->stop ? h->_vt->stop(h) : 0; +} + +int hook_periodic(struct hook *h) +{ + return h->_vt->periodic ? h->_vt->periodic(h) : 0; +} + +int hook_restart(struct hook *h) +{ + return h->_vt->restart ? h->_vt->restart(h) : 0; +} + +int hook_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + return h->_vt->read ? h->_vt->read(h, smps, cnt) : 0; +} + +size_t hook_read_list(struct list *hs, struct sample *smps[], size_t cnt) +{ + size_t ret; + + for (size_t i = 0; i < list_length(hs); i++) { + struct hook *h = list_at(hs, i); + + ret = hook_read(h, smps, &cnt); + if (ret || !cnt) + /* Abort hook processing if earlier hooks removed all samples + * or they returned something non-zero */ + break; + } + + return cnt; +} + +size_t hook_write_list(struct list *hs, struct sample *smps[], size_t cnt) +{ + size_t ret; + + for (size_t i = 0; i < list_length(hs); i++) { + struct hook *h = list_at(hs, i); + + ret = hook_write(h, smps, &cnt); + if (ret || !cnt) + /* Abort hook processing if earlier hooks removed all samples + * or they returned something non-zero */ + break; + } + + return cnt; +} + +int hook_write(struct hook *h, struct sample *smps[], size_t *cnt) +{ + return h->_vt->write ? h->_vt->write(h, smps, cnt) : 0; +} + int hook_cmp_priority(const void *a, const void *b) { struct hook *ha = (struct hook *) a; @@ -95,24 +148,16 @@ int hook_cmp_priority(const void *a, const void *b) return ha->priority - hb->priority; } -int hook_run(struct hook *h, int when, struct hook_info *i) -{ - debug(LOG_HOOK | 22, "Running hook '%s' when=%u, prio=%u, cnt=%zu", plugin_name(h->_vt), when, h->priority, i ? i->count : 0); - - return h->_vt->when & when ? h->_vt->cb(h, when, i) : 0; -} - -int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node *sn) +int hook_parse_list(struct list *list, config_setting_t *cfg, struct path *o) { struct hook h; struct plugin *p; - int ret; + int ret, priority = 10; if (!config_setting_is_group(cfg)) cerror(cfg, "Hooks must be configured with an object"); - - int priority = 10; + for (int i = 0; i < config_setting_length(cfg); i++) { config_setting_t *cfg_hook = config_setting_get_elem(cfg, i); @@ -125,10 +170,12 @@ int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node if (!config_setting_is_group(cfg_hook)) cerror(cfg_hook, "The 'hooks' setting must be an array of strings."); - ret = hook_init(&h, &p->hook, sn); + ret = hook_init(&h, &p->hook, o); if (ret) continue; + /* If the user does not assign a priority, we will use the + * position of the hook section in the congiguration file. */ h.priority = priority++; ret = hook_parse(&h, cfg_hook); @@ -138,5 +185,5 @@ int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node list_push(list, memdup(&h, sizeof(h))); } - return list_length(list); + return 0; } \ No newline at end of file diff --git a/lib/hooks/convert.c b/lib/hooks/convert.c index 5116ab710..c801d1105 100644 --- a/lib/hooks/convert.c +++ b/lib/hooks/convert.c @@ -18,37 +18,36 @@ struct convert { } mode; }; -static int hook_convert(struct hook *h, int when, struct hook_info *j) +static int convert_parse(struct hook *h, config_setting_t *cfg) { - struct convert *p = (struct convert *) h->_vd; + struct convert *p = h->_vd; - switch (when) { - case HOOK_PARSE: { - const char *mode; - - if (!config_setting_lookup_string(h->cfg, "mode", &mode)) - cerror(h->cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt)); - - if (!strcmp(mode, "fixed")) - p->mode = TO_FIXED; - else if (!strcmp(mode, "float")) - p->mode = TO_FLOAT; - else - error("Invalid parameter '%s' for hook 'convert'", mode); - break; - } - - case HOOK_READ: - for (int i = 0; i < j->count; i++) { - for (int k = 0; k < j->samples[i]->length; k++) { - switch (p->mode) { - case TO_FIXED: j->samples[i]->data[k].i = j->samples[i]->data[k].f * 1e3; break; - case TO_FLOAT: j->samples[i]->data[k].f = j->samples[i]->data[k].i; break; - } - } + const char *mode; + + if (!config_setting_lookup_string(cfg, "mode", &mode)) + cerror(cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt)); + + if (!strcmp(mode, "fixed")) + p->mode = TO_FIXED; + else if (!strcmp(mode, "float")) + p->mode = TO_FLOAT; + else + error("Invalid parameter '%s' for hook 'convert'", mode); + + return 0; +} + +static int convert_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct convert *p = h->_vd; + + for (int i = 0; i < *cnt; i++) { + for (int k = 0; k < smps[i]->length; k++) { + switch (p->mode) { + case TO_FIXED: smps[i]->data[k].i = smps[i]->data[k].f * 1e3; break; + case TO_FLOAT: smps[i]->data[k].f = smps[i]->data[k].i; break; } - - break; + } } return 0; @@ -60,9 +59,9 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .size = sizeof(struct convert), - .cb = hook_convert, - .when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ + .parse = convert_parse, + .read = convert_read, + .size = sizeof(struct convert) } }; diff --git a/lib/hooks/decimate.c b/lib/hooks/decimate.c index d662bccd0..462fbce9d 100644 --- a/lib/hooks/decimate.c +++ b/lib/hooks/decimate.c @@ -16,40 +16,44 @@ struct decimate { unsigned counter; }; -static int hook_decimate(struct hook *h, int when, struct hook_info *j) +static int decimate_init(struct hook *h) { - struct decimate *p = (struct decimate *) h->_vd; - - switch (when) { - case HOOK_INIT: - p->counter = 0; - break; - - case HOOK_PARSE: - if (!h->cfg) - error("Missing configuration for hook: '%s'", plugin_name(h->_vt)); - - if (!config_setting_lookup_int(h->cfg, "ratio", &p->ratio)) - cerror(h->cfg, "Missing setting 'ratio' for hook '%s'", plugin_name(h->_vt)); + struct decimate *p = h->_vd; - break; - - case HOOK_READ: - assert(j->samples); - - int i, ok; - for (i = 0, ok = 0; i < j->count; i++) { - if (p->counter++ % p->ratio == 0) { - struct sample *tmp; - - tmp = j->samples[ok]; - j->samples[ok++] = j->samples[i]; - j->samples[i] = tmp; - } - } + p->counter = 0; - return ok; + return 0; +} + +static int decimate_parse(struct hook *h, config_setting_t *cfg) +{ + struct decimate *p = h->_vd; + + if (!cfg) + error("Missing configuration for hook: '%s'", plugin_name(h->_vt)); + + if (!config_setting_lookup_int(cfg, "ratio", &p->ratio)) + cerror(cfg, "Missing setting 'ratio' for hook '%s'", plugin_name(h->_vt)); + + return 0; +} + +static int decimate_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct decimate *p = h->_vd; + + int i, ok; + for (i = 0, ok = 0; i < *cnt; i++) { + if (p->counter++ % p->ratio == 0) { + struct sample *tmp; + + tmp = smps[ok]; + smps[ok++] = smps[i]; + smps[i] = tmp; + } } + + *cnt = ok; return 0; } @@ -60,9 +64,10 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .size = sizeof(struct decimate), - .cb = hook_decimate, - .when = HOOK_STORAGE | HOOK_PARSE | HOOK_DESTROY | HOOK_READ + .init = decimate_init, + .parse = decimate_parse, + .read = decimate_read, + .size = sizeof(struct decimate) } }; diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index 0774a921c..d5b551430 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -13,28 +13,26 @@ #include "stats.h" #include "path.h" -static int hook_drop(struct hook *h, int when, struct hook_info *j) +static int drop_read(struct hook *h, struct sample *smps[], size_t *cnt) { int i, ok, dist; - assert(j->samples); - - for (i = 0, ok = 0; i < j->count; i++) { - h->last = j->samples[i]; + for (i = 0, ok = 0; i < *cnt; i++) { + h->last = smps[i]; if (h->prev) { dist = h->last->sequence - (int32_t) h->prev->sequence; if (dist <= 0) { warn("Dropped sample: dist = %d, i = %d", dist, i); - if (j->path && j->path->stats) - stats_update(j->path->stats->delta, STATS_DROPPED, dist); + if (h->path && h->path->stats) + stats_update(h->path->stats->delta, STATS_REORDERED, dist); } else { struct sample *tmp; - - tmp = j->samples[i]; - j->samples[i] = j->samples[ok]; - j->samples[ok++] = tmp; + + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; } /* To discard the first X samples in 'smps[]' we must @@ -46,15 +44,17 @@ static int hook_drop(struct hook *h, int when, struct hook_info *j) else { struct sample *tmp; - tmp = j->samples[i]; - j->samples[i] = j->samples[ok]; - j->samples[ok++] = tmp; + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; } h->prev = h->last; } - return ok; + *cnt = ok; + + return 0; } static struct plugin p = { @@ -63,8 +63,8 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 3, - .cb = hook_drop, - .when = HOOK_AUTO | HOOK_READ + .builtin = true, + .read = drop_read } }; diff --git a/lib/hooks/fix_ts.c b/lib/hooks/fix_ts.c index 0b597daa4..02fdd5672 100644 --- a/lib/hooks/fix_ts.c +++ b/lib/hooks/fix_ts.c @@ -12,26 +12,26 @@ #include "plugin.h" #include "timing.h" -int hook_fix_ts(struct hook *h, int when, struct hook_info *j) +int fix_ts_read(struct hook *h, struct sample *smps[], size_t *cnt) { - struct timespec now = time_now(); + struct timespec now; - assert(j->samples); + now = time_now(); - for (int i = 0; i < j->count; i++) { + for (int i = 0; i < *cnt; i++) { /* Check for missing receive timestamp * Usually node_type::read() should update the receive timestamp. * An example would be to use hardware timestamp capabilities of * modern NICs. */ - if ((j->samples[i]->ts.received.tv_sec == 0 && j->samples[i]->ts.received.tv_nsec == 0) || - (j->samples[i]->ts.received.tv_sec == -1 && j->samples[i]->ts.received.tv_nsec == -1)) - j->samples[i]->ts.received = now; + if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) || + (smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1)) + smps[i]->ts.received = now; /* Check for missing origin timestamp */ - if ((j->samples[i]->ts.origin.tv_sec == 0 && j->samples[i]->ts.origin.tv_nsec == 0) || - (j->samples[i]->ts.origin.tv_sec == -1 && j->samples[i]->ts.origin.tv_nsec == -1)) - j->samples[i]->ts.origin = now; + if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) || + (smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1)) + smps[i]->ts.origin = now; } return 0; @@ -43,8 +43,8 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 0, - .cb = hook_fix_ts, - .when = HOOK_AUTO | HOOK_READ + .builtin = true, + .read = fix_ts_read, } }; diff --git a/lib/hooks/map.c b/lib/hooks/map.c index e5e82b30e..91af0d474 100644 --- a/lib/hooks/map.c +++ b/lib/hooks/map.c @@ -20,56 +20,61 @@ struct map { struct stats *stats; }; -static int hook_map(struct hook *h, int when, struct sample *smps[], size_t *cnt) +static int map_init(struct hook *h) +{ + struct map *p = h->_vd; + + return mapping_init(&p->mapping); +} + +static int map_destroy(struct hook *h) +{ + struct map *p = h->_vd; + + return mapping_destroy(&p->mapping); +} + +static int map_parse(struct hook *h, config_setting_t *cfg) +{ + struct map *p = h->_vd; + + int ret; + + config_setting_t *cfg_mapping; + + cfg_mapping = config_setting_lookup(cfg, "mapping"); + + if (!config_setting_is_array(cfg_mapping)) + return -1; + + ret = mapping_parse(&p->mapping, cfg_mapping); + if (ret) + return ret; + + return 0; +} + +static int map_read(struct hook *h, struct sample *smps[], size_t *cnt) { int ret; - struct map *p = (struct map *) h->_vd; + struct map *p = h->_vd; + struct sample *tmp[*cnt]; - switch (when) { - case HOOK_INIT: - mapping_init(&p->mapping); - break; - - case HOOK_DESTROY: - mapping_destroy(&p->mapping); - break; - - case HOOK_PARSE: { - config_setting_t *cfg_mapping; - - cfg_mapping = config_setting_lookup(h->cfg, "mapping"); - - if (!config_setting_is_array(cfg_mapping)) - return -1; - - ret = mapping_parse(&p->mapping, cfg_mapping); - if (ret) - return ret; + if (*cnt <= 0) + return 0; - break; - } + ret = sample_alloc(smps[0]->pool, tmp, *cnt); + if (ret != *cnt) + return ret; - case HOOK_READ: { - struct sample *tmp[*cnt]; - - if (*cnt <= 0) - return 0; - - ret = sample_alloc(smps[0]->pool, tmp, *cnt); - if (ret != *cnt) - return ret; - - for (int i = 0; i < *cnt; i++) { - mapping_remap(&p->mapping, smps[i], tmp[i], NULL); - - SWAP(smps[i], tmp[i]); - } - - sample_free(tmp, *cnt); - break; - } - } + for (int i = 0; i < *cnt; i++) { + mapping_remap(&p->mapping, smps[i], tmp[i], NULL); + SWAP(smps[i], tmp[i]); + } + + sample_free(tmp, *cnt); + return 0; } @@ -79,9 +84,11 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .size = sizeof(struct map), - .cb = hook_map, - .when = HOOK_STORAGE | HOOK_READ | HOOK_PARSE + .init = map_init, + .destroy= map_destroy, + .parse = map_parse, + .read = map_read, + .size = sizeof(struct map) } }; diff --git a/lib/hooks/print.c b/lib/hooks/print.c index 0ef957cff..a4eec0f1c 100644 --- a/lib/hooks/print.c +++ b/lib/hooks/print.c @@ -17,40 +17,54 @@ struct print { const char *uri; }; -static int hook_print(struct hook *h, int when, struct hook_info *j) +static int print_init(struct hook *h) { - struct print *p = (struct print *) h->_vd; - - switch (when) { - case HOOK_INIT: - p->output = stdout; - p->uri = NULL; - break; + struct print *p = h->_vd; - case HOOK_PATH_START: - if (p->uri) { - p->output = fopen(p->uri, "w+"); - if (!p->output) - error("Failed to open file %s for writing", p->uri); - } - break; - - case HOOK_PATH_STOP: - if (p->uri) - fclose(p->output); - break; - - case HOOK_PARSE: - config_setting_lookup_string(h->cfg, "output", &p->uri); - break; - - case HOOK_READ: - assert(j->samples); + p->output = stdout; + p->uri = NULL; + + return 0; +} + +static int print_start(struct hook *h) +{ + struct print *p = h->_vd; - for (int i = 0; i < j->count; i++) - sample_fprint(p->output, j->samples[i], SAMPLE_ALL); - break; + if (p->uri) { + p->output = fopen(p->uri, "w+"); + if (!p->output) + error("Failed to open file %s for writing", p->uri); } + + return 0; +} + +static int print_stop(struct hook *h) +{ + struct print *p = h->_vd; + + if (p->uri) + fclose(p->output); + + return 0; +} + +static int print_parse(struct hook *h, config_setting_t *cfg) +{ + struct print *p = h->_vd; + + config_setting_lookup_string(cfg, "output", &p->uri); + + return 0; +} + +static int print_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct print *p = h->_vd; + + for (int i = 0; i < *cnt; i++) + sample_fprint(p->output, smps[i], SAMPLE_ALL); return 0; } @@ -61,9 +75,12 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .size = sizeof(struct print), - .cb = hook_print, - .when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH + .init = print_init, + .parse = print_parse, + .start = print_start, + .stop = print_stop, + .read = print_read, + .size = sizeof(struct print) } }; diff --git a/lib/hooks/restart.c b/lib/hooks/restart.c index 941bddaae..2f62304c2 100644 --- a/lib/hooks/restart.c +++ b/lib/hooks/restart.c @@ -12,27 +12,31 @@ #include "plugin.h" #include "path.h" -static int hook_restart(struct hook *h, int when, struct hook_info *j) +static int restart_read(struct hook *h, struct sample *smps[], size_t *cnt) { - assert(j->samples); - assert(j->path); + assert(h->path); + + for (int i = 0; i < *cnt; i++) { + h->last = smps[i]; - for (int i = 0; i < j->count; i++) { - h->last = j->samples[i]; - if (h->prev) { if (h->last->sequence == 0 && h->prev->sequence <= UINT32_MAX - 32) { warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)", - path_name(j->path), h->prev->sequence, h->last->sequence); + path_name(h->path), h->prev->sequence, h->last->sequence); - path_run_hooks(j->path, HOOK_PATH_RESTART, &j->samples[i], j->count - i); + /* Run restart hooks */ + for (size_t i = 0; i < list_length(&h->path->hooks); i++) { + struct hook *k = list_at(&h->path->hooks, i); + + hook_restart(k); + } } } - + h->prev = h->last; } - + return 0; } @@ -42,8 +46,8 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 1, - .cb = hook_restart, - .when = HOOK_AUTO | HOOK_READ + .builtin = true, + .read = restart_read } }; diff --git a/lib/hooks/shift.c b/lib/hooks/shift.c deleted file mode 100644 index b61f1b302..000000000 --- a/lib/hooks/shift.c +++ /dev/null @@ -1,118 +0,0 @@ -/** Time shift hook. - * - * @author Steffen Vogel - * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC - *********************************************************************************/ - -/** @addtogroup hooks Hook functions - * @{ - */ - -#include "hook.h" -#include "plugin.h" -#include "timing.h" - -struct shift { - union { - struct timespec ts; /**< For SHIFT_TS_* modes. */ - int seq; /**< For SHIFT_SEQUENCE mode. */ - } offset; - enum { - SHIFT_TS_ORIGIN, - SHIFT_TS_RECEIVED, - SHIFT_TS_SENT, - SHIFT_SEQUENCE - } mode; -}; - -static int hook_shift(struct hook *h, int when, struct hook_info *j) -{ - struct shift *p = (struct shift *) h->_vd; - - const char *mode; - - switch (when) { - case HOOK_INIT: - p->mode = SHIFT_TS_ORIGIN; /* Default mode */ - break; - - case HOOK_PARSE: - if (!h->cfg) - error("Missing configuration for hook: '%s'", plugin_name(h->_vt)); - - if (config_setting_lookup_string(h->cfg, "mode", &mode)) { - if (!strcmp(mode, "origin")) - p->mode = SHIFT_TS_ORIGIN; - else if (!strcmp(mode, "received")) - p->mode = SHIFT_TS_RECEIVED; - else if (!strcmp(mode, "sent")) - p->mode = SHIFT_TS_SENT; - else if (!strcmp(mode, "sequence")) - p->mode = SHIFT_SEQUENCE; - else - error("Invalid mode parameter '%s' for hook '%s'", mode, plugin_name(h->_vt)); - } - - switch (p->mode) { - case SHIFT_TS_ORIGIN: - case SHIFT_TS_RECEIVED: - case SHIFT_TS_SENT: { - double offset; - - if (!config_setting_lookup_float(h->cfg, "offset", &offset)) - cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt)); - - p->offset.ts = time_from_double(offset); - break; - } - - case SHIFT_SEQUENCE: { - int offset; - - if (!config_setting_lookup_int(h->cfg, "offset", &offset)) - cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt)); - - p->offset.seq = offset; - break; - } - } - - break; - - case HOOK_READ: - for (int i = 0; i < j->count; i++) { - struct sample *s = j->samples[i]; - - switch (p->mode) { - case SHIFT_TS_ORIGIN: - s->ts.origin = time_add(&s->ts.origin, &p->offset.ts); break; - case SHIFT_TS_RECEIVED: - s->ts.received = time_add(&s->ts.received, &p->offset.ts); break; - case SHIFT_TS_SENT: - s->ts.origin = time_add(&s->ts.sent, &p->offset.ts); break; - case SHIFT_SEQUENCE: - s->sequence += p->offset.seq; break; - } - } - - break; - } - - return 0; -} - -static struct plugin p = { - .name = "shift", - .description = "Shift the origin timestamp or sequence number of samples", - .type = PLUGIN_TYPE_HOOK, - .hook = { - .priority = 99, - .size = sizeof(struct shift), - .cb = hook_shift, - .when = HOOK_STORAGE | HOOK_READ - } -}; - -REGISTER_PLUGIN(&p) - -/** @} */ \ No newline at end of file diff --git a/lib/hooks/shift_seq.c b/lib/hooks/shift_seq.c new file mode 100644 index 000000000..4e3133675 --- /dev/null +++ b/lib/hooks/shift_seq.c @@ -0,0 +1,52 @@ +/** Shift sequence number of samples + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + *********************************************************************************/ + +/** @addtogroup hooks Hook functions + * @{ + */ + +#include "hook.h" +#include "plugin.h" + +struct shift { + int offset; +}; + +static int shift_seq_parse(struct hook *h, config_setting_t *cfg) +{ + struct shift *p = h->_vd; + + if (!config_setting_lookup_int(cfg, "offset", &p->offset)) + cerror(cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt)); + + return 0; +} + +static int shift_seq_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct shift *p = h->_vd; + + for (int i = 0; i < *cnt; i++) + smps[i]->sequence += p->offset; + + return 0; +} + +static struct plugin p = { + .name = "shift_seq", + .description = "Shift sequence number of samples", + .type = PLUGIN_TYPE_HOOK, + .hook = { + .priority = 99, + .parse = shift_seq_parse, + .read = shift_seq_read, + .size = sizeof(struct shift), + } +}; + +REGISTER_PLUGIN(&p) + +/** @} */ \ No newline at end of file diff --git a/lib/hooks/shift_ts.c b/lib/hooks/shift_ts.c new file mode 100644 index 000000000..612bbf0d5 --- /dev/null +++ b/lib/hooks/shift_ts.c @@ -0,0 +1,95 @@ +/** Shift timestamps of samples. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + *********************************************************************************/ + +/** @addtogroup hooks Hook functions + * @{ + */ + +#include "hook.h" +#include "plugin.h" +#include "timing.h" + +struct shift_ts { + struct timespec offset; + enum { + SHIFT_ORIGIN, + SHIFT_RECEIVED, + SHIFT_SENT, + } mode; +}; + +static int shift_ts_init(struct hook *h) +{ + struct shift_ts *p = h->_vd; + + p->mode = SHIFT_ORIGIN; /* Default mode */ + + return 0; +} + +static int shift_ts_parse(struct hook *h, config_setting_t *cfg) +{ + struct shift_ts *p = h->_vd; + + const char *mode; + + if (config_setting_lookup_string(cfg, "mode", &mode)) { + if (!strcmp(mode, "origin")) + p->mode = SHIFT_ORIGIN; + else if (!strcmp(mode, "received")) + p->mode = SHIFT_RECEIVED; + else if (!strcmp(mode, "sent")) + p->mode = SHIFT_SENT; + else + cerror(cfg, "Invalid mode parameter '%s' for hook '%s'", mode, plugin_name(h->_vt)); + } + + double offset; + if (!config_setting_lookup_float(cfg, "offset", &offset)) + cerror(cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt)); + + p->offset = time_from_double(offset); + + return 0; +} + +static int shift_ts_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct shift_ts *p = h->_vd; + + for (int i = 0; i < *cnt; i++) { + struct sample *s = smps[i]; + struct timespec *ts; + + switch (p->mode) { + case SHIFT_ORIGIN: ts = &s->ts.origin; break; + case SHIFT_RECEIVED: ts = &s->ts.received; break; + case SHIFT_SENT: ts = &s->ts.sent; break; + default: return -1; + } + + *ts = time_add(ts, &p->offset); break; + } + + return 0; +} + +static struct plugin p = { + .name = "shift_ts", + .description = "Shift timestamps of samples", + .type = PLUGIN_TYPE_HOOK, + .hook = { + .priority = 99, + .init = shift_ts_init, + .parse = shift_ts_parse, + .read = shift_ts_read, + .size = sizeof(struct shift_ts) + } +}; + +REGISTER_PLUGIN(&p) + +/** @} */ \ No newline at end of file diff --git a/lib/hooks/skip_first.c b/lib/hooks/skip_first.c index 8597d423c..860acb8a7 100644 --- a/lib/hooks/skip_first.c +++ b/lib/hooks/skip_first.c @@ -36,85 +36,84 @@ struct skip_first { }; }; -static int hook_skip_first(struct hook *h, int when, struct hook_info *j) +static int skip_first_parse(struct hook *h, config_setting_t *cfg) { - struct skip_first *p = (struct skip_first *) h->_vd; + struct skip_first *p = h->_vd; - switch (when) { - case HOOK_PARSE: { - double seconds; - - if (!h->cfg) - error("Missing configuration for hook: '%s'", plugin_name(h->_vt)); - - if (config_setting_lookup_float(h->cfg, "seconds", &seconds)) { - p->seconds.wait = time_from_double(seconds); - p->mode = HOOK_SKIP_MODE_SECONDS; - } - else if (config_setting_lookup_int(h->cfg, "samples", &p->samples.wait)) { - p->mode = HOOK_SKIP_MODE_SAMPLES; - } - else - cerror(h->cfg, "Missing setting 'seconds' or 'samples' for hook '%s'", plugin_name(h->_vt)); + double seconds; - break; + if (config_setting_lookup_float(cfg, "seconds", &seconds)) { + p->seconds.wait = time_from_double(seconds); + p->mode = HOOK_SKIP_MODE_SECONDS; + } + else if (config_setting_lookup_int(cfg, "samples", &p->samples.wait)) { + p->mode = HOOK_SKIP_MODE_SAMPLES; + } + else + cerror(cfg, "Missing setting 'seconds' or 'samples' for hook '%s'", plugin_name(h->_vt)); + + return 0; +} + +static int skip_first_restart(struct hook *h) +{ + struct skip_first *p = h->_vd; + + p->state = HOOK_SKIP_FIRST_STATE_STARTED; + + return 0; +} + +static int skip_first_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct skip_first *p = h->_vd; + + if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) { + switch (p->mode) { + case HOOK_SKIP_MODE_SAMPLES: + p->samples.until = smps[0]->sequence + p->samples.wait; + break; + + case HOOK_SKIP_MODE_SECONDS: + p->seconds.until = time_add(&smps[0]->ts.received, &p->seconds.wait); + break; } - case HOOK_PATH_START: - case HOOK_PATH_RESTART: - p->state = HOOK_SKIP_FIRST_STATE_STARTED; - break; - - case HOOK_READ: - assert(j->samples); - - if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) { - switch (p->mode) { - case HOOK_SKIP_MODE_SAMPLES: - p->samples.until = j->samples[0]->sequence + p->samples.wait; - break; - - case HOOK_SKIP_MODE_SECONDS: - p->seconds.until = time_add(&j->samples[0]->ts.received, &p->seconds.wait); - break; - } - - p->state = HOOK_SKIP_FIRST_STATE_SKIPPING; - } - - int i, ok; - for (i = 0, ok = 0; i < j->count; i++) { - bool skip; - switch (p->mode) { - case HOOK_SKIP_MODE_SAMPLES: - skip = p->samples.until >= j->samples[i]->sequence; - break; - - case HOOK_SKIP_MODE_SECONDS: - skip = time_delta(&p->seconds.until, &j->samples[i]->ts.received) < 0; - break; - default: - skip = false; - } - - if (!skip) { - struct sample *tmp; - - tmp = j->samples[i]; - j->samples[i] = j->samples[ok]; - j->samples[ok++] = tmp; - } - - /* To discard the first X samples in 'smps[]' we must - * shift them to the end of the 'smps[]' array. - * In case the hook returns a number 'ok' which is smaller than 'cnt', - * only the first 'ok' samples in 'smps[]' are accepted and further processed. - */ - } - - j->count = ok; + p->state = HOOK_SKIP_FIRST_STATE_SKIPPING; } + int i, ok; + for (i = 0, ok = 0; i < *cnt; i++) { + bool skip; + switch (p->mode) { + case HOOK_SKIP_MODE_SAMPLES: + skip = p->samples.until >= smps[i]->sequence; + break; + + case HOOK_SKIP_MODE_SECONDS: + skip = time_delta(&p->seconds.until, &smps[i]->ts.received) < 0; + break; + default: + skip = false; + } + + if (!skip) { + struct sample *tmp; + + tmp = smps[i]; + smps[i] = smps[ok]; + smps[ok++] = tmp; + } + + /* To discard the first X samples in 'smps[]' we must + * shift them to the end of the 'smps[]' array. + * In case the hook returns a number 'ok' which is smaller than 'cnt', + * only the first 'ok' samples in 'smps[]' are accepted and further processed. + */ + } + + *cnt = ok; + return 0; } @@ -124,9 +123,11 @@ static struct plugin p = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .size = sizeof(struct skip_first), - .cb = hook_skip_first, - .when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH + .parse = skip_first_parse, + .start = skip_first_restart, + .restart = skip_first_restart, + .read = skip_first_read, + .size = sizeof(struct skip_first) } }; diff --git a/lib/hooks/stats.c b/lib/hooks/stats.c index 0d65350ed..4de4ebf8a 100644 --- a/lib/hooks/stats.c +++ b/lib/hooks/stats.c @@ -13,7 +13,7 @@ #include "stats.h" #include "path.h" -struct stats_hook { +struct stats_collect { struct stats stats; enum stats_format format; @@ -23,128 +23,202 @@ struct stats_hook { const char *uri; }; -static int hook_stats(struct hook *h, int when, struct hook_info *j) +static int stats_collect_init(struct hook *h) { - struct stats_hook *p = (struct stats_hook *) h->_vd; + struct stats_collect *p = h->_vd; + + stats_init(&p->stats); + + /* Register statistic object to path. + * + * This allows the path code to update statistics. */ + if (h->path) + h->path->stats = &p->stats; - switch (when) { - case HOOK_INIT: - stats_init(&p->stats); - - /* Register statistic object to path. - * - * This allows the path code to update statistics. */ - if (j->path) - j->path->stats = &p->stats; - - /* Set default values */ - p->format = STATS_FORMAT_HUMAN; - p->verbose = 0; - p->uri = NULL; - p->output = stdout; - - break; - - case HOOK_PARSE: { - const char *format; - if (config_setting_lookup_string(h->cfg, "format", &format)) { - if (!strcmp(format, "human")) - p->format = STATS_FORMAT_HUMAN; - else if (!strcmp(format, "json")) - p->format = STATS_FORMAT_JSON; - else if (!strcmp(format, "matlab")) - p->format = STATS_FORMAT_MATLAB; - else - cerror(h->cfg, "Invalid statistic output format: %s", format); - } - - config_setting_lookup_int(h->cfg, "verbose", &p->verbose); - config_setting_lookup_string(h->cfg, "output", &p->uri); + /* Set default values */ + p->format = STATS_FORMAT_HUMAN; + p->verbose = 0; + p->uri = NULL; + p->output = stdout; + + return 0; +} - break; - } +static int stats_collect_destroy(struct hook *h) +{ + struct stats_collect *p = h->_vd; + + stats_destroy(&p->stats); + + return 0; +} - case HOOK_DESTROY: - stats_destroy(&p->stats); - break; - - case HOOK_READ: - assert(j->samples); - - stats_collect(p->stats.delta, j->samples, j->count); - stats_commit(&p->stats, p->stats.delta); - break; - - case HOOK_PATH_START: - if (p->uri) { - p->output = fopen(p->uri, "w+"); - if (!p->output) - error("Failed to open file %s for writing", p->uri); - } - break; - - case HOOK_PATH_STOP: - stats_print(&p->stats, p->output, p->format, p->verbose); - - if (p->uri) - fclose(p->output); - - break; - - case HOOK_PATH_RESTART: - stats_reset(&p->stats); - break; - - case HOOK_PERIODIC: - assert(j->path); - - stats_print_periodic(&p->stats, p->output, p->format, p->verbose, j->path); - break; +static int stats_collect_start(struct hook *h) +{ + struct stats_collect *p = h->_vd; + + if (p->uri) { + p->output = fopen(p->uri, "w+"); + if (!p->output) + error("Failed to open file %s for writing", p->uri); } return 0; } +static int stats_collect_stop(struct hook *h) +{ + struct stats_collect *p = h->_vd; + + stats_print(&p->stats, p->output, p->format, p->verbose); + + if (p->uri) + fclose(p->output); + + return 0; +} + +static int stats_collect_restart(struct hook *h) +{ + struct stats_collect *p = h->_vd; + + stats_reset(&p->stats); + + return 0; +} + +static int stats_collect_periodic(struct hook *h) +{ + struct stats_collect *p = h->_vd; + + stats_print_periodic(&p->stats, p->output, p->format, p->verbose, h->path); + + return 0; +} + +static int stats_collect_parse(struct hook *h, config_setting_t *cfg) +{ + struct stats_collect *p = h->_vd; + + const char *format; + if (config_setting_lookup_string(cfg, "format", &format)) { + if (!strcmp(format, "human")) + p->format = STATS_FORMAT_HUMAN; + else if (!strcmp(format, "json")) + p->format = STATS_FORMAT_JSON; + else if (!strcmp(format, "matlab")) + p->format = STATS_FORMAT_MATLAB; + else + cerror(cfg, "Invalid statistic output format: %s", format); + } + + config_setting_lookup_int(cfg, "verbose", &p->verbose); + config_setting_lookup_string(cfg, "output", &p->uri); + + return 0; +} + +static int stats_collect_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct stats_collect *p = h->_vd; + + stats_collect(p->stats.delta, smps, *cnt); + stats_commit(&p->stats, p->stats.delta); + + return 0; +} + struct stats_send { struct node *dest; - struct stats *stats; - int ratio; + + enum { + STATS_SEND_MODE_PERIODIC, + STATS_SEND_MODE_READ + } mode; + + int decimation; }; -/** @todo This is untested */ -static int hook_stats_send(struct hook *h, int when, struct hook_info *j) +static int stats_send_init(struct hook *h) { - struct stats_send *p = (struct stats_send *) h->_vd; + struct stats_send *p = h->_vd; + + p->decimation = 1; + p->mode = STATS_SEND_MODE_PERIODIC; - switch (when) { - case HOOK_INIT: - assert(j->nodes); - assert(j->path); - - if (!h->cfg) - error("Missing configuration for hook '%s'", plugin_name(h->_vt)); - - const char *dest; - - if (!config_setting_lookup_string(h->cfg, "destination", &dest)) - cerror(h->cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt)); - - p->dest = list_lookup(j->nodes, dest); - if (!p->dest) - cerror(h->cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt)); - break; - - case HOOK_PATH_START: - node_start(p->dest); - break; + return 0; +} - case HOOK_PATH_STOP: - node_stop(p->dest); - break; +static int stats_send_parse(struct hook *h, config_setting_t *cfg) +{ + struct stats_send *p = h->_vd; - case HOOK_READ: - stats_send(p->stats, p->dest); - break; + assert(h->path && h->path->super_node); + + const char *dest, *mode; + + if (config_setting_lookup_string(cfg, "destination", &dest)) { + p->dest = list_lookup(&h->path->super_node->nodes, dest); + if (!p->dest) + cerror(cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt)); + } + else + cerror(cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt)); + + if (config_setting_lookup_string(cfg, "destination", &mode)) { + if (!strcmp(mode, "periodic")) + p->mode = STATS_SEND_MODE_PERIODIC; + else if (!strcmp(mode, "read")) + p->mode = STATS_SEND_MODE_READ; + else + cerror(cfg, "Invalid value '%s' for setting 'mode' of hook '%s'", mode, plugin_name(h->_vt)); + } + + config_setting_lookup_int(cfg, "decimation", &p->decimation); + + return 0; +} + +static int stats_send_start(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->dest->state != STATE_STOPPED) + node_start(p->dest); + + return 0; +} + +static int stats_send_stop(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->dest->state != STATE_STOPPED) + node_stop(p->dest); + + return 0; +} + +static int stats_send_periodic(struct hook *h) +{ + struct stats_send *p = h->_vd; + + if (p->mode == STATS_SEND_MODE_PERIODIC) + stats_send(h->path->stats, p->dest); + + return 0; +} + +static int stats_send_read(struct hook *h, struct sample *smps[], size_t *cnt) +{ + struct stats_send *p = h->_vd; + + assert(h->path->stats); + + if (p->mode == STATS_SEND_MODE_READ) { + size_t processed = h->path->stats->histograms[STATS_OWD].total; + if (processed % p->decimation == 0) + stats_send(h->path->stats, p->dest); } return 0; @@ -156,9 +230,15 @@ static struct plugin p1 = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 2, - .size = sizeof(struct stats_hook), - .cb = hook_stats, - .when = HOOK_STORAGE | HOOK_PARSE | HOOK_PATH | HOOK_READ | HOOK_PERIODIC + .init = stats_collect_init, + .destroy= stats_collect_destroy, + .start = stats_collect_start, + .stop = stats_collect_stop, + .read = stats_collect_read, + .restart= stats_collect_restart, + .periodic= stats_collect_periodic, + .parse = stats_collect_parse, + .size = sizeof(struct stats_collect), } }; @@ -168,8 +248,13 @@ static struct plugin p2 = { .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .cb = hook_stats_send, - .when = HOOK_STORAGE | HOOK_PATH | HOOK_READ + .init = stats_send_init, + .parse = stats_send_parse, + .start = stats_send_start, + .stop = stats_send_stop, + .periodic= stats_send_periodic, + .read = stats_send_read, + .size = sizeof(struct stats_send) } }; diff --git a/lib/hooks/ts.c b/lib/hooks/ts.c index bac2f5e2b..7e65f7f2b 100644 --- a/lib/hooks/ts.c +++ b/lib/hooks/ts.c @@ -12,24 +12,22 @@ #include "plugin.h" #include "timing.h" -static int hook_ts(struct hook *h, int when, struct hook_info *j) +static int ts_read(struct hook *h, struct sample *smps[], size_t *cnt) { - assert(j->samples); - - for (int i = 0; i < j->count; i++) - j->samples[i]->ts.origin = j->samples[i]->ts.received; + for (int i = 0; i < *cnt; i++) + smps[i]->ts.origin = smps[i]->ts.received; return 0; } static struct plugin p = { .name = "ts", - .description = "Update timestamp of message with current time", + .description = "Overwrite origin timestamp of samples with receive timestamp", .type = PLUGIN_TYPE_HOOK, .hook = { .priority = 99, - .cb = hook_ts, - .when = HOOK_READ + .read = ts_read, + .size = 0 } }; diff --git a/src/hook.c b/src/hook.c index 0c41fd3f6..986ff490e 100644 --- a/src/hook.c +++ b/src/hook.c @@ -61,12 +61,15 @@ static int hook_parse_cli(struct hook *h, char *params[], int paramlen) static void usage() { printf("Usage: villas-hook [OPTIONS] NAME [PARAM] \n"); - printf(" NAME the name of the hook function to run\n"); - printf(" PARAM a string of configuration settings for the hook\n\n"); + printf(" PARAM a string of configuration settings for the hook\n"); printf(" OPTIONS are:\n"); printf(" -h show this help\n"); printf(" -d LVL set debug level to LVL\n"); printf(" -v CNT process CNT samples at once\n"); + printf(" NAME the name of the hook function\n\n"); + + printf("The following hook functions are supported:\n"); + plugin_dump(PLUGIN_TYPE_HOOK); printf("\n"); printf("Example:"); printf(" villas-signal random | villas-hook skip_first seconds=10\n"); @@ -77,7 +80,9 @@ static void usage() int main(int argc, char *argv[]) { - int ret, level, cnt; + int ret, level; + + size_t cnt, recv; /* Default values */ level = V; @@ -88,9 +93,9 @@ int main(int argc, char *argv[]) struct log log; struct plugin *p; struct sample *samples[cnt]; - struct pool pool = { .state = STATE_DESTROYED }; + + struct pool q = { .state = STATE_DESTROYED }; struct hook h = { .state = STATE_DESTROYED }; - struct hook_info hi = { .samples = samples }; char c; while ((c = getopt(argc, argv, "hv:d:")) != -1) { @@ -109,6 +114,8 @@ int main(int argc, char *argv[]) } log_init(&log, level, LOG_ALL); + log_start(&log); + memory_init(DEFAULT_NR_HUGEPAGES); if (argc < optind + 1) { @@ -119,41 +126,47 @@ int main(int argc, char *argv[]) if (cnt < 1) error("Vectorize option must be greater than 0"); - ret = pool_init(&pool, 10 * cnt, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage); + ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage); if (ret) error("Failed to initilize memory pool"); - name = argv[optind]; + name = argv[optind]; p = plugin_lookup(PLUGIN_TYPE_HOOK, name); if (!p) - error("Unknown hook function '%s'", argv[optind]); + error("Unknown hook function '%s'", name); config_init(&cfg); - hook_init(&h, &p->hook, NULL); - hook_parse_cli(&h, &argv[optind + 1], argc - optind - 1); - hook_run(&h, HOOK_PATH_START, &hi); + ret = hook_init(&h, &p->hook, NULL); + if (ret) + error("Failed to initialize hook"); + + ret = hook_parse_cli(&h, &argv[optind + 1], argc - optind - 1); + if (ret) + error("Failed to parse hook config"); + + hook_start(&h); while (!feof(stdin)) { - ret = sample_alloc(&pool, samples, cnt); + ret = sample_alloc(&q, samples, cnt); if (ret != cnt) - error("Failed to allocate %u samples from pool", cnt); + error("Failed to allocate %zu samples from pool", cnt); - hi.count = 0; + recv = 0; for (int j = 0; j < cnt && !feof(stdin); j++) { ret = sample_fscan(stdin, hi.samples[j], NULL); if (ret < 0) break; - hi.samples[j]->ts.received = time_now(); - hi.count++; + samples[j]->ts.received = time_now(); + recv++; } - debug(15, "Read %d samples from stdin", cnt); + debug(15, "Read %zu samples from stdin", recv); - hook_run(&h, HOOK_READ, &hi); - hook_run(&h, HOOK_WRITE, &hi); + hook_read(&h, samples, &recv); + hook_write(&h, samples, &recv); for (int j = 0; j < hi.count; j++) sample_fprint(stdout, hi.samples[j], SAMPLE_ALL); @@ -162,13 +175,12 @@ int main(int argc, char *argv[]) sample_free(samples, cnt); } - hook_run(&h, HOOK_PATH_STOP, &hi); - + hook_stop(&h); hook_destroy(&h); config_destroy(&cfg); sample_free(samples, cnt); - pool_destroy(&pool); + pool_destroy(&q); return 0; } \ No newline at end of file