1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

yet another rewrite of the hook system

This commit is contained in:
Steffen Vogel 2017-03-27 12:26:11 +02:00
parent 9890777c72
commit 38a983b26e
16 changed files with 791 additions and 624 deletions

View file

@ -20,6 +20,7 @@
#include <time.h>
#include <string.h>
#include <stdbool.h>
#include "queue.h"
#include "list.h"
@ -32,72 +33,25 @@ struct hook;
struct sample;
struct super_node;
/** Optional parameters to hook callbacks */
struct hook_info {
struct path *path;
struct list *nodes;
struct list *paths;
struct sample **samples;
size_t count;
};
/** Callback type of hook function
*
* @param h The hook datastructure which contains parameter, name and private context for the hook.
* @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values.
* @param i The hook_info structure contains references to the current node, path or samples. Some fields of this structure can be NULL.
* @retval 0 Success. Continue processing and forwarding the message.
* @retval <0 Error. Drop the message.
*/
typedef int (*hook_cb_t)(struct hook *h, int when, struct hook_info *i);
/** Destructor callback for hook_storage()
*
* @param data A pointer to the data which should be destroyed.
*/
typedef int (*dtor_cb_t)(void *);
/** Constructor callback for hook_storage() */
typedef int (*ctor_cb_t)(void *);
enum hook_state {
HOOK_DESTROYED,
HOOK_INITIALIZED
};
/** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */
enum hook_when {
HOOK_PATH_START = 1 << 0, /**< Called whenever a path is started; before threads are created. */
HOOK_PATH_STOP = 1 << 1, /**< Called whenever a path is stopped; after threads are destoyed. */
HOOK_PATH_RESTART = 1 << 2, /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
HOOK_READ = 1 << 3, /**< Called for every single received samples. */
HOOK_WRITE = 1 << 4, /**< Called for every single sample which will be sent. */
HOOK_ASYNC = 1 << 7, /**< Called asynchronously with fixed rate (see path::rate). */
HOOK_PERIODIC = 1 << 8, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */
HOOK_INIT = 1 << 9, /**< Called before path is started to parseHOOK_DESTROYs. */
HOOK_DESTROY = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
HOOK_AUTO = 1 << 11, /**< Internal hooks are added to every path implicitely. */
HOOK_PARSE = 1 << 12, /**< Called for parsing hook arguments. */
/** @{ Classes of hooks */
/** Hooks which are using private data must allocate and free them propery. */
HOOK_STORAGE = HOOK_INIT | HOOK_DESTROY,
/** All path related actions */
HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART
/** @} */
};
struct hook_type {
enum hook_when when; /**< The type of the hook as a bitfield */
hook_cb_t cb; /**< The hook callback function as a function pointer. */
int priority; /**< Default priority of this hook type. */
bool builtin; /**< Should we add this hook by default to every path?. */
size_t size; /**< Size of allocation for struct hook::_vd */
int (*parse)(struct hook *h, config_setting_t *cfg);
int (*init)(struct hook *h); /**< Called before path is started to parseHOOK_DESTROYs. */
int (*destroy)(struct hook *h); /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
int (*start)(struct hook *h); /**< Called whenever a path is started; before threads are created. */
int (*stop)(struct hook *h); /**< Called whenever a path is stopped; after threads are destoyed. */
int (*periodic)(struct hook *h);/**< Called periodically. Period is set by global 'stats' option in the configuration file. */
int (*restart)(struct hook *h); /**< Called whenever a new simulation case is started. This is detected by a sequence no equal to zero. */
int (*read)(struct hook *h, struct sample *smps[], size_t *cnt); /**< Called for every single received samples. */
int (*write)(struct hook *h, struct sample *smps[], size_t *cnt); /**< Called for every single sample which will be sent. */
};
/** Descriptor for user defined hooks. See hooks[]. */
@ -105,17 +59,16 @@ struct hook {
enum state state;
struct sample *prev, *last;
struct path *path;
struct hook_type *_vt; /**< C++ like Vtable pointer. */
void *_vd; /**< Private data for this hook. This pointer can be used to pass data between consecutive calls of the callback. */
int priority; /**< A priority to change the order of execution within one type of hook. */
config_setting_t *cfg;
};
/** Save references to global nodes, paths and settings */
int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn);
int hook_init(struct hook *h, struct hook_type *vt, struct path *p);
/** Parse a single hook.
*
@ -127,10 +80,20 @@ int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn);
*/
int hook_parse(struct hook *h, config_setting_t *cfg);
int hook_run(struct hook *h, int when, struct hook_info *i);
int hook_destroy(struct hook *h);
int hook_start(struct hook *h);
int hook_stop(struct hook *h);
int hook_periodic(struct hook *h);
int hook_restart(struct hook *h);
int hook_read(struct hook *h, struct sample *smps[], size_t *cnt);
int hook_write(struct hook *h, struct sample *smps[], size_t *cnt);
size_t hook_read_list(struct list *hs, struct sample *smps[], size_t cnt);
size_t hook_write_list(struct list *hs, struct sample *smps[], size_t cnt);
/** Compare two hook functions with their priority. Used by list_sort() */
int hook_cmp_priority(const void *a, const void *b);
@ -148,4 +111,4 @@ int hook_cmp_priority(const void *a, const void *b);
* hooks = [ "print" ]
* }
*/
int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node *sn);
int hook_parse_list(struct list *list, config_setting_t *cfg, struct path *p);

View file

