diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h new file mode 100644 index 000000000..23c4c82e1 --- /dev/null +++ b/include/villas/nodes/stats.h @@ -0,0 +1,64 @@ +/** Node-type for stats streaming. + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @ingroup node + * @addtogroup stats Sending stats + * @{ + */ + +#pragma once + +#include "task.h" + +/* Forward declarations */ +struct node; +struct path; +struct sample; + +struct stats_node { + struct task task; + double rate; + + struct node *node; +}; + +/** @see node_type::print */ +int stats_node_init(struct super_node *sn); + +/** @see node_type::print */ +char * stats_node_print(struct node *n); + +/** @see node_type::parse */ +int stats_node_parse(struct node *n, json_t *cfg); + +/** @see node_type::start */ +int stats_node_start(struct node *n); + +/** @see node_type::stop */ +int stats_node_stop(struct node *n); + +/** @see node_type::read */ +int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt); + +/** @} */ diff --git a/include/villas/stats.h b/include/villas/stats.h index a3c377d63..e0e46f941 100644 --- a/include/villas/stats.h +++ b/include/villas/stats.h @@ -31,7 +31,6 @@ /* Forward declarations */ struct sample; -struct path; struct node; enum stats_format { @@ -42,6 +41,7 @@ enum stats_format { enum stats_id { STATS_SKIPPED, /**< Counter for skipped samples due to hooks. */ + STATS_TIME, /**< The processing time per sample within VILLAsnode. */ STATS_REORDERED, /**< Counter for reordered samples. */ STATS_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */ STATS_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */ @@ -53,7 +53,6 @@ struct stats_delta { double values[STATS_COUNT]; int update; /**< Bitmask of stats_id. Only those which are masked will be updated */ - struct sample *last; }; struct stats { @@ -68,11 +67,11 @@ int stats_init(struct stats *s, int buckets, int warmup); int stats_destroy(struct stats *s); -void stats_update(struct stats_delta *s, enum stats_id id, double val); +void stats_update(struct stats *s, enum stats_id id, double val); -void stats_collect(struct stats_delta *s, struct sample *smps[], size_t cnt); +void stats_collect(struct stats *s, struct sample *smps[], size_t cnt); -int stats_commit(struct stats *s, struct stats_delta *d); +int stats_commit(struct stats *s); json_t * stats_json(struct stats *s); @@ -81,12 +80,10 @@ void stats_reset(struct stats *s); void stats_print_header(); void stats_print_footer(); -void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int verbose, struct path *p); +void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int verbose, struct node *p); void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose); -void stats_send(struct stats *s, struct node *n); - enum stats_id stats_lookup_id(const char *name); #endif /* _STATS_H_ */ diff --git a/lib/api/actions/nodes.c b/lib/api/actions/nodes.c index d5679d2aa..220a3a2dd 100644 --- a/lib/api/actions/nodes.c +++ b/lib/api/actions/nodes.c @@ -26,6 +26,8 @@ #include "node.h" #include "super_node.h" #include "utils.h" +#include "stats.h" + #include "api.h" static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct api_session *s) @@ -43,6 +45,9 @@ static int api_nodes(struct api_action *r, json_t *args, json_t **resp, struct a "id", i ); + if (n->stats) + json_object_set(json_node, "stats", stats_json(n->stats)); + /* Add all additional fields of node here. * This can be used for metadata */ json_object_update(json_node, n->cfg); diff --git a/lib/api/actions/paths.c b/lib/api/actions/paths.c index 922ebe067..2633f08bb 100644 --- a/lib/api/actions/paths.c +++ b/lib/api/actions/paths.c @@ -25,7 +25,6 @@ #include "plugin.h" #include "path.h" #include "utils.h" -#include "stats.h" #include "super_node.h" #include "api.h" @@ -41,9 +40,6 @@ static int api_paths(struct api_action *r, json_t *args, json_t **resp, struct a "state", p->state ); - if (p->stats) - json_object_set(json_path, "stats", stats_json(p->stats)); - /* Add all additional fields of node here. * This can be used for metadata */ json_object_update(json_path, p->cfg); diff --git a/lib/hooks/drop.c b/lib/hooks/drop.c index abf10a96e..983182e6d 100644 --- a/lib/hooks/drop.c +++ b/lib/hooks/drop.c @@ -68,7 +68,7 @@ static int drop_read(struct hook *h, struct sample *smps[], unsigned *cnt) if (dist <= 0) { debug(10, "Reordered sample: sequence=%u, distance=%d", cur->sequence, dist); if (h->node && h->node->stats) - stats_update(h->node->stats->delta, STATS_REORDERED, dist); + stats_update(h->node->stats, STATS_REORDERED, dist); } else goto ok; diff --git a/lib/hooks/stats_collect.c b/lib/hooks/stats_collect.c index 4c27ca0ed..ec61eac70 100644 --- a/lib/hooks/stats_collect.c +++ b/lib/hooks/stats_collect.c @@ -30,6 +30,7 @@ #include "plugin.h" #include "stats.h" #include "node.h" +#include "timing.h" struct stats_collect { struct stats stats; @@ -41,6 +42,8 @@ struct stats_collect { AFILE *output; char *uri; + + struct sample *last; }; static int stats_collect_init(struct hook *h) @@ -50,8 +53,7 @@ static int stats_collect_init(struct hook *h) /* Register statistic object to path. * * This allows the path code to update statistics. */ - if (h->node) - h->node->stats = &p->stats; + h->node->stats = &p->stats; /* Set default values */ p->format = STATS_FORMAT_HUMAN; @@ -111,7 +113,7 @@ static int stats_collect_periodic(struct hook *h) { struct stats_collect *p = h->_vd; - stats_print_periodic(&p->stats, p->uri ? p->output->file : stdout, p->format, p->verbose, h->path); + stats_print_periodic(&p->stats, p->uri ? p->output->file : stdout, p->format, p->verbose, h->node); return 0; } @@ -153,9 +155,48 @@ static int stats_collect_parse(struct hook *h, json_t *cfg) static int stats_collect_read(struct hook *h, struct sample *smps[], unsigned *cnt) { struct stats_collect *p = h->_vd; + struct stats *s = &p->stats; - stats_collect(p->stats.delta, smps, *cnt); - stats_commit(&p->stats, p->stats.delta); + int dist; + struct sample *previous = p->last; + + for (int i = 0; i < *cnt; i++) { + if (previous) { + stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, &smps[i]->ts.received)); + stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, &smps[i]->ts.origin)); + stats_update(s, STATS_OWD, time_delta(&smps[i]->ts.origin, &smps[i]->ts.received)); + + dist = smps[i]->sequence - (int32_t) previous->sequence; + if (dist != 1) + stats_update(s, STATS_REORDERED, dist); + } + + previous = smps[i]; + } + + if (p->last) + sample_put(p->last); + + if (previous) + sample_get(previous); + + p->last = previous; + + stats_commit(&p->stats); + + return 0; +} + +static int stats_collect_write(struct hook *h, struct sample *smps[], unsigned *cnt) +{ + struct stats_collect *p = h->_vd; + + struct timespec ts_sent = time_now(); + + for (int i = 0; i < *cnt; i++) + stats_update(&p->stats, STATS_TIME, time_delta(&smps[i]->ts.received, &ts_sent)); + + stats_commit(&p->stats); return 0; } @@ -172,6 +213,7 @@ static struct plugin p = { .start = stats_collect_start, .stop = stats_collect_stop, .read = stats_collect_read, + .write = stats_collect_write, .restart= stats_collect_restart, .periodic= stats_collect_periodic, .parse = stats_collect_parse, diff --git a/lib/hooks/stats_send.c b/lib/hooks/stats_send.c deleted file mode 100644 index 4bfc72241..000000000 --- a/lib/hooks/stats_send.c +++ /dev/null @@ -1,161 +0,0 @@ -/** Sending statistics to another node. - * - * @author Steffen Vogel - * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC - * @license GNU General Public License (version 3) - * - * VILLASnode - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - *********************************************************************************/ - -/** @addtogroup hooks Hook functions - * @{ - */ - -#include "hook.h" -#include "plugin.h" -#include "stats.h" -#include "path.h" -#include "super_node.h" -#include "sample.h" -#include "node.h" - -struct stats_send { - struct node *dest; - - enum { - STATS_SEND_MODE_PERIODIC, - STATS_SEND_MODE_READ - } mode; - - int decimation; -}; - -static int stats_send_init(struct hook *h) -{ - struct stats_send *p = h->_vd; - - p->decimation = 1; - p->mode = STATS_SEND_MODE_PERIODIC; - - return 0; -} - -static int stats_send_parse(struct hook *h, json_t *cfg) -{ - struct stats_send *p = h->_vd; - - assert(h->path && h->path->super_node); - - const char *dest = NULL; - const char *mode = NULL; - - int ret; - json_error_t err; - - ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s?: s, s?: i }" - "destination", &dest, - "mode", &mode, - "decimation", &p->decimation - ); - if (ret) - jerror(&err, "Failed to parse configuration of hook '%s'", plugin_name(h->_vt)); - - if (dest) { - assert(h->path); - - p->dest = list_lookup(&h->path->super_node->nodes, dest); - if (!p->dest) - jerror(&err, "Invalid destination node '%s' for hook '%s'", dest, plugin_name(h->_vt)); - } - else - jerror(&err, "Missing setting 'destination' for hook '%s'", plugin_name(h->_vt)); - - if (mode) { - if (!strcmp(mode, "periodic")) - p->mode = STATS_SEND_MODE_PERIODIC; - else if (!strcmp(mode, "read")) - p->mode = STATS_SEND_MODE_READ; - else - jerror(&err, "Invalid value '%s' for setting 'mode' of hook '%s'", mode, plugin_name(h->_vt)); - } - - return 0; -} - -static int stats_send_start(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->dest->state != STATE_STOPPED) - node_start(p->dest); - - return 0; -} - -static int stats_send_stop(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->dest->state != STATE_STOPPED) - node_stop(p->dest); - - return 0; -} - -static int stats_send_periodic(struct hook *h) -{ - struct stats_send *p = h->_vd; - - if (p->mode == STATS_SEND_MODE_PERIODIC) - stats_send(h->path->stats, p->dest); - - return 0; -} - -static int stats_send_read(struct hook *h, struct sample *smps[], unsigned *cnt) -{ - struct stats_send *p = h->_vd; - - assert(h->path->stats); - - if (p->mode == STATS_SEND_MODE_READ) { - size_t processed = h->path->stats->histograms[STATS_OWD].total; - if (processed % p->decimation == 0) - stats_send(h->path->stats, p->dest); - } - - return 0; -} - -static struct plugin p = { - .name = "stats_send", - .description = "Send path statistics to another node", - .type = PLUGIN_TYPE_HOOK, - .hook = { - .priority = 99, - .init = stats_send_init, - .parse = stats_send_parse, - .start = stats_send_start, - .stop = stats_send_stop, - .periodic= stats_send_periodic, - .read = stats_send_read, - .size = sizeof(struct stats_send) - } -}; - -REGISTER_PLUGIN(&p) - -/** @} */ diff --git a/lib/node.c b/lib/node.c index 747181ee8..2d6e92c7b 100644 --- a/lib/node.c +++ b/lib/node.c @@ -288,7 +288,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt) debug(LOG_NODES | 10, "Hooks skipped %u out of %u samples for node %s", skipped, nread, node_name(n)); if (n->stats) - stats_update(n->stats->delta, STATS_SKIPPED, skipped); + stats_update(n->stats, STATS_SKIPPED, skipped); } return rread; diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c new file mode 100644 index 000000000..167faf445 --- /dev/null +++ b/lib/nodes/stats.c @@ -0,0 +1,152 @@ +/** Sending statistics to another node. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** @addtogroup hooks Hook functions + * @{ + */ + +#include "hook.h" +#include "plugin.h" +#include "stats.h" +#include "super_node.h" +#include "sample.h" +#include "node.h" + +static struct list *nodes; /** The global list of nodes */ + +int stats_node_init(struct super_node *sn) +{ + if (!sn) + return -1; + + nodes = &sn->nodes; + + return 0; +} + +int stats_node_start(struct node *n) +{ + struct stats_node *s = n->_vd; + int ret; + + if (!s->node->stats) + return -1; + + if (s->node->stats->state != STATE_INITIALIZED) + return -2; + + ret = task_init(&s->task, s->rate, CLOCK_MONOTONIC); + if (ret) + serror("Failed to create task"); + + return 0; +} + +int stats_node_start(struct node *n) +{ + struct stats_node *s = n->_vd; + int ret; + + ret = task_destroy(&s->task); + if (ret) + return ret; + + return 0; +} + +char * stats_node_print(struct node *n) +{ + struct stats_node *p = h->_vd; + + return strf("node=%s, rate=%f", node_name_short(s->node), s->rate); +} + +int stats_node_parse(struct node *n, json_t *cfg) +{ + struct stats_node *p = h->_vd; + + int ret; + json_error_t err; + + const char *node; + + ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s: f }", + "node", &node, + "rate", &s->rate + ); + if (ret) + jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + + if (s->rate <= 0) + error("Setting 'rate' of node %s must be positive", node_name(n)); + + return 0; +} + +int stats_node_read(struct hook *h, struct sample *smps[], unsigned *cnt) +{ + struct stats_node *sn = h->_vd; + struct stats *s = sn->node->stats; + + if (*cnt == 0) + return 0; + + for (int i = 0; j < MIN(STATS_COUNT, smps[0]->capacity); i++) { + smps[0]->data[i++].f = hist_last(&s->histograms[i]); + smps[0]->data[i++].f = hist_highest(&s->histograms[i]); + smps[0]->data[i++].f = hist_lowest(&s->histograms[i]); + smps[0]->data[i++].f = hist_mean(&s->histograms[i]); + smps[0]->data[i++].f = hist_var(&s->histograms[i]); + + smps[0]->length = i; + } + + return 1; +} + +int stats_node_fd(struct node *n) +{ + struct stats_node *p = h->_vd; + + return task_fd(&s->task); +} + +static struct plugin p = { + .name = "stats", + .description = "Send statistics to another node", + .type = PLUGIN_TYPE_NODE, + .hook = { + .vecotrize = 1, + .size = sizeof(struct stats_node), + .init = stats_node_init, + .parse = stats_node_parse, + .print = stats_node_print, + .start = stats_node_start, + .stop = stats_node_stop, + .read = stats_node_read, + .fd = stats_node_fd + } +}; + +REGISTER_PLUGIN(&p) + +/** @} */ diff --git a/lib/stats.c b/lib/stats.c index b7d3e7ab0..29531e2ea 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -25,7 +25,7 @@ #include "stats.h" #include "hist.h" #include "timing.h" -#include "path.h" +#include "node.h" #include "sample.h" #include "utils.h" #include "log.h" @@ -38,6 +38,7 @@ static struct stats_desc { int hist_buckets; } stats_metrics[] = { { "skipped", "samples", "Skipped samples and the distance between them", 25 }, + { "time", "seconds", "The processing time per sample within VILLAsnode", 25 }, { "reordered", "samples", "Reordered samples and the distance between them", 25 }, { "gap_sample", "seconds", "Inter-message timestamps (as sent by remote)", 25 }, { "gap_received", "seconds", "Inter-message arrival time (as seen by this instance)", 25 }, @@ -76,54 +77,24 @@ int stats_destroy(struct stats *s) return 0; } -void stats_update(struct stats_delta *d, enum stats_id id, double val) +void stats_update(struct stats *s, enum stats_id id, double val) { - assert(id >= 0 && id < STATS_COUNT); - - d->values[id] = val; - d->update |= 1 << id; + s->delta->values[id] = val; + s->delta->update |= 1 << id; } -int stats_commit(struct stats *s, struct stats_delta *d) +int stats_commit(struct stats *s) { for (int i = 0; i < STATS_COUNT; i++) { - if (d->update & 1 << i) { - hist_put(&s->histograms[i], d->values[i]); - d->update &= ~(1 << i); + if (s->delta->update & 1 << i) { + hist_put(&s->histograms[i], s->delta->values[i]); + s->delta->update &= ~(1 << i); } } return 0; } -void stats_collect(struct stats_delta *s, struct sample *smps[], size_t cnt) -{ - int dist; - struct sample *previous = s->last; - - for (int i = 0; i < cnt; i++) { - if (previous) { - stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, &smps[i]->ts.received)); - stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, &smps[i]->ts.origin)); - stats_update(s, STATS_OWD, time_delta(&smps[i]->ts.origin, &smps[i]->ts.received)); - - dist = smps[i]->sequence - (int32_t) previous->sequence; - if (dist != 1) - stats_update(s, STATS_REORDERED, dist); - } - - previous = smps[i]; - } - - if (s->last) - sample_put(s->last); - - if (previous) - sample_get(previous); - - s->last = previous; -} - json_t * stats_json(struct stats *s) { json_t *obj = json_object(); @@ -139,10 +110,12 @@ json_t * stats_json(struct stats *s) return obj; } -json_t * stats_json_periodic(struct stats *s, struct path *p) +json_t * stats_json_periodic(struct stats *s, struct node *n) { - return json_pack("{ s: s, s: f, s: f, s: i, s: i }" - "path", path_name(p), + return json_pack("{ s: s, s: i, s: i, s: f, s: f, s: i, s: i }", + "node", node_name(n), + "received", hist_total(&s->histograms[STATS_OWD]), + "sent", hist_total(&s->histograms[STATS_TIME]), "owd", hist_last(&s->histograms[STATS_OWD]), "rate", 1.0 / hist_last(&s->histograms[STATS_GAP_SAMPLE]), "dropped", hist_total(&s->histograms[STATS_REORDERED]), @@ -152,20 +125,21 @@ json_t * stats_json_periodic(struct stats *s, struct path *p) void stats_reset(struct stats *s) { - for (int i = 0; i < STATS_COUNT; i++) { + for (int i = 0; i < STATS_COUNT; i++) hist_reset(&s->histograms[i]); - } } static struct table_column stats_cols[] = { - { 35, "Path", "%s", NULL, TABLE_ALIGN_LEFT }, - { 10, "Cnt", "%ju", "p", TABLE_ALIGN_RIGHT }, + { 10, "Node", "%s", NULL, TABLE_ALIGN_LEFT }, + { 10, "Recv", "%ju", "p", TABLE_ALIGN_RIGHT }, + { 10, "Sent", "%ju", "p", TABLE_ALIGN_RIGHT }, { 10, "OWD last", "%f", "S", TABLE_ALIGN_RIGHT }, { 10, "OWD mean", "%f", "S", TABLE_ALIGN_RIGHT }, { 10, "Rate last", "%f", "p/S", TABLE_ALIGN_RIGHT }, { 10, "Rate mean", "%f", "p/S", TABLE_ALIGN_RIGHT }, { 10, "Drop", "%ju", "p", TABLE_ALIGN_RIGHT }, - { 10, "Skip", "%ju", "p", TABLE_ALIGN_RIGHT } + { 10, "Skip", "%ju", "p", TABLE_ALIGN_RIGHT }, + { 10, "Time", "%f", "S", TABLE_ALIGN_RIGHT } }; static struct table stats_table = { @@ -195,24 +169,26 @@ void stats_print_footer(enum stats_format fmt) } } -void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int verbose, struct path *p) +void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int verbose, struct node *n) { switch (fmt) { case STATS_FORMAT_HUMAN: table_row(&stats_table, - path_name(p), + node_name_short(n), hist_total(&s->histograms[STATS_OWD]), + hist_total(&s->histograms[STATS_TIME]), hist_last(&s->histograms[STATS_OWD]), hist_mean(&s->histograms[STATS_OWD]), 1.0 / hist_last(&s->histograms[STATS_GAP_RECEIVED]), 1.0 / hist_mean(&s->histograms[STATS_GAP_RECEIVED]), hist_total(&s->histograms[STATS_REORDERED]), - hist_total(&s->histograms[STATS_SKIPPED]) + hist_total(&s->histograms[STATS_SKIPPED]), + hist_mean(&s->histograms[STATS_TIME]) ); break; case STATS_FORMAT_JSON: { - json_t *json_stats = stats_json_periodic(s, p); + json_t *json_stats = stats_json_periodic(s, n); json_dumpf(json_stats, f, 0); break; } @@ -244,25 +220,6 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose) } } -void stats_send(struct stats *s, struct node *n) -{ - char buf[SAMPLE_LEN(STATS_COUNT * 5)]; - struct sample *smp = (struct sample *) buf; - - int i = 0; - - for (int j = 0; j < STATS_COUNT; j++) { - smp->data[i++].f = hist_last(&s->histograms[j]); - smp->data[i++].f = hist_highest(&s->histograms[j]); - smp->data[i++].f = hist_lowest(&s->histograms[j]); - smp->data[i++].f = hist_mean(&s->histograms[j]); - smp->data[i++].f = hist_var(&s->histograms[j]); - } - smp->length = i; - - node_write(n, &smp, 1); /* Send single message with statistics to destination node */ -} - enum stats_id stats_lookup_id(const char *name) { for (int i = 0; i < STATS_COUNT; i++) {