From a9363dad44df9f2277217ba1898ea5baf68c4e35 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 22 Oct 2016 20:42:05 -0400 Subject: [PATCH] entangle hook and statistic collection system --- include/villas/hooks.h | 58 +++++++++------ include/villas/stats.h | 63 ++++++++++++++++ lib/Makefile.inc | 2 +- lib/hooks.c | 44 +++++++++--- lib/hooks/hooks-internal.c | 63 ++++++++-------- lib/hooks/hooks-other.c | 78 +++++++++++--------- lib/hooks/hooks-stats.c | 95 ++++++------------------- lib/stats.c | 142 +++++++++++++++++++++++++++++++++++++ 8 files changed, 376 insertions(+), 169 deletions(-) create mode 100644 include/villas/stats.h create mode 100644 lib/stats.c diff --git a/include/villas/hooks.h b/include/villas/hooks.h index 9eae7bb6b..7c69b0125 100644 --- a/include/villas/hooks.h +++ b/include/villas/hooks.h @@ -50,17 +50,33 @@ struct settings; /** This is a list of hooks which can be used in the configuration file. */ extern struct list hooks; +/** Optional parameters to hook callbacks */ +struct hook_info { + struct node *node; + struct path *path; + + struct sample **smps; + size_t cnt; + + struct list *paths; + struct list *nodes; + struct settings *settings; +}; + /** Callback type of hook function * - * @param p The path which is processing this message. * @param h The hook datastructure which contains parameter, name and private context for the hook. - * @param m A pointer to the first message which should be processed by the hook. - * @param cnt The number of messages which should be processed by the hook. * @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values. + * @param i The hook_info structure contains references to the current node, path or samples. Some fields of this structure can be NULL. * @retval 0 Success. Continue processing and forwarding the message. * @retval <0 Error. Drop the message. */ -typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +typedef int (*hook_cb_t)(struct hook *h, int when, struct hook_info *i); + +enum hook_state { + HOOK_DESTROYED, + HOOK_INITIALIZED +}; /** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */ enum hook_type { @@ -74,15 +90,15 @@ enum hook_type { HOOK_ASYNC = 1 << 7, /**< Called asynchronously with fixed rate (see path::rate). */ HOOK_PERIODIC = 1 << 8, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */ - HOOK_INIT = 1 << 9, /**< Called before path is started to parse parameters. */ - HOOK_DEINIT = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ + HOOK_INIT = 1 << 9, /**< Called before path is started to parseHOOK_DESTROYs. */ + HOOK_DESTROY = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */ HOOK_INTERNAL = 1 << 11, /**< Internal hooks are added to every path implicitely. */ HOOK_PARSE = 1 << 12, /**< Called for parsing hook arguments. */ /** @{ Classes of hooks */ /** Hooks which are using private data must allocate and free them propery. */ - HOOK_STORAGE = HOOK_INIT | HOOK_DEINIT, + HOOK_STORAGE = HOOK_INIT | HOOK_DESTROY, /** All path related actions */ HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART, /** Hooks which are used to collect statistics. */ @@ -111,8 +127,11 @@ struct hook { hook_cb_t cb; /**< The hook callback function as a function pointer. */ }; -/** Save references to global nodes, paths and settings */ -void hook_init(struct list *nodes, struct list *paths, struct settings *set); +int hook_init(struct hook *h, struct list *nodes, struct list *paths, struct settings *settings); + +void hook_destroy(struct hook *h); + +int hook_copy(struct hook *h, struct hook *c); /** Sort hook list according to the their priority. See hook::priority. */ int hooks_sort_priority(const void *a, const void *b); @@ -139,18 +158,17 @@ int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when); */ void * hook_storage(struct hook *h, int when, size_t len); -int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_print(struct hook *h, int when, struct hook_info *j); +int hook_ts(struct hook *h, int when, struct hook_info *j); +int hook_convert(struct hook *h, int when, struct hook_info *j); +int hook_decimate(struct hook *h, int when, struct hook_info *j); +int hook_skip_first(struct hook *h, int when, struct hook_info *j); -int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -void hook_stats_header(); +int hook_stats_send(struct hook *h, int when, struct hook_info *j); +int hook_stats(struct hook *h, int when, struct hook_info *j); -int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); -int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt); +int hook_fix_ts(struct hook *h, int when, struct hook_info *j); +int hook_restart(struct hook *h, int when, struct hook_info *j); +int hook_drop(struct hook *h, int when, struct hook_info *j); #endif /** _HOOKS_H_ @} */ diff --git a/include/villas/stats.h b/include/villas/stats.h new file mode 100644 index 000000000..e6260833b --- /dev/null +++ b/include/villas/stats.h @@ -0,0 +1,63 @@ +/** Statistic collection. + * + * @file + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + */ + +#ifndef _STATS_H_ +#define _STATS_H_ + +#include + +#ifdef WITH_JANSSON + #include +#endif + +#include "hist.h" + +/* Forward declarations */ +struct sample; +struct path; +struct node; + +struct stats { + struct { + uintmax_t invalid; /**< Counter for invalid messages */ + uintmax_t skipped; /**< Counter for skipped messages due to hooks */ + uintmax_t dropped; /**< Counter for dropped messages due to reordering */ + } counter; + + struct { + struct hist owd; /**< Histogram for one-way-delay (OWD) of received messages */ + struct hist gap_msg; /**< Histogram for inter message timestamps (as sent by remote) */ + struct hist gap_recv; /**< Histogram for inter message arrival time (as seen by this instance) */ + struct hist gap_seq; /**< Histogram of sequence number displacement of received messages */ + } histogram; + + struct sample *last; +}; + +int stats_init(struct stats *s); + +void stats_destroy(struct stats *s); + +void stats_collect(struct stats *s, struct sample *smps[], size_t cnt); + +#ifdef WITH_JANSSON +json_t * stats_json(struct stats *s); +#endif + +void stats_reset(struct stats *s); + +void stats_print_header(); + +void stats_print_periodic(struct stats *s, struct path *p); + +void stats_print(struct stats *s); + +void stats_send(struct stats *s, struct node *n); + +#endif /* _STATS_H_ */ \ No newline at end of file diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 21b221772..258c50d14 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -6,7 +6,7 @@ LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c) \ $(addprefix lib/kernel/, kernel.c rt.c) \ $(addprefix lib/, sample.c path.c node.c hooks.c \ log.c utils.c cfg.c hist.c timing.c pool.c list.c \ - queue.c memory.c \ + queue.c memory.c stats.c \ ) \ $(wildcard lib/hooks/*.c) \ diff --git a/lib/hooks.c b/lib/hooks.c index ea4745e80..5ffcc9044 100644 --- a/lib/hooks.c +++ b/lib/hooks.c @@ -19,16 +19,32 @@ struct list hooks; -/* Those references can be used inside the hook callbacks after initializing them with hook_init() */ -struct list *hook_nodes = NULL; -struct list *hook_paths = NULL; -struct settings *hook_settings = NULL; - -void hook_init(struct list *nodes, struct list *paths, struct settings *set) +int hook_init(struct hook *h, struct list *nodes, struct list *paths, struct settings *settings) { - hook_nodes = nodes; - hook_paths = paths; - hook_settings = set; + struct hook_info i = { + .paths = paths, + .nodes = nodes, + .settings = settings + }; + + return h->cb(h, HOOK_INIT, &i); +} + +void hook_destroy(struct hook *h) +{ + struct hook_info i = { NULL }; + h->cb(h, HOOK_DESTROY, &i); +} + +int hook_copy(struct hook *h, struct hook *c) +{ + memcpy(c, h, sizeof(struct hook)); + + c->_vd = + c->prev = + c->last = NULL; + + return 0; } int hooks_sort_priority(const void *a, const void *b) { @@ -40,11 +56,17 @@ int hooks_sort_priority(const void *a, const void *b) { int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when) { + struct hook_info i = { + .path = p, + .smps = smps, + .cnt = cnt + }; + list_foreach(struct hook *h, &p->hooks) { if (h->type & when) { debug(DBG_HOOK | 22, "Running hook when=%u '%s' prio=%u, cnt=%zu", when, h->name, h->priority, cnt); - cnt = h->cb(p, h, when, smps, cnt); + cnt = h->cb(h, when, &i); if (cnt == 0) break; } @@ -60,7 +82,7 @@ void * hook_storage(struct hook *h, int when, size_t len) h->_vd = alloc(len); break; - case HOOK_DEINIT: + case HOOK_DESTROY: free(h->_vd); h->_vd = NULL; break; diff --git a/lib/hooks/hooks-internal.c b/lib/hooks/hooks-internal.c index 26958703c..070e68359 100644 --- a/lib/hooks/hooks-internal.c +++ b/lib/hooks/hooks-internal.c @@ -13,75 +13,78 @@ #include "utils.h" REGISTER_HOOK("fix_ts", "Update timestamps of sample if not set", 0, 0, hook_fix_ts, HOOK_INTERNAL | HOOK_READ) -int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_fix_ts(struct hook *h, int when, struct hook_info *j) { struct timespec now = time_now(); + + assert(j->smps); - for (int i = 0; i < cnt; i++) { + for (int i = 0; i < j->cnt; i++) { /* Check for missing receive timestamp * Usually node_type::read() should update the receive timestamp. * An example would be to use hardware timestamp capabilities of * modern NICs. */ - if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) || - (smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1)) - smps[i]->ts.received = now; + if ((j->smps[i]->ts.received.tv_sec == 0 && j->smps[i]->ts.received.tv_nsec == 0) || + (j->smps[i]->ts.received.tv_sec == -1 && j->smps[i]->ts.received.tv_nsec == -1)) + j->smps[i]->ts.received = now; /* Check for missing origin timestamp */ - if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) || - (smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1)) - smps[i]->ts.origin = now; + if ((j->smps[i]->ts.origin.tv_sec == 0 && j->smps[i]->ts.origin.tv_nsec == 0) || + (j->smps[i]->ts.origin.tv_sec == -1 && j->smps[i]->ts.origin.tv_nsec == -1)) + j->smps[i]->ts.origin = now; } - return cnt; + return j->cnt; } REGISTER_HOOK("restart", "Call restart hooks for current path", 1, 1, hook_restart, HOOK_INTERNAL | HOOK_READ) -int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_restart(struct hook *h, int when, struct hook_info *j) { - for (int i = 0; i < cnt; i++) { - h->last = smps[i]; + assert(j->smps); + assert(j->path); + + for (int i = 0; i < j->cnt; i++) { + h->last = j->smps[i]; if (h->prev) { if (h->last->sequence == 0 && h->prev->sequence <= UINT32_MAX - 32) { warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)", - path_name(p), h->prev->sequence, h->last->sequence); + path_name(j->path), h->prev->sequence, h->last->sequence); - p->invalid = - p->skipped = - p->dropped = 0; - - hook_run(p, &smps[i], cnt - i, HOOK_PATH_RESTART); + hook_run(j->path, &j->smps[i], j->cnt - i, HOOK_PATH_RESTART); } } h->prev = h->last; } - return cnt; + return j->cnt; } REGISTER_HOOK("drop", "Drop messages with reordered sequence numbers", 3, 1, hook_drop, HOOK_INTERNAL | HOOK_READ) -int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_drop(struct hook *h, int when, struct hook_info *j) { int i, ok, dist; + + assert(j->smps); - for (i = 0, ok = 0; i < cnt; i++) { - h->last = smps[i]; + for (i = 0, ok = 0; i < j->cnt; i++) { + h->last = j->smps[i]; if (h->prev) { dist = h->last->sequence - (int32_t) h->prev->sequence; if (dist <= 0) { - p->dropped++; + j->path->stats->counter.dropped++; warn("Dropped sample: dist = %d, i = %d", dist, i); } else { struct sample *tmp; - tmp = smps[i]; - smps[i] = smps[ok]; - smps[ok++] = tmp; + tmp = j->smps[i]; + j->smps[i] = j->smps[ok]; + j->smps[ok++] = tmp; } /* To discard the first X samples in 'smps[]' we must @@ -93,11 +96,11 @@ int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], s else { struct sample *tmp; - tmp = smps[i]; - smps[i] = smps[ok]; - smps[ok++] = tmp; + tmp = j->smps[i]; + j->smps[i] = j->smps[ok]; + j->smps[ok++] = tmp; } - + h->prev = h->last; } diff --git a/lib/hooks/hooks-other.c b/lib/hooks/hooks-other.c index 21814ef29..86733cb19 100644 --- a/lib/hooks/hooks-other.c +++ b/lib/hooks/hooks-other.c @@ -17,25 +17,29 @@ #include "sample.h" REGISTER_HOOK("print", "Print the message to stdout", 99, 0, hook_print, HOOK_READ) -int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_print(struct hook *h, int when, struct hook_info *j) { - for (int i = 0; i < cnt; i++) - sample_fprint(stdout, smps[i], SAMPLE_ALL); + assert(j->smps); + + for (int i = 0; i < j->cnt; i++) + sample_fprint(stdout, j->smps[i], SAMPLE_ALL); - return cnt; + return j->cnt; } REGISTER_HOOK("ts", "Update timestamp of message with current time", 99, 0, hook_ts, HOOK_READ) -int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_ts(struct hook *h, int when, struct hook_info *j) { - for (int i = 0; i < cnt; i++) - smps[i]->ts.origin = smps[i]->ts.received; + assert(j->smps); - return cnt; + for (int i = 0; i < j->cnt; i++) + j->smps[i]->ts.origin = j->smps[i]->ts.received; + + return j->cnt; } -REGISTER_HOOK("convert", "Convert message from / to floating-point / integer", 99, 0, hook_convert, HOOK_STORAGE | HOOK_PARSE | HOOK_READ) -int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +REGISTER_HOOK("convert", "Convert message from / to floating-point / integer", 99, 0, hook_convert, HOOK_STORAGE | HOOK_DESTROY | HOOK_READ) +int hook_convert(struct hook *h, int when, struct hook_info *k) { struct { enum { @@ -45,7 +49,7 @@ int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[] } *private = hook_storage(h, when, sizeof(*private)); switch (when) { - case HOOK_PARSE: + case HOOK_DESTROY: if (!h->parameter) error("Missing parameter for hook: '%s'", h->name); @@ -58,30 +62,29 @@ int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[] break; case HOOK_READ: - for (int i = 0; i < cnt; i++) { - for (int j = 0; j < smps[0]->length; j++) { + for (int i = 0; i < k->cnt; i++) { + for (int j = 0; j < k->smps[0]->length; j++) { switch (private->mode) { - case TO_FIXED: smps[i]->data[j].i = smps[i]->data[j].f * 1e3; break; - case TO_FLOAT: smps[i]->data[j].f = smps[i]->data[j].i; break; + case TO_FIXED: k->smps[i]->data[j].i = k->smps[i]->data[j].f * 1e3; break; + case TO_FLOAT: k->smps[i]->data[j].f = k->smps[i]->data[j].i; break; } } } - break; + + return k->cnt; } return 0; } -REGISTER_HOOK("decimate", "Downsamping by integer factor", 99, 0, hook_decimate, HOOK_STORAGE | HOOK_PARSE | HOOK_READ) -int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +REGISTER_HOOK("decimate", "Downsamping by integer factor", 99, 0, hook_decimate, HOOK_STORAGE | HOOK_DESTROY | HOOK_READ) +int hook_decimate(struct hook *h, int when, struct hook_info *j) { struct { unsigned ratio; unsigned counter; } *private = hook_storage(h, when, sizeof(*private)); - int ok; - switch (when) { case HOOK_PARSE: if (!h->parameter) @@ -95,24 +98,27 @@ int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[ break; case HOOK_READ: - ok = 0; - for (int i = 0; i < cnt; i++) { + assert(j->smps); + + int i, ok; + for (i = 0, ok = 0; i < j->cnt; i++) { if (private->counter++ % private->ratio == 0) { struct sample *tmp; - tmp = smps[ok]; - smps[ok++] = smps[i]; - smps[i] = tmp; + tmp = j->smps[ok]; + j->smps[ok++] = j->smps[i]; + j->smps[i] = tmp; } } + return ok; } - return cnt; + return 0; } REGISTER_HOOK("skip_first", "Skip the first samples", 99, 0, hook_skip_first, HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH) -int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_skip_first(struct hook *h, int when, struct hook_info *j) { struct { struct timespec skip; /**< Time to wait until first message is not skipped */ @@ -121,7 +127,6 @@ int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smp char *endptr; double wait; - int i, ok; switch (when) { case HOOK_PARSE: @@ -135,19 +140,22 @@ int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smp private->skip = time_from_double(wait); break; + case HOOK_PATH_START: case HOOK_PATH_RESTART: - case HOOK_PATH_STOP: - private->until = time_add(&smps[0]->ts.received, &private->skip); + private->until = time_add(&j->smps[0]->ts.received, &private->skip); break; case HOOK_READ: - for (i = 0, ok = 0; i < cnt; i++) { - if (time_delta(&private->until, &smps[i]->ts.received) > 0) { + assert(j->smps); + + int i, ok; + for (i = 0, ok = 0; i < j->cnt; i++) { + if (time_delta(&private->until, &j->smps[i]->ts.received) > 0) { struct sample *tmp; - tmp = smps[i]; - smps[i] = smps[ok]; - smps[ok++] = tmp; + tmp = j->smps[i]; + j->smps[i] = j->smps[ok]; + j->smps[ok++] = tmp; } diff --git a/lib/hooks/hooks-stats.c b/lib/hooks/hooks-stats.c index ff5fbd268..4b787b78f 100644 --- a/lib/hooks/hooks-stats.c +++ b/lib/hooks/hooks-stats.c @@ -10,88 +10,51 @@ #include "sample.h" #include "path.h" #include "utils.h" -#include "timing.h" +#include "stats.h" extern struct list *hook_nodes; -void hook_stats_header() -{ - #define UNIT(u) "(" YEL(u) ")" - - stats("%-40s|%19s|%19s|%19s|%19s|%19s|", "Source " MAG("=>") " Destination", - "OWD" UNIT("S") " ", - "Rate" UNIT("p/S") " ", - "Drop" UNIT("p") " ", - "Skip" UNIT("p") " ", - "Inval" UNIT("p") " " - ); - line(); -} - REGISTER_HOOK("stats", "Collect statistics for the current path", 2, 1, hook_stats, HOOK_STATS) -int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +int hook_stats(struct hook *h, int when, struct hook_info *j) { + struct stats *s = hook_storage(h, when, sizeof(struct stats)); + switch (when) { case HOOK_INIT: - /** @todo Allow configurable bounds for histograms */ - hist_create(&p->hist.owd, 0, 1, 100e-3); - hist_create(&p->hist.gap_msg, 90e-3, 110e-3, 1e-3); - hist_create(&p->hist.gap_recv, 90e-3, 110e-3, 1e-3); - hist_create(&p->hist.gap_seq, -HIST_SEQ, +HIST_SEQ, 1); + stats_init(s); break; case HOOK_READ: - for (int i = 0; i < cnt; i++) { - h->last = smps[i]; - - if (h->prev) { - int gap_seq = h->last->sequence - (int32_t) h->prev->sequence; - double owd = time_delta(&h->last->ts.origin, &h->last->ts.received); - double gap = time_delta(&h->prev->ts.origin, &h->last->ts.origin); - double gap_recv = time_delta(&h->prev->ts.received, &h->last->ts.received); - - hist_put(&p->hist.gap_msg, gap); - hist_put(&p->hist.gap_seq, gap_seq); - hist_put(&p->hist.owd, owd); - hist_put(&p->hist.gap_recv, gap_recv); - } - - h->prev = h->last; - } + assert(j->smps); + + stats_collect(s, j->smps, j->cnt); break; case HOOK_PATH_STOP: - if (p->hist.owd.total) { info("One-way delay:"); hist_print(&p->hist.owd); } - if (p->hist.gap_recv.total){ info("Inter-message arrival time:"); hist_print(&p->hist.gap_recv); } - if (p->hist.gap_msg.total) { info("Inter-message ts gap:"); hist_print(&p->hist.gap_msg); } - if (p->hist.gap_seq.total) { info("Inter-message sequence number gaps:"); hist_print(&p->hist.gap_seq); } + stats_print(s); break; - case HOOK_DEINIT: - hist_destroy(&p->hist.owd); - hist_destroy(&p->hist.gap_msg); - hist_destroy(&p->hist.gap_recv); - hist_destroy(&p->hist.gap_seq); + case HOOK_DESTROY: + stats_destroy(s); break; case HOOK_PATH_RESTART: - hist_reset(&p->hist.owd); - hist_reset(&p->hist.gap_seq); - hist_reset(&p->hist.gap_msg); - hist_reset(&p->hist.gap_recv); + stats_reset(s); break; case HOOK_PERIODIC: - stats("%-40.40s|%10s|%10s|%10ju|%10ju|%10ju|", path_name(p), "", "", - p->dropped, p->skipped, p->invalid); + assert(j->path); + + stats_print_periodic(s, j->path); break; } - return cnt; + return j->cnt; } -REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_PARSE | HOOK_PERIODIC | HOOK_PATH) -int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt) +#if 0 /* currently broken */ +REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_DESTROY | HOOK_PERIODIC | HOOK_PATH) +int hook_stats_send(struct hook *h, int when, struct hook_info *i) { struct private { struct node *dest; @@ -99,7 +62,7 @@ int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smp } *private = hook_storage(h, when, sizeof(*private)); switch (when) { - case HOOK_PARSE: + case HOOK_DESTROY: if (!h->parameter) error("Missing parameter for hook '%s'", h->name); @@ -115,23 +78,11 @@ int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smp break; case HOOK_PERIODIC: { - int i; - char buf[SAMPLE_LEN(16)]; - struct sample *smp = (struct sample *) buf; - - i = 0; - smp->data[i++].f = p->invalid; - smp->data[i++].f = p->skipped; - smp->data[i++].f = p->dropped; - smp->data[i++].f = p->hist.owd.last, - smp->data[i++].f = 1.0 / p->hist.gap_msg.last; - smp->data[i++].f = 1.0 / p->hist.gap_recv.last; - smp->length = i; - - node_write(private->dest, &smp, 1); /* Send single message with statistics to destination node */ + stats_send(s, node); break; } } return 0; -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/lib/stats.c b/lib/stats.c new file mode 100644 index 000000000..f25675d1d --- /dev/null +++ b/lib/stats.c @@ -0,0 +1,142 @@ +/** Statistic collection. + * + * @author Steffen Vogel + * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC + * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. + * Unauthorized copying of this file, via any medium is strictly prohibited. + */ + +#include "stats.h" +#include "hist.h" +#include "timing.h" +#include "path.h" +#include "sample.h" +#include "log.h" + +int stats_init(struct stats *s) +{ + /** @todo Allow configurable bounds for histograms */ + hist_create(&s->histogram.owd, 0, 1, 100e-3); + hist_create(&s->histogram.gap_msg, 90e-3, 110e-3, 1e-3); + hist_create(&s->histogram.gap_recv, 90e-3, 110e-3, 1e-3); + hist_create(&s->histogram.gap_seq, -HIST_SEQ, +HIST_SEQ, 1); + + return 0; +} + +void stats_destroy(struct stats *s) +{ + hist_destroy(&s->histogram.owd); + hist_destroy(&s->histogram.gap_msg); + hist_destroy(&s->histogram.gap_recv); + hist_destroy(&s->histogram.gap_seq); +} + +void stats_collect(struct stats *s, struct sample *smps[], size_t cnt) +{ + for (int i = 0; i < cnt; i++) { + if (s->last) { + int gap_seq = smps[i]->sequence - (int32_t) s->last->sequence; + double owd = time_delta(&smps[i]->ts.origin, &smps[i]->ts.received); + double gap = time_delta(&s->last->ts.origin, &smps[i]->ts.origin); + double gap_recv = time_delta(&s->last->ts.received, &smps[i]->ts.received); + + hist_put(&s->histogram.owd, owd); + hist_put(&s->histogram.gap_msg, gap); + hist_put(&s->histogram.gap_seq, gap_seq); + hist_put(&s->histogram.gap_recv, gap_recv); + } + + if (i == 0 && s->last) + sample_put(s->last); + if (i == cnt - 1) + sample_get(smps[i]); + + s->last = smps[i]; + } +} + +#ifdef WITH_JANSSON +json_t * stats_json(struct stats *s) +{ + return json_pack("{ s: { s: i, s: i, s: i }, s: { s: o, s: o, s: o } }", + "counter", + "dropped", s->counter.dropped, + "invalid", s->counter.invalid, + "skipped", s->counter.skipped, + "histogram", + "owd", hist_json(&s->histogram.owd), + "gap_msg", hist_json(&s->histogram.gap_msg), + "gap_recv",hist_json(&s->histogram.gap_recv), + "gap_seq", hist_json(&s->histogram.gap_seq) + ); +} +#endif + +void stats_reset(struct stats *s) +{ + s->counter.invalid = + s->counter.skipped = + s->counter.dropped = 0; + + hist_reset(&s->histogram.owd); + hist_reset(&s->histogram.gap_seq); + hist_reset(&s->histogram.gap_msg); + hist_reset(&s->histogram.gap_recv); +} + +void stats_print_header() +{ + #define UNIT(u) "(" YEL(u) ")" + + stats("%-40s|%19s|%19s|%19s|%19s|%19s|", "Source " MAG("=>") " Destination", + "OWD" UNIT("S") " ", + "Rate" UNIT("p/S") " ", + "Drop" UNIT("p") " ", + "Skip" UNIT("p") " ", + "Inval" UNIT("p") " " + ); + line(); +} + +void stats_print_periodic(struct stats *s, struct path *p) +{ + stats("%-40.40s|%10s|%10s|%10ju|%10ju|%10ju|", path_name(p), "", "", + s->counter.dropped, s->counter.skipped, s->counter.invalid); +} + +void stats_print(struct stats *s) +{ + stats("Dropped samples: %ju", s->counter.dropped); + stats("Skipped samples: %ju", s->counter.skipped); + stats("Invalid samples: %ju", s->counter.invalid); + + stats("One-way delay:"); + hist_print(&s->histogram.owd); + + stats("Inter-message arrival time:"); + hist_print(&s->histogram.gap_recv); + + stats("Inter-message ts gap:"); + hist_print(&s->histogram.gap_msg); + + stats("Inter-message sequence number gaps:"); + hist_print(&s->histogram.gap_seq); +} + +void stats_send(struct stats *s, struct node *n) +{ + char buf[SAMPLE_LEN(16)]; + struct sample *smp = (struct sample *) buf; + + int i = 0; + smp->data[i++].f = s->counter.invalid; /**< Use integer here? */ + smp->data[i++].f = s->counter.skipped; + smp->data[i++].f = s->counter.dropped; + smp->data[i++].f = s->histogram.owd.last, + smp->data[i++].f = 1.0 / s->histogram.gap_msg.last; + smp->data[i++].f = 1.0 / s->histogram.gap_recv.last; + smp->length = i; + + node_write(n, &smp, 1); /* Send single message with statistics to destination node */ +} \ No newline at end of file