@ -19,23 +19,20 @@
#include "node.h"
#include "plugin.h"
int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn)
int hook_init(struct hook *h, struct hook_type *vt, struct path *p)
{
int ret;
struct hook_info i = {
.nodes = &sn->nodes,
.paths = &sn->paths
};
assert(h->state == STATE_DESTROYED);
h->priority = vt->priority;
h->path = p;
h->_vt = vt;
h->_vd = alloc(vt->size);
ret = hook_run(h, HOOK_INIT, &i);
ret = h->_vt->init ? h->_vt->init(h) : 0;
if (ret)
return ret;
@ -49,21 +46,13 @@ int hook_parse(struct hook *h, config_setting_t *cfg)
int ret;
assert(h->state != STATE_DESTROYED);
h->cfg = cfg;
config_setting_lookup_int(h->cfg, "priority", &h->priority);
if (h->_vt->when & HOOK_PARSE) {
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
/* Parse hook arguments */
ret = hook_run(h, HOOK_PARSE, NULL);
if (ret)
return ret;
}
config_setting_lookup_int(cfg, "priority", &h->priority);
ret = h->_vt->parse ? h->_vt->parse(h, cfg) : 0;
if (ret)
return ret;
h->state = STATE_PARSED;
return 0;
@ -75,18 +64,82 @@ int hook_destroy(struct hook *h)
assert(h->state != STATE_DESTROYED);
ret = hook_run(h, HOOK_DESTROY, NULL);
ret = h->_vt->destroy ? h->_vt->destroy(h) : 0;
if (ret)
return ret;
if (h->_vd)
free(h->_vd);
h->state = HOOK_DESTROYED;
h->state = STATE_DESTROYED;
return 0;
}
int hook_start(struct hook *h)
{
return h->_vt->start ? h->_vt->start(h) : 0;
}
int hook_stop(struct hook *h)
{
return h->_vt->stop ? h->_vt->stop(h) : 0;
}
int hook_periodic(struct hook *h)
{
return h->_vt->periodic ? h->_vt->periodic(h) : 0;
}
int hook_restart(struct hook *h)
{
return h->_vt->restart ? h->_vt->restart(h) : 0;
}
int hook_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
return h->_vt->read ? h->_vt->read(h, smps, cnt) : 0;
}
size_t hook_read_list(struct list *hs, struct sample *smps[], size_t cnt)
{
size_t ret;
for (size_t i = 0; i < list_length(hs); i++) {
struct hook *h = list_at(hs, i);
ret = hook_read(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
}
size_t hook_write_list(struct list *hs, struct sample *smps[], size_t cnt)
{
size_t ret;
for (size_t i = 0; i < list_length(hs); i++) {
struct hook *h = list_at(hs, i);
ret = hook_write(h, smps, &cnt);
if (ret || !cnt)
/* Abort hook processing if earlier hooks removed all samples
* or they returned something non-zero */
break;
}
return cnt;
}
int hook_write(struct hook *h, struct sample *smps[], size_t *cnt)
{
return h->_vt->write ? h->_vt->write(h, smps, cnt) : 0;
}
int hook_cmp_priority(const void *a, const void *b)
{
struct hook *ha = (struct hook *) a;
@ -95,24 +148,16 @@ int hook_cmp_priority(const void *a, const void *b)
return ha->priority - hb->priority;
}
int hook_run(struct hook *h, int when, struct hook_info *i)
{
debug(LOG_HOOK | 22, "Running hook '%s' when=%u, prio=%u, cnt=%zu", plugin_name(h->_vt), when, h->priority, i ? i->count : 0);
return h->_vt->when & when ? h->_vt->cb(h, when, i) : 0;
}
int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node *sn)
int hook_parse_list(struct list *list, config_setting_t *cfg, struct path *o)
{
struct hook h;
struct plugin *p;
int ret;
int ret, priority = 10;
if (!config_setting_is_group(cfg))
cerror(cfg, "Hooks must be configured with an object");
int priority = 10;
for (int i = 0; i < config_setting_length(cfg); i++) {
config_setting_t *cfg_hook = config_setting_get_elem(cfg, i);
@ -125,10 +170,12 @@ int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node
if (!config_setting_is_group(cfg_hook))
cerror(cfg_hook, "The 'hooks' setting must be an array of strings.");
ret = hook_init(&h, &p->hook, sn);
ret = hook_init(&h, &p->hook, o);
if (ret)
continue;
/* If the user does not assign a priority, we will use the
* position of the hook section in the congiguration file. */
h.priority = priority++;
ret = hook_parse(&h, cfg_hook);
@ -138,5 +185,5 @@ int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node
list_push(list, memdup(&h, sizeof(h)));
}
return list_length(list);
return 0;
}

View file

@ -18,37 +18,36 @@ struct convert {
} mode;
};
static int hook_convert(struct hook *h, int when, struct hook_info *j)
static int convert_parse(struct hook *h, config_setting_t *cfg)
{
struct convert *p = (struct convert *) h->_vd;
struct convert *p = h->_vd;
switch (when) {
case HOOK_PARSE: {
const char *mode;
if (!config_setting_lookup_string(h->cfg, "mode", &mode))
cerror(h->cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt));
if (!strcmp(mode, "fixed"))
p->mode = TO_FIXED;
else if (!strcmp(mode, "float"))
p->mode = TO_FLOAT;
else
error("Invalid parameter '%s' for hook 'convert'", mode);
break;
}
case HOOK_READ:
for (int i = 0; i < j->count; i++) {
for (int k = 0; k < j->samples[i]->length; k++) {
switch (p->mode) {
case TO_FIXED: j->samples[i]->data[k].i = j->samples[i]->data[k].f * 1e3; break;
case TO_FLOAT: j->samples[i]->data[k].f = j->samples[i]->data[k].i; break;
}
}
const char *mode;
if (!config_setting_lookup_string(cfg, "mode", &mode))
cerror(cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt));
if (!strcmp(mode, "fixed"))
p->mode = TO_FIXED;
else if (!strcmp(mode, "float"))
p->mode = TO_FLOAT;
else
error("Invalid parameter '%s' for hook 'convert'", mode);
return 0;
}
static int convert_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct convert *p = h->_vd;
for (int i = 0; i < *cnt; i++) {
for (int k = 0; k < smps[i]->length; k++) {
switch (p->mode) {
case TO_FIXED: smps[i]->data[k].i = smps[i]->data[k].f * 1e3; break;
case TO_FLOAT: smps[i]->data[k].f = smps[i]->data[k].i; break;
}
break;
}
}
return 0;
@ -60,9 +59,9 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct convert),
.cb = hook_convert,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ
.parse = convert_parse,
.read = convert_read,
.size = sizeof(struct convert)
}
};

