1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

use name method for storing hook private data as for nodes

This commit is contained in:
Steffen Vogel 2017-03-20 09:13:01 -03:00
parent 0c5204ccc3
commit 8613ee688e
7 changed files with 247 additions and 120 deletions

View file

@ -30,8 +30,10 @@ int hook_init(struct hook *h, struct hook_type *vt, struct super_node *sn)
assert(h->state == STATE_DESTROYED);
h->_vt = vt;
h->priority = vt->priority;
h->_vt = vt;
h->_vd = alloc(vt->size);
ret = hook_run(h, HOOK_INIT, &i);
if (ret)
@ -51,11 +53,16 @@ int hook_parse(struct hook *h, config_setting_t *cfg)
h->cfg = cfg;
config_setting_lookup_int(h->cfg, "priority", &h->priority);
if (h->_vt->when & HOOK_PARSE) {
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
/* Parse hook arguments */
ret = hook_run(h, HOOK_PARSE, NULL);
if (ret)
return ret;
/* Parse hook arguments */
ret = hook_run(h, HOOK_PARSE, NULL);
if (ret)
return ret;
}
h->state = STATE_PARSED;
@ -72,6 +79,9 @@ int hook_destroy(struct hook *h)
if (ret)
return ret;
if (h->_vd)
free(h->_vd);
h->state = HOOK_DESTROYED;
return 0;
@ -92,29 +102,6 @@ int hook_run(struct hook *h, int when, struct hook_info *i)
return h->_vt->when & when ? h->_vt->cb(h, when, i) : 0;
}
void * hook_storage(struct hook *h, int when, size_t len, ctor_cb_t ctor, dtor_cb_t dtor)
{
switch (when) {
case HOOK_INIT:
h->_vd = alloc(len);
if (ctor)
ctor(h->_vd);
break;
case HOOK_DESTROY:
if (dtor)
dtor(h->_vd);
free(h->_vd);
h->_vd = NULL;
break;
}
return h->_vd;
}
int hook_parse_list(struct list *list, config_setting_t *cfg, struct super_node *sn)
{
struct hook h;

View file

@ -11,37 +11,37 @@
#include "hook.h"
#include "plugin.h"
struct convert {
enum {
TO_FIXED,
TO_FLOAT
} mode;
};
static int hook_convert(struct hook *h, int when, struct hook_info *j)
{
struct {
enum {
TO_FIXED,
TO_FLOAT
} mode;
} *private = hook_storage(h, when, sizeof(*private), NULL, NULL);
const char *mode;
struct convert *p = (struct convert *) h->_vd;
switch (when) {
case HOOK_PARSE:
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
case HOOK_PARSE: {
const char *mode;
if (!config_setting_lookup_string(h->cfg, "mode", &mode))
cerror(h->cfg, "Missing setting 'mode' for hook '%s'", plugin_name(h->_vt));
if (!strcmp(mode, "fixed"))
private->mode = TO_FIXED;
p->mode = TO_FIXED;
else if (!strcmp(mode, "float"))
private->mode = TO_FLOAT;
p->mode = TO_FLOAT;
else
error("Invalid parameter '%s' for hook 'convert'", mode);
break;
}
case HOOK_READ:
for (int i = 0; i < j->count; i++) {
for (int k = 0; k < j->samples[i]->length; k++) {
switch (private->mode) {
switch (p->mode) {
case TO_FIXED: j->samples[i]->data[k].i = j->samples[i]->data[k].f * 1e3; break;
case TO_FLOAT: j->samples[i]->data[k].f = j->samples[i]->data[k].i; break;
}
@ -60,6 +60,7 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct convert),
.cb = hook_convert,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ
}

View file

@ -11,23 +11,25 @@
#include "hook.h"
#include "plugin.h"
struct decimate {
int ratio;
unsigned counter;
};
static int hook_decimate(struct hook *h, int when, struct hook_info *j)
{
struct {
int ratio;
unsigned counter;
} *private = hook_storage(h, when, sizeof(*private), NULL, NULL);
struct decimate *p = (struct decimate *) h->_vd;
switch (when) {
case HOOK_INIT:
private->counter = 0;
p->counter = 0;
break;
case HOOK_PARSE:
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (!config_setting_lookup_int(h->cfg, "ratio", &private->ratio))
if (!config_setting_lookup_int(h->cfg, "ratio", &p->ratio))
cerror(h->cfg, "Missing setting 'ratio' for hook '%s'", plugin_name(h->_vt));
break;
@ -37,7 +39,7 @@ static int hook_decimate(struct hook *h, int when, struct hook_info *j)
int i, ok;
for (i = 0, ok = 0; i < j->count; i++) {
if (private->counter++ % private->ratio == 0) {
if (p->counter++ % p->ratio == 0) {
struct sample *tmp;
tmp = j->samples[ok];
@ -58,6 +60,7 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct decimate),
.cb = hook_decimate,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_DESTROY | HOOK_READ
}

View file

@ -12,12 +12,45 @@
#include "plugin.h"
#include "sample.h"
struct print {
FILE *output;
const char *uri;
};
static int hook_print(struct hook *h, int when, struct hook_info *j)
{
assert(j->samples);
struct print *p = (struct print *) h->_vd;
for (int i = 0; i < j->count; i++)
sample_fprint(stdout, j->samples[i], SAMPLE_ALL);
switch (when) {
case HOOK_INIT:
p->output = stdout;
p->uri = NULL;
break;
case HOOK_PATH_START:
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
break;
case HOOK_PATH_STOP:
if (p->uri)
fclose(p->output);
break;
case HOOK_PARSE:
config_setting_lookup_string(h->cfg, "output", &p->uri);
break;
case HOOK_READ:
assert(j->samples);
for (int i = 0; i < j->count; i++)
sample_fprint(p->output, j->samples[i], SAMPLE_ALL);
break;
}
return j->count;
}
@ -28,8 +61,9 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct print),
.cb = hook_print,
.when = HOOK_READ
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH
}
};

View file

@ -12,27 +12,28 @@
#include "plugin.h"
#include "timing.h"
struct shift {
union {
struct timespec ts; /**< For SHIFT_TS_* modes. */
int seq; /**< For SHIFT_SEQUENCE mode. */
} offset;
enum {
SHIFT_TS_ORIGIN,
SHIFT_TS_RECEIVED,
SHIFT_TS_SENT,
SHIFT_SEQUENCE
} mode;
};
static int hook_shift(struct hook *h, int when, struct hook_info *j)
{
struct {
union {
struct timespec ts; /**< For SHIFT_TS_* modes. */
int seq; /**< For SHIFT_SEQUENCE mode. */
} offset;
enum {
SHIFT_TS_ORIGIN,
SHIFT_TS_RECEIVED,
SHIFT_TS_SENT,
SHIFT_SEQUENCE
} mode;
} *private = hook_storage(h, when, sizeof(*private), NULL, NULL);
struct shift *p = (struct shift *) h->_vd;
const char *mode;
switch (when) {
case HOOK_INIT:
private->mode = SHIFT_TS_ORIGIN; /* Default mode */
p->mode = SHIFT_TS_ORIGIN; /* Default mode */
break;
case HOOK_PARSE:
@ -41,18 +42,18 @@ static int hook_shift(struct hook *h, int when, struct hook_info *j)
if (config_setting_lookup_string(h->cfg, "mode", &mode)) {
if (!strcmp(mode, "origin"))
private->mode = SHIFT_TS_ORIGIN;
p->mode = SHIFT_TS_ORIGIN;
else if (!strcmp(mode, "received"))
private->mode = SHIFT_TS_RECEIVED;
p->mode = SHIFT_TS_RECEIVED;
else if (!strcmp(mode, "sent"))
private->mode = SHIFT_TS_SENT;
p->mode = SHIFT_TS_SENT;
else if (!strcmp(mode, "sequence"))
private->mode = SHIFT_SEQUENCE;
p->mode = SHIFT_SEQUENCE;
else
error("Invalid mode parameter '%s' for hook '%s'", mode, plugin_name(h->_vt));
}
switch (private->mode) {
switch (p->mode) {
case SHIFT_TS_ORIGIN:
case SHIFT_TS_RECEIVED:
case SHIFT_TS_SENT: {
@ -61,7 +62,7 @@ static int hook_shift(struct hook *h, int when, struct hook_info *j)
if (!config_setting_lookup_float(h->cfg, "offset", &offset))
cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
private->offset.ts = time_from_double(offset);
p->offset.ts = time_from_double(offset);
break;
}
@ -71,7 +72,7 @@ static int hook_shift(struct hook *h, int when, struct hook_info *j)
if (!config_setting_lookup_int(h->cfg, "offset", &offset))
cerror(h->cfg, "Missing setting 'offset' for hook '%s'", plugin_name(h->_vt));
private->offset.seq = offset;
p->offset.seq = offset;
break;
}
}
@ -82,15 +83,15 @@ static int hook_shift(struct hook *h, int when, struct hook_info *j)
for (int i = 0; i < j->count; i++) {
struct sample *s = j->samples[i];
switch (private->mode) {
switch (p->mode) {
case SHIFT_TS_ORIGIN:
s->ts.origin = time_add(&s->ts.origin, &private->offset.ts); break;
s->ts.origin = time_add(&s->ts.origin, &p->offset.ts); break;
case SHIFT_TS_RECEIVED:
s->ts.received = time_add(&s->ts.received, &private->offset.ts); break;
s->ts.received = time_add(&s->ts.received, &p->offset.ts); break;
case SHIFT_TS_SENT:
s->ts.origin = time_add(&s->ts.sent, &private->offset.ts); break;
s->ts.origin = time_add(&s->ts.sent, &p->offset.ts); break;
case SHIFT_SEQUENCE:
s->sequence += private->offset.seq; break;
s->sequence += p->offset.seq; break;
}
}
@ -106,6 +107,7 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct shift),
.cb = hook_shift,
.when = HOOK_STORAGE | HOOK_READ
}

View file

@ -14,55 +14,95 @@
#include "plugin.h"
#include "timing.h"
struct skip_first {
enum {
HOOK_SKIP_FIRST_STATE_STARTED, /**< Path just started. First sample not received yet. */
HOOK_SKIP_FIRST_STATE_SKIPPING, /**< First sample received. Skipping samples now. */
HOOK_SKIP_FIRST_STATE_NORMAL, /**< All samples skipped. Normal operation. */
} state;
enum {
HOOK_SKIP_MODE_SECONDS,
HOOK_SKIP_MODE_SAMPLES
} mode;
union {
struct {
struct timespec until;
struct timespec wait; /**< Absolute point in time from where we accept samples. */
} seconds;
struct {
int until;
int wait;
} samples;
};
};
static int hook_skip_first(struct hook *h, int when, struct hook_info *j)
{
struct {
struct timespec skip; /**< Time to wait until first message is not skipped */
struct timespec until; /**< Absolute point in time from where we accept samples. */
enum {
HOOK_SKIP_FIRST_STATE_STARTED, /**< Path just started. First sample not received yet. */
HOOK_SKIP_FIRST_STATE_SKIPPING, /**< First sample received. Skipping samples now. */
HOOK_SKIP_FIRST_STATE_NORMAL, /**< All samples skipped. Normal operation. */
} state;
} *private = hook_storage(h, when, sizeof(*private), NULL, NULL);
struct skip_first *p = (struct skip_first *) h->_vd;
switch (when) {
case HOOK_PARSE: {
double wait;
double seconds;
if (!h->cfg)
error("Missing configuration for hook: '%s'", plugin_name(h->_vt));
if (!config_setting_lookup_float(h->cfg, "seconds", &wait))
cerror(h->cfg, "Missing setting 'seconds' for hook '%s'", plugin_name(h->_vt));
private->skip = time_from_double(wait);
if (config_setting_lookup_float(h->cfg, "seconds", &seconds)) {
p->seconds.wait = time_from_double(seconds);
p->mode = HOOK_SKIP_MODE_SECONDS;
}
else if (config_setting_lookup_int(h->cfg, "samples", &p->samples.wait)) {
p->mode = HOOK_SKIP_MODE_SAMPLES;
}
else
cerror(h->cfg, "Missing setting 'seconds' or 'samples' for hook '%s'", plugin_name(h->_vt));
break;
}
case HOOK_PATH_START:
case HOOK_PATH_RESTART:
private->state = HOOK_SKIP_FIRST_STATE_STARTED;
p->state = HOOK_SKIP_FIRST_STATE_STARTED;
break;
case HOOK_READ:
assert(j->samples);
if (private->state == HOOK_SKIP_FIRST_STATE_STARTED) {
private->until = time_add(&j->samples[0]->ts.received, &private->skip);
private->state = HOOK_SKIP_FIRST_STATE_SKIPPING;
if (p->state == HOOK_SKIP_FIRST_STATE_STARTED) {
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
p->samples.until = j->samples[0]->sequence + p->samples.wait;
break;
case HOOK_SKIP_MODE_SECONDS:
p->seconds.until = time_add(&j->samples[0]->ts.received, &p->seconds.wait);
break;
}
p->state = HOOK_SKIP_FIRST_STATE_SKIPPING;
}
int i, ok;
for (i = 0, ok = 0; i < j->count; i++) {
if (time_delta(&private->until, &j->samples[i]->ts.received) > 0) {
bool skip;
switch (p->mode) {
case HOOK_SKIP_MODE_SAMPLES:
skip = p->samples.until >= j->samples[i]->sequence;
break;
case HOOK_SKIP_MODE_SECONDS:
skip = time_delta(&p->seconds.until, &j->samples[i]->ts.received) < 0;
break;
default:
skip = false;
}
if (!skip) {
struct sample *tmp;
tmp = j->samples[i];
j->samples[i] = j->samples[ok];
j->samples[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
@ -84,6 +124,7 @@ static struct plugin p = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 99,
.size = sizeof(struct skip_first),
.cb = hook_skip_first,
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH
}

View file

@ -13,50 +13,108 @@
#include "stats.h"
#include "path.h"
struct stats_hook {
struct stats stats;
enum stats_format format;
int verbose;
FILE *output;
const char *uri;
};
static int hook_stats(struct hook *h, int when, struct hook_info *j)
{
struct stats *s = hook_storage(h, when, sizeof(struct stats), (ctor_cb_t) stats_init, (dtor_cb_t) stats_destroy);
struct stats_hook *p = (struct stats_hook *) h->_vd;
switch (when) {
case HOOK_INIT:
stats_init(&p->stats);
/* Register statistic object to path.
*
* This allows the path code to update statistics. */
if (j->path)
j->path->stats = s;
j->path->stats = &p->stats;
/* Set default values */
p->format = STATS_FORMAT_HUMAN;
p->verbose = 0;
p->uri = NULL;
p->output = stdout;
break;
case HOOK_PARSE: {
const char *format;
if (config_setting_lookup_string(h->cfg, "format", &format)) {
if (!strcmp(format, "human"))
p->format = STATS_FORMAT_HUMAN;
else if (!strcmp(format, "json"))
p->format = STATS_FORMAT_JSON;
else if (!strcmp(format, "matlab"))
p->format = STATS_FORMAT_MATLAB;
else
cerror(h->cfg, "Invalid statistic output format: %s", format);
}
config_setting_lookup_int(h->cfg, "verbose", &p->verbose);
config_setting_lookup_string(h->cfg, "output", &p->uri);
break;
}
case HOOK_DESTROY:
stats_destroy(&p->stats);
break;
case HOOK_READ:
assert(j->samples);
stats_collect(s->delta, j->samples, j->count);
stats_commit(s, s->delta);
stats_collect(p->stats.delta, j->samples, j->count);
stats_commit(&p->stats, p->stats.delta);
break;
case HOOK_PATH_START:
if (p->uri) {
p->output = fopen(p->uri, "w+");
if (!p->output)
error("Failed to open file %s for writing", p->uri);
}
break;
case HOOK_PATH_STOP:
stats_print(s, 1);
printf("%s", json_dumps(stats_json(s), 0));
stats_print(&p->stats, p->output, p->format, p->verbose);
if (p->uri)
fclose(p->output);
break;
case HOOK_PATH_RESTART:
stats_reset(s);
stats_reset(&p->stats);
break;
case HOOK_PERIODIC:
assert(j->path);
stats_print_periodic(s, j->path);
stats_print_periodic(&p->stats, p->output, p->format, p->verbose, j->path);
break;
}
return j->count;
}
struct stats_send {
struct node *dest;
struct stats *stats;
int ratio;
};
/** @todo This is untested */
static int hook_stats_send(struct hook *h, int when, struct hook_info *j)
{
struct private {
struct node *dest;
struct stats *stats;
int ratio;
} *private = hook_storage(h, when, sizeof(*private), NULL, NULL);
struct stats_send *p = (struct stats_send *) h->_vd;
switch (when) {
case HOOK_INIT:
@ -71,21 +129,21 @@ static int hook_stats_send(struct hook *h, int when, struct hook_info *j)
if (!config_setting_lookup_string(h->cfg, "destination", &dest))
cerror(h->cfg, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt));
private->dest = list_lookup(j->nodes, dest);
if (!private->dest)
p->dest = list_lookup(j->nodes, dest);
if (!p->dest)
cerror(h->cfg, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt));
break;
case HOOK_PATH_START:
node_start(private->dest);
node_start(p->dest);
break;
case HOOK_PATH_STOP:
node_stop(private->dest);
node_stop(p->dest);
break;
case HOOK_READ:
stats_send(private->stats, private->dest);
stats_send(p->stats, p->dest);
break;
}
@ -98,8 +156,9 @@ static struct plugin p1 = {
.type = PLUGIN_TYPE_HOOK,
.hook = {
.priority = 2,
.size = sizeof(struct stats_hook),
.cb = hook_stats,
.when = HOOK_STORAGE | HOOK_PATH | HOOK_READ | HOOK_PERIODIC
.when = HOOK_STORAGE | HOOK_PARSE | HOOK_PATH | HOOK_READ | HOOK_PERIODIC
}
};