View file

@ -16,40 +16,44 @@ struct decimate {
unsigned counter;
};
static int hook_decimate(struct hook *h, int when, struct hook_info *j)
static int decimate_init(struct hook *h)
{
struct decimate *p = (struct decimate *) h->_vd;
switch (when) {
case HOOK_INIT:
p->counter = 0;
break;
case HOOK_PARSE:
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (!config_setting_lookup_int(h->cfg, "ratio", &p->ratio))
cerror(h->cfg, "Missing setting 'ratio' for hook '%s'", plugin_name(h->_vt));
struct decimate *p = h->_vd;
break;
case HOOK_READ:
assert(j->samples);
int i, ok;
for (i = 0, ok = 0; i < j->count; i++) {
if (p->counter++ % p->ratio == 0) {
struct sample *tmp;
tmp = j->samples[ok];
j->samples[ok++] = j->samples[i];
j->samples[i] = tmp;
}
}
p->counter = 0;
return ok;
return 0;
}
static int decimate_parse(struct hook *h, config_setting_t *cfg)
{
struct decimate *p = h->_vd;
if (!cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (!config_setting_lookup_int(cfg, "ratio", &p->ratio))
cerror(cfg, "Missing setting 'ratio' for hook '%s'", plugin_name(h->_vt));
return 0;
}
static int decimate_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct decimate *p = h->_vd;
int i, ok;
for (i = 0, ok = 0; i < *cnt; i++) {
if (p->counter++ % p->ratio == 0) {
struct sample *tmp;
tmp = smps[ok];
smps[ok++] = smps[i];
smps[i] = tmp;
}
}
*cnt = ok;
return 0;
}
@ -60,9 +64,10 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct decimate),
.cb = hook_decimate,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_DESTROY | HOOK_READ
.init = decimate_init,
.parse = decimate_parse,
.read = decimate_read,
.size = sizeof(struct decimate)
}
};

View file

@ -13,28 +13,26 @@
#include "stats.h"
#include "path.h"
static int hook_drop(struct hook *h, int when, struct hook_info *j)
static int drop_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
int i, ok, dist;
assert(j->samples);
for (i = 0, ok = 0; i < j->count; i++) {
h->last = j->samples[i];
for (i = 0, ok = 0; i < *cnt; i++) {
h->last = smps[i];
if (h->prev) {
dist = h->last->sequence - (int32_t) h->prev->sequence;
if (dist <= 0) {
warn("Dropped sample: dist = %d, i = %d", dist, i);
if (j->path && j->path->stats)
stats_update(j->path->stats->delta, STATS_DROPPED, dist);
if (h->path && h->path->stats)
stats_update(h->path->stats->delta, STATS_REORDERED, dist);
}
else {
struct sample *tmp;
tmp = j->samples[i];
j->samples[i] = j->samples[ok];
j->samples[ok++] = tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
@ -46,15 +44,17 @@ static int hook_drop(struct hook *h, int when, struct hook_info *j)
else {
struct sample *tmp;
tmp = j->samples[i];
j->samples[i] = j->samples[ok];
j->samples[ok++] = tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
}
h->prev = h->last;
}
return ok;
*cnt = ok;
return 0;
}
static struct plugin p = {
@ -63,8 +63,8 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 3,
.cb = hook_drop,
.when = HOOK_AUTO | HOOK_READ
.builtin = true,
.read = drop_read
}
};

View file

@ -12,26 +12,26 @@
#include "plugin.h"
#include "timing.h"
int hook_fix_ts(struct hook *h, int when, struct hook_info *j)
int fix_ts_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct timespec now = time_now();
struct timespec now;
assert(j->samples);
now = time_now();
for (int i = 0; i < j->count; i++) {
for (int i = 0; i < *cnt; i++) {
/* Check for missing receive timestamp
* Usually node_type::read() should update the receive timestamp.
* An example would be to use hardware timestamp capabilities of
* modern NICs.
*/
if ((j->samples[i]->ts.received.tv_sec == 0 && j->samples[i]->ts.received.tv_nsec == 0) ||
(j->samples[i]->ts.received.tv_sec == -1 && j->samples[i]->ts.received.tv_nsec == -1))
j->samples[i]->ts.received = now;
if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) ||
(smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1))
smps[i]->ts.received = now;
/* Check for missing origin timestamp */
if ((j->samples[i]->ts.origin.tv_sec == 0 && j->samples[i]->ts.origin.tv_nsec == 0) ||
(j->samples[i]->ts.origin.tv_sec == -1 && j->samples[i]->ts.origin.tv_nsec == -1))
j->samples[i]->ts.origin = now;
if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) ||
(smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1))
smps[i]->ts.origin = now;
}
return 0;
@ -43,8 +43,8 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 0,
.cb = hook_fix_ts,
.when = HOOK_AUTO | HOOK_READ
.builtin = true,
.read = fix_ts_read,
}
};

View file

@ -20,56 +20,61 @@ struct map {
struct stats *stats;
};
static int hook_map(struct hook *h, int when, struct sample *smps[], size_t *cnt)
static int map_init(struct hook *h)
{
struct map *p = h->_vd;
return mapping_init(&p->mapping);
}
static int map_destroy(struct hook *h)
{
struct map *p = h->_vd;
return mapping_destroy(&p->mapping);
}
static int map_parse(struct hook *h, config_setting_t *cfg)
{
struct map *p = h->_vd;
int ret;
config_setting_t *cfg_mapping;
cfg_mapping = config_setting_lookup(cfg, "mapping");
if (!config_setting_is_array(cfg_mapping))
return -1;
ret = mapping_parse(&p->mapping, cfg_mapping);
if (ret)
return ret;
return 0;
}
static int map_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
int ret;
struct map *p = (struct map *) h->_vd;
struct map *p = h->_vd;
struct sample *tmp[*cnt];
switch (when) {
case HOOK_INIT:
mapping_init(&p->mapping);
break;
case HOOK_DESTROY:
mapping_destroy(&p->mapping);
break;
case HOOK_PARSE: {
config_setting_t *cfg_mapping;
cfg_mapping = config_setting_lookup(h->cfg, "mapping");
if (!config_setting_is_array(cfg_mapping))
return -1;
ret = mapping_parse(&p->mapping, cfg_mapping);
if (ret)
return ret;
if (*cnt <= 0)
return 0;
break;
}
ret = sample_alloc(smps[0]->pool, tmp, *cnt);
if (ret != *cnt)
return ret;
case HOOK_READ: {
struct sample *tmp[*cnt];
if (*cnt <= 0)
return 0;
ret = sample_alloc(smps[0]->pool, tmp, *cnt);
if (ret != *cnt)
return ret;
for (int i = 0; i < *cnt; i++) {
mapping_remap(&p->mapping, smps[i], tmp[i], NULL);
SWAP(smps[i], tmp[i]);
}
sample_free(tmp, *cnt);
break;
}
}
for (int i = 0; i < *cnt; i++) {
mapping_remap(&p->mapping, smps[i], tmp[i], NULL);
SWAP(smps[i], tmp[i]);
}
sample_free(tmp, *cnt);
return 0;
}
@ -79,9 +84,11 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct map),
.cb = hook_map,
.when = HOOK_STORAGE | HOOK_READ | HOOK_PARSE
.init = map_init,
.destroy= map_destroy,
.parse = map_parse,
.read = map_read,
.size = sizeof(struct map)
}
};

View file

@ -17,40 +17,54 @@ struct print {
const char *uri;
};
static int hook_print(struct hook *h, int when, struct hook_info *j)
static int print_init(struct hook *h)
{
struct print *p = (struct print *) h->_vd;
switch (when) {
case HOOK_INIT:
p->output = stdout;
p->uri = NULL;
break;
struct print *p = h->_vd;
case HOOK_PATH_START:
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
break;
case HOOK_PATH_STOP:
if (p->uri)
fclose(p->output);
break;
case HOOK_PARSE:
config_setting_lookup_string(h->cfg, "output", &p->uri);
break;
case HOOK_READ:
assert(j->samples);
p->output = stdout;
p->uri = NULL;
return 0;
}
static int print_start(struct hook *h)
{
struct print *p = h->_vd;
for (int i = 0; i < j->count; i++)
sample_fprint(p->output, j->samples[i], SAMPLE_ALL);
break;
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
return 0;
}
static int print_stop(struct hook *h)
{
struct print *p = h->_vd;
if (p->uri)
fclose(p->output);
return 0;
}
static int print_parse(struct hook *h, config_setting_t *cfg)
{
struct print *p = h->_vd;
config_setting_lookup_string(cfg, "output", &p->uri);
return 0;
}
static int print_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct print *p = h->_vd;
for (int i = 0; i < *cnt; i++)
sample_fprint(p->output, smps[i], SAMPLE_ALL);
return 0;
}
@ -61,9 +75,12 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct print),
.cb = hook_print,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH
.init = print_init,
.parse = print_parse,
.start = print_start,
.stop = print_stop,
.read = print_read,
.size = sizeof(struct print)
}
};

View file

@ -12,27 +12,31 @@
#include "plugin.h"
#include "path.h"
static int hook_restart(struct hook *h, int when, struct hook_info *j)
static int restart_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
assert(j->samples);
assert(j->path);
assert(h->path);
for (int i = 0; i < *cnt; i++) {
h->last = smps[i];
for (int i = 0; i < j->count; i++) {
h->last = j->samples[i];
if (h->prev) {
if (h->last->sequence == 0 &&
h->prev->sequence <= UINT32_MAX - 32) {
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)",
path_name(j->path), h->prev->sequence, h->last->sequence);
path_name(h->path), h->prev->sequence, h->last->sequence);
path_run_hooks(j->path, HOOK_PATH_RESTART, &j->samples[i], j->count - i);
/* Run restart hooks */
for (size_t i = 0; i < list_length(&h->path->hooks); i++) {
struct hook *k = list_at(&h->path->hooks, i);
hook_restart(k);
}
}
}
h->prev = h->last;
}
return 0;
}
@ -42,8 +46,8 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 1,
.cb = hook_restart,
.when = HOOK_AUTO | HOOK_READ
.builtin = true,
.read = restart_read
}
};

View file

@ -1,118 +0,0 @@
/** Time shift hook.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include "hook.h"
#include "plugin.h"
#include "timing.h"
struct shift {
union {
struct timespec ts; /**< For SHIFT_TS_* modes. */
int seq; /**< For SHIFT_SEQUENCE mode. */
} offset;
enum {
SHIFT_TS_ORIGIN,
SHIFT_TS_RECEIVED,
SHIFT_TS_SENT,
SHIFT_SEQUENCE
} mode;
};
static int hook_shift(struct hook *h, int when, struct hook_info *j)
{
struct shift *p = (struct shift *) h->_vd;
const char *mode;
switch (when) {
case HOOK_INIT:
p->mode = SHIFT_TS_ORIGIN; /* Default mode */
break;
case HOOK_PARSE:
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (config_setting_lookup_string(h->cfg, "mode", &mode)) {
if (!strcmp(mode, "origin"))
p->mode = SHIFT_TS_ORIGIN;
else if (!strcmp(mode, "received"))
p->mode = SHIFT_TS_RECEIVED;
else if (!strcmp(mode, "sent"))
p->mode = SHIFT_TS_SENT;
else if (!strcmp(mode, "sequence"))
p->mode = SHIFT_SEQUENCE;
else
error("Invalid mode parameter '%s' for hook '%s'", mode, plugin_name(h->_vt));
}
switch (p->mode) {
case SHIFT_TS_ORIGIN:
case SHIFT_TS_RECEIVED:
case SHIFT_TS_SENT: {
double offset;
if (!config_setting_lookup_float(h->cfg, "offset", &offset))
cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
p->offset.ts = time_from_double(offset);
break;
}
case SHIFT_SEQUENCE: {
int offset;
if (!config_setting_lookup_int(h->cfg, "offset", &offset))
cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
p->offset.seq = offset;
break;
}
}
break;
case HOOK_READ:
for (int i = 0; i < j->count; i++) {
struct sample *s = j->samples[i];
switch (p->mode) {
case SHIFT_TS_ORIGIN:
s->ts.origin = time_add(&s->ts.origin, &p->offset.ts); break;
case SHIFT_TS_RECEIVED:
s->ts.received = time_add(&s->ts.received, &p->offset.ts); break;
case SHIFT_TS_SENT:
s->ts.origin = time_add(&s->ts.sent, &p->offset.ts); break;
case SHIFT_SEQUENCE:
s->sequence += p->offset.seq; break;
}
}
break;
}
return 0;
}
static struct plugin p = {
.name = "shift",
.description = "Shift the origin timestamp or sequence number of samples",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct shift),
.cb = hook_shift,
.when = HOOK_STORAGE | HOOK_READ
}
};
REGISTER_PLUGIN(&p)
/** @} */

52
lib/hooks/shift_seq.c Normal file
View file

@ -0,0 +1,52 @@
/** Shift sequence number of samples
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include "hook.h"
#include "plugin.h"
struct shift {
int offset;
};
static int shift_seq_parse(struct hook *h, config_setting_t *cfg)
{
struct shift *p = h->_vd;
if (!config_setting_lookup_int(cfg, "offset", &p->offset))
cerror(cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
return 0;
}
static int shift_seq_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct shift *p = h->_vd;
for (int i = 0; i < *cnt; i++)
smps[i]->sequence += p->offset;
return 0;
}
static struct plugin p = {
.name = "shift_seq",
.description = "Shift sequence number of samples",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.parse = shift_seq_parse,
.read = shift_seq_read,
.size = sizeof(struct shift),
}
};
REGISTER_PLUGIN(&p)
/** @} */

95
lib/hooks/shift_ts.c Normal file
View file

@ -0,0 +1,95 @@
/** Shift timestamps of samples.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
*********************************************************************************/
/** @addtogroup hooks Hook functions
* @{
*/
#include "hook.h"
#include "plugin.h"
#include "timing.h"
struct shift_ts {
struct timespec offset;
enum {
SHIFT_ORIGIN,
SHIFT_RECEIVED,
SHIFT_SENT,
} mode;
};
static int shift_ts_init(struct hook *h)
{
struct shift_ts *p = h->_vd;
p->mode = SHIFT_ORIGIN; /* Default mode */
return 0;
}
static int shift_ts_parse(struct hook *h, config_setting_t *cfg)
{
struct shift_ts *p = h->_vd;
const char *mode;
if (config_setting_lookup_string(cfg, "mode", &mode)) {
if (!strcmp(mode, "origin"))
p->mode = SHIFT_ORIGIN;
else if (!strcmp(mode, "received"))
p->mode = SHIFT_RECEIVED;
else if (!strcmp(mode, "sent"))
p->mode = SHIFT_SENT;
else
cerror(cfg, "Invalid mode parameter '%s' for hook '%s'", mode, plugin_name(h->_vt));
}
double offset;
if (!config_setting_lookup_float(cfg, "offset", &offset))
cerror(cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
p->offset = time_from_double(offset);
return 0;
}
static int shift_ts_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct shift_ts *p = h->_vd;
for (int i = 0; i < *cnt; i++) {
struct sample *s = smps[i];
struct timespec *ts;
switch (p->mode) {
case SHIFT_ORIGIN: ts = &s->ts.origin; break;
case SHIFT_RECEIVED: ts = &s->ts.received; break;
case SHIFT_SENT: ts = &s->ts.sent; break;
default: return -1;
}
*ts = time_add(ts, &p->offset); break;
}
return 0;
}
static struct plugin p = {
.name = "shift_ts",
.description = "Shift timestamps of samples",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.init = shift_ts_init,
.parse = shift_ts_parse,
.read = shift_ts_read,
.size = sizeof(struct shift_ts)
}
};
REGISTER_PLUGIN(&p)
/** @} */

View file

@ -36,85 +36,84 @@ struct skip_first {
};
};
static int hook_skip_first(struct hook *h, int when, struct hook_info *j)
static int skip_first_parse(struct hook *h, config_setting_t *cfg)
{
struct skip_first *p = (struct skip_first *) h->_vd;
struct skip_first *p = h->_vd;
switch (when) {
case HOOK_PARSE: {
double seconds;
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (config_setting_lookup_float(h->cfg, "seconds", &seconds)) {
p->seconds.wait = time_from_double(seconds);
p->mode = HOOK_SKIP_MODE_SECONDS;
}
else if (config_setting_lookup_int(h->cfg, "samples", &p->samples.wait)) {
p->mode = HOOK_SKIP_MODE_SAMPLES;
}
else
cerror(h->cfg, "Missing setting 'seconds' or 'samples' for hook '%s'", plugin_name(h->_vt));
double seconds;
break;
if (config_setting_lookup_float(cfg, "seconds", &seconds)) {
p->seconds.wait = time_from_double(seconds);
p->mode = HOOK_SKIP_MODE_SECONDS;
}
else if (config_setting_lookup_int(cfg, "samples", &p->samples.wait)) {
p->mode = HOOK_SKIP_MODE_SAMPLES;
}
else
cerror(cfg, "Missing setting 'seconds' or 'samples' for hook '%s'", plugin_name(h->_vt));
return 0;
}
static int skip_first_restart(struct hook *h)
{
struct skip_first *p = h->_vd;
p->state = HOOK_SKIP_FIRST_STATE_STARTED;
return 0;
}
static int skip_first_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct skip_first *p = h->_vd;
if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) {
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
p->samples.until = smps[0]->sequence + p->samples.wait;
break;
case HOOK_SKIP_MODE_SECONDS:
p->seconds.until = time_add(&smps[0]->ts.received, &p->seconds.wait);
break;
}
case HOOK_PATH_START:
case HOOK_PATH_RESTART:
p->state = HOOK_SKIP_FIRST_STATE_STARTED;
break;
case HOOK_READ:
assert(j->samples);
if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) {
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
p->samples.until = j->samples[0]->sequence + p->samples.wait;
break;
case HOOK_SKIP_MODE_SECONDS:
p->seconds.until = time_add(&j->samples[0]->ts.received, &p->seconds.wait);
break;
}
p->state = HOOK_SKIP_FIRST_STATE_SKIPPING;
}
int i, ok;
for (i = 0, ok = 0; i < j->count; i++) {
bool skip;
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
skip = p->samples.until >= j->samples[i]->sequence;
break;
case HOOK_SKIP_MODE_SECONDS:
skip = time_delta(&p->seconds.until, &j->samples[i]->ts.received) < 0;
break;
default:
skip = false;
}
if (!skip) {
struct sample *tmp;
tmp = j->samples[i];
j->samples[i] = j->samples[ok];
j->samples[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
* shift them to the end of the 'smps[]' array.
* In case the hook returns a number 'ok' which is smaller than 'cnt',
* only the first 'ok' samples in 'smps[]' are accepted and further processed.
*/
}
j->count = ok;
p->state = HOOK_SKIP_FIRST_STATE_SKIPPING;
}
int i, ok;
for (i = 0, ok = 0; i < *cnt; i++) {
bool skip;
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
skip = p->samples.until >= smps[i]->sequence;
break;
case HOOK_SKIP_MODE_SECONDS:
skip = time_delta(&p->seconds.until, &smps[i]->ts.received) < 0;
break;
default:
skip = false;
}
if (!skip) {
struct sample *tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
* shift them to the end of the 'smps[]' array.
* In case the hook returns a number 'ok' which is smaller than 'cnt',
* only the first 'ok' samples in 'smps[]' are accepted and further processed.
*/
}
*cnt = ok;
return 0;
}
@ -124,9 +123,11 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct skip_first),
.cb = hook_skip_first,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH
.parse = skip_first_parse,
.start = skip_first_restart,
.restart = skip_first_restart,
.read = skip_first_read,
.size = sizeof(struct skip_first)
}
};

View file

@ -13,7 +13,7 @@
#include "stats.h"
#include "path.h"
struct stats_hook {
struct stats_collect {
struct stats stats;
enum stats_format format;
@ -23,128 +23,202 @@ struct stats_hook {
const char *uri;
};
static int hook_stats(struct hook *h, int when, struct hook_info *j)
static int stats_collect_init(struct hook *h)
{
struct stats_hook *p = (struct stats_hook *) h->_vd;
struct stats_collect *p = h->_vd;
stats_init(&p->stats);
/* Register statistic object to path.
*
* This allows the path code to update statistics. */
if (h->path)
h->path->stats = &p->stats;
switch (when) {
case HOOK_INIT:
stats_init(&p->stats);
/* Register statistic object to path.
*
* This allows the path code to update statistics. */
if (j->path)
j->path->stats = &p->stats;
/* Set default values */
p->format = STATS_FORMAT_HUMAN;
p->verbose = 0;
p->uri = NULL;
p->output = stdout;
break;
case HOOK_PARSE: {
const char *format;
if (config_setting_lookup_string(h->cfg, "format", &format)) {
if (!strcmp(format, "human"))
p->format = STATS_FORMAT_HUMAN;
else if (!strcmp(format, "json"))
p->format = STATS_FORMAT_JSON;
else if (!strcmp(format, "matlab"))
p->format = STATS_FORMAT_MATLAB;
else
cerror(h->cfg, "Invalid statistic output format: %s", format);
}
config_setting_lookup_int(h->cfg, "verbose", &p->verbose);
config_setting_lookup_string(h->cfg, "output", &p->uri);
/* Set default values */
p->format = STATS_FORMAT_HUMAN;
p->verbose = 0;
p->uri = NULL;
p->output = stdout;
return 0;
}
break;
}
static int stats_collect_destroy(struct hook *h)
{
struct stats_collect *p = h->_vd;
stats_destroy(&p->stats);
return 0;
}
case HOOK_DESTROY:
stats_destroy(&p->stats);
break;
case HOOK_READ:
assert(j->samples);
stats_collect(p->stats.delta, j->samples, j->count);
stats_commit(&p->stats, p->stats.delta);
break;
case HOOK_PATH_START:
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
break;
case HOOK_PATH_STOP:
stats_print(&p->stats, p->output, p->format, p->verbose);
if (p->uri)
fclose(p->output);
break;
case HOOK_PATH_RESTART:
stats_reset(&p->stats);
break;
case HOOK_PERIODIC:
assert(j->path);
stats_print_periodic(&p->stats, p->output, p->format, p->verbose, j->path);
break;
static int stats_collect_start(struct hook *h)
{
struct stats_collect *p = h->_vd;
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
return 0;
}
static int stats_collect_stop(struct hook *h)
{
struct stats_collect *p = h->_vd;
stats_print(&p->stats, p->output, p->format, p->verbose);
if (p->uri)
fclose(p->output);
return 0;
}
static int stats_collect_restart(struct hook *h)
{
struct stats_collect *p = h->_vd;
stats_reset(&p->stats);
return 0;
}
static int stats_collect_periodic(struct hook *h)
{
struct stats_collect *p = h->_vd;
stats_print_periodic(&p->stats, p->output, p->format, p->verbose, h->path);
return 0;
}
static int stats_collect_parse(struct hook *h, config_setting_t *cfg)
{
struct stats_collect *p = h->_vd;
const char *format;
if (config_setting_lookup_string(cfg, "format", &format)) {
if (!strcmp(format, "human"))
p->format = STATS_FORMAT_HUMAN;
else if (!strcmp(format, "json"))
p->format = STATS_FORMAT_JSON;
else if (!strcmp(format, "matlab"))
p->format = STATS_FORMAT_MATLAB;
else
cerror(cfg, "Invalid statistic output format: %s", format);
}
config_setting_lookup_int(cfg, "verbose", &p->verbose);
config_setting_lookup_string(cfg, "output", &p->uri);
return 0;
}
static int stats_collect_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct stats_collect *p = h->_vd;
stats_collect(p->stats.delta, smps, *cnt);
stats_commit(&p->stats, p->stats.delta);
return 0;
}
struct stats_send {
struct node *dest;
struct stats *stats;
int ratio;
enum {
STATS_SEND_MODE_PERIODIC,
STATS_SEND_MODE_READ
} mode;
int decimation;
};
/** @todo This is untested */
static int hook_stats_send(struct hook *h, int when, struct hook_info *j)
static int stats_send_init(struct hook *h)
{
struct stats_send *p = (struct stats_send *) h->_vd;
struct stats_send *p = h->_vd;
p->decimation = 1;
p->mode = STATS_SEND_MODE_PERIODIC;
switch (when) {
case HOOK_INIT:
assert(j->nodes);
assert(j->path);
if (!h->cfg)
error("Missing configuration for hook '%s'", plugin_name(h->_vt));
const char *dest;
if (!config_setting_lookup_string(h->cfg, "destination", &dest))
cerror(h->cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt));
p->dest = list_lookup(j->nodes, dest);
if (!p->dest)
cerror(h->cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt));
break;
case HOOK_PATH_START:
node_start(p->dest);
break;
return 0;
}
case HOOK_PATH_STOP:
node_stop(p->dest);
break;
static int stats_send_parse(struct hook *h, config_setting_t *cfg)
{
struct stats_send *p = h->_vd;
case HOOK_READ:
stats_send(p->stats, p->dest);
break;
assert(h->path && h->path->super_node);
const char *dest, *mode;
if (config_setting_lookup_string(cfg, "destination", &dest)) {
p->dest = list_lookup(&h->path->super_node->nodes, dest);
if (!p->dest)
cerror(cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt));
}
else
cerror(cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt));
if (config_setting_lookup_string(cfg, "destination", &mode)) {
if (!strcmp(mode, "periodic"))
p->mode = STATS_SEND_MODE_PERIODIC;
else if (!strcmp(mode, "read"))
p->mode = STATS_SEND_MODE_READ;
else
cerror(cfg, "Invalid value '%s' for setting 'mode' of hook '%s'", mode, plugin_name(h->_vt));
}
config_setting_lookup_int(cfg, "decimation", &p->decimation);
return 0;
}
static int stats_send_start(struct hook *h)
{
struct stats_send *p = h->_vd;
if (p->dest->state != STATE_STOPPED)
node_start(p->dest);
return 0;
}
static int stats_send_stop(struct hook *h)
{
struct stats_send *p = h->_vd;
if (p->dest->state != STATE_STOPPED)
node_stop(p->dest);
return 0;
}
static int stats_send_periodic(struct hook *h)
{
struct stats_send *p = h->_vd;
if (p->mode == STATS_SEND_MODE_PERIODIC)
stats_send(h->path->stats, p->dest);
return 0;
}
static int stats_send_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
struct stats_send *p = h->_vd;
assert(h->path->stats);
if (p->mode == STATS_SEND_MODE_READ) {
size_t processed = h->path->stats->histograms[STATS_OWD].total;
if (processed % p->decimation == 0)
stats_send(h->path->stats, p->dest);
}
return 0;
@ -156,9 +230,15 @@ static struct plugin p1 = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 2,
.size = sizeof(struct stats_hook),
.cb = hook_stats,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_PATH | HOOK_READ | HOOK_PERIODIC
.init = stats_collect_init,
.destroy= stats_collect_destroy,
.start = stats_collect_start,
.stop = stats_collect_stop,
.read = stats_collect_read,
.restart= stats_collect_restart,
.periodic= stats_collect_periodic,
.parse = stats_collect_parse,
.size = sizeof(struct stats_collect),
}
};
@ -168,8 +248,13 @@ static struct plugin p2 = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.cb = hook_stats_send,
.when = HOOK_STORAGE | HOOK_PATH | HOOK_READ
.init = stats_send_init,
.parse = stats_send_parse,
.start = stats_send_start,
.stop = stats_send_stop,
.periodic= stats_send_periodic,
.read = stats_send_read,
.size = sizeof(struct stats_send)
}
};

View file

@ -12,24 +12,22 @@
#include "plugin.h"
#include "timing.h"
static int hook_ts(struct hook *h, int when, struct hook_info *j)
static int ts_read(struct hook *h, struct sample *smps[], size_t *cnt)
{
assert(j->samples);
for (int i = 0; i < j->count; i++)
j->samples[i]->ts.origin = j->samples[i]->ts.received;
for (int i = 0; i < *cnt; i++)
smps[i]->ts.origin = smps[i]->ts.received;
return 0;
}
static struct plugin p = {
.name = "ts",
.description = "Update timestamp of message with current time",
.description = "Overwrite origin timestamp of samples with receive timestamp",
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.cb = hook_ts,
.when = HOOK_READ
.read = ts_read,
.size = 0
}
};

View file

@ -61,12 +61,15 @@ static int hook_parse_cli(struct hook *h, char *params[], int paramlen)
static void usage()
{
printf("Usage: villas-hook [OPTIONS] NAME [PARAM] \n");
printf(" NAME the name of the hook function to run\n");
printf(" PARAM a string of configuration settings for the hook\n\n");
printf(" PARAM a string of configuration settings for the hook\n");
printf(" OPTIONS are:\n");
printf(" -h show this help\n");
printf(" -d LVL set debug level to LVL\n");
printf(" -v CNT process CNT samples at once\n");
printf(" NAME the name of the hook function\n\n");
printf("The following hook functions are supported:\n");
plugin_dump(PLUGIN_TYPE_HOOK);
printf("\n");
printf("Example:");
printf(" villas-signal random | villas-hook skip_first seconds=10\n");
@ -77,7 +80,9 @@ static void usage()
int main(int argc, char *argv[])
{
int ret, level, cnt;
int ret, level;
size_t cnt, recv;
/* Default values */
level = V;
@ -88,9 +93,9 @@ int main(int argc, char *argv[])
struct log log;
struct plugin *p;
struct sample *samples[cnt];
struct pool pool = { .state = STATE_DESTROYED };
struct pool q = { .state = STATE_DESTROYED };
struct hook h = { .state = STATE_DESTROYED };
struct hook_info hi = { .samples = samples };
char c;
while ((c = getopt(argc, argv, "hv:d:")) != -1) {
@ -109,6 +114,8 @@ int main(int argc, char *argv[])
}
log_init(&log, level, LOG_ALL);
log_start(&log);
memory_init(DEFAULT_NR_HUGEPAGES);
if (argc < optind + 1) {
@ -119,41 +126,47 @@ int main(int argc, char *argv[])
if (cnt < 1)
error("Vectorize option must be greater than 0");
ret = pool_init(&pool, 10 * cnt, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage);
ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage);
if (ret)
error("Failed to initilize memory pool");
name = argv[optind];
name = argv[optind];
p = plugin_lookup(PLUGIN_TYPE_HOOK, name);
if (!p)
error("Unknown hook function '%s'", argv[optind]);
error("Unknown hook function '%s'", name);
config_init(&cfg);
hook_init(&h, &p->hook, NULL);
hook_parse_cli(&h, &argv[optind + 1], argc - optind - 1);
hook_run(&h, HOOK_PATH_START, &hi);
ret = hook_init(&h, &p->hook, NULL);
if (ret)
error("Failed to initialize hook");
ret = hook_parse_cli(&h, &argv[optind + 1], argc - optind - 1);
if (ret)
error("Failed to parse hook config");
hook_start(&h);
while (!feof(stdin)) {
ret = sample_alloc(&pool, samples, cnt);
ret = sample_alloc(&q, samples, cnt);
if (ret != cnt)
error("Failed to allocate %u samples from pool", cnt);
error("Failed to allocate %zu samples from pool", cnt);
hi.count = 0;
recv = 0;
for (int j = 0; j < cnt && !feof(stdin); j++) {
ret = sample_fscan(stdin, hi.samples[j], NULL);
if (ret < 0)
break;
hi.samples[j]->ts.received = time_now();
hi.count++;
samples[j]->ts.received = time_now();
recv++;
}
debug(15, "Read %d samples from stdin", cnt);
debug(15, "Read %zu samples from stdin", recv);
hook_run(&h, HOOK_READ, &hi);
hook_run(&h, HOOK_WRITE, &hi);
hook_read(&h, samples, &recv);
hook_write(&h, samples, &recv);
for (int j = 0; j < hi.count; j++)
sample_fprint(stdout, hi.samples[j], SAMPLE_ALL);
@ -162,13 +175,12 @@ int main(int argc, char *argv[])
sample_free(samples, cnt);
}
hook_run(&h, HOOK_PATH_STOP, &hi);
hook_stop(&h);
hook_destroy(&h);
config_destroy(&cfg);
sample_free(samples, cnt);
pool_destroy(&pool);
pool_destroy(&q);
return 0;
}