1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

entangle hook and statistic collection system

This commit is contained in:
Steffen Vogel 2016-10-22 20:42:05 -04:00
parent bee825ae30
commit a9363dad44
8 changed files with 376 additions and 169 deletions

View file

@ -50,17 +50,33 @@ struct settings;
/** This is a list of hooks which can be used in the configuration file. */
extern struct list hooks;
/** Optional parameters to hook callbacks */
struct hook_info {
struct node *node;
struct path *path;
struct sample **smps;
size_t cnt;
struct list *paths;
struct list *nodes;
struct settings *settings;
};
/** Callback type of hook function
*
* @param p The path which is processing this message.
* @param h The hook datastructure which contains parameter, name and private context for the hook.
* @param m A pointer to the first message which should be processed by the hook.
* @param cnt The number of messages which should be processed by the hook.
* @param when Provides the type of hook for which this occurence of the callback function was executed. See hook_type for possible values.
* @param i The hook_info structure contains references to the current node, path or samples. Some fields of this structure can be NULL.
* @retval 0 Success. Continue processing and forwarding the message.
* @retval <0 Error. Drop the message.
*/
typedef int (*hook_cb_t)(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
typedef int (*hook_cb_t)(struct hook *h, int when, struct hook_info *i);
enum hook_state {
HOOK_DESTROYED,
HOOK_INITIALIZED
};
/** The type of a hook defines when a hook will be exectuted. This is used as a bitmask. */
enum hook_type {
@ -74,15 +90,15 @@ enum hook_type {
HOOK_ASYNC = 1 << 7, /**< Called asynchronously with fixed rate (see path::rate). */
HOOK_PERIODIC = 1 << 8, /**< Called periodically. Period is set by global 'stats' option in the configuration file. */
HOOK_INIT = 1 << 9, /**< Called before path is started to parse parameters. */
HOOK_DEINIT = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
HOOK_INIT = 1 << 9, /**< Called before path is started to parseHOOK_DESTROYs. */
HOOK_DESTROY = 1 << 10, /**< Called after path has been stopped to release memory allocated by HOOK_INIT */
HOOK_INTERNAL = 1 << 11, /**< Internal hooks are added to every path implicitely. */
HOOK_PARSE = 1 << 12, /**< Called for parsing hook arguments. */
/** @{ Classes of hooks */
/** Hooks which are using private data must allocate and free them propery. */
HOOK_STORAGE = HOOK_INIT | HOOK_DEINIT,
HOOK_STORAGE = HOOK_INIT | HOOK_DESTROY,
/** All path related actions */
HOOK_PATH = HOOK_PATH_START | HOOK_PATH_STOP | HOOK_PATH_RESTART,
/** Hooks which are used to collect statistics. */
@ -111,8 +127,11 @@ struct hook {
hook_cb_t cb; /**< The hook callback function as a function pointer. */
};
/** Save references to global nodes, paths and settings */
void hook_init(struct list *nodes, struct list *paths, struct settings *set);
int hook_init(struct hook *h, struct list *nodes, struct list *paths, struct settings *settings);
void hook_destroy(struct hook *h);
int hook_copy(struct hook *h, struct hook *c);
/** Sort hook list according to the their priority. See hook::priority. */
int hooks_sort_priority(const void *a, const void *b);
@ -139,18 +158,17 @@ int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when);
*/
void * hook_storage(struct hook *h, int when, size_t len);
int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_print(struct hook *h, int when, struct hook_info *j);
int hook_ts(struct hook *h, int when, struct hook_info *j);
int hook_convert(struct hook *h, int when, struct hook_info *j);
int hook_decimate(struct hook *h, int when, struct hook_info *j);
int hook_skip_first(struct hook *h, int when, struct hook_info *j);
int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
void hook_stats_header();
int hook_stats_send(struct hook *h, int when, struct hook_info *j);
int hook_stats(struct hook *h, int when, struct hook_info *j);
int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt);
int hook_fix_ts(struct hook *h, int when, struct hook_info *j);
int hook_restart(struct hook *h, int when, struct hook_info *j);
int hook_drop(struct hook *h, int when, struct hook_info *j);
#endif /** _HOOKS_H_ @} */

63
include/villas/stats.h Normal file
View file

@ -0,0 +1,63 @@
/** Statistic collection.
*
* @file
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#ifndef _STATS_H_
#define _STATS_H_
#include <stdint.h>
#ifdef WITH_JANSSON
#include <jansson.h>
#endif
#include "hist.h"
/* Forward declarations */
struct sample;
struct path;
struct node;
struct stats {
struct {
uintmax_t invalid; /**< Counter for invalid messages */
uintmax_t skipped; /**< Counter for skipped messages due to hooks */
uintmax_t dropped; /**< Counter for dropped messages due to reordering */
} counter;
struct {
struct hist owd; /**< Histogram for one-way-delay (OWD) of received messages */
struct hist gap_msg; /**< Histogram for inter message timestamps (as sent by remote) */
struct hist gap_recv; /**< Histogram for inter message arrival time (as seen by this instance) */
struct hist gap_seq; /**< Histogram of sequence number displacement of received messages */
} histogram;
struct sample *last;
};
int stats_init(struct stats *s);
void stats_destroy(struct stats *s);
void stats_collect(struct stats *s, struct sample *smps[], size_t cnt);
#ifdef WITH_JANSSON
json_t * stats_json(struct stats *s);
#endif
void stats_reset(struct stats *s);
void stats_print_header();
void stats_print_periodic(struct stats *s, struct path *p);
void stats_print(struct stats *s);
void stats_send(struct stats *s, struct node *n);
#endif /* _STATS_H_ */

View file

@ -6,7 +6,7 @@ LIB_SRCS = $(addprefix lib/nodes/, file.c cbuilder.c) \
$(addprefix lib/kernel/, kernel.c rt.c) \
$(addprefix lib/, sample.c path.c node.c hooks.c \
log.c utils.c cfg.c hist.c timing.c pool.c list.c \
queue.c memory.c \
queue.c memory.c stats.c \
) \
$(wildcard lib/hooks/*.c) \

View file

@ -19,16 +19,32 @@
struct list hooks;
/* Those references can be used inside the hook callbacks after initializing them with hook_init() */
struct list *hook_nodes = NULL;
struct list *hook_paths = NULL;
struct settings *hook_settings = NULL;
void hook_init(struct list *nodes, struct list *paths, struct settings *set)
int hook_init(struct hook *h, struct list *nodes, struct list *paths, struct settings *settings)
{
hook_nodes = nodes;
hook_paths = paths;
hook_settings = set;
struct hook_info i = {
.paths = paths,
.nodes = nodes,
.settings = settings
};
return h->cb(h, HOOK_INIT, &i);
}
void hook_destroy(struct hook *h)
{
struct hook_info i = { NULL };
h->cb(h, HOOK_DESTROY, &i);
}
int hook_copy(struct hook *h, struct hook *c)
{
memcpy(c, h, sizeof(struct hook));
c->_vd =
c->prev =
c->last = NULL;
return 0;
}
int hooks_sort_priority(const void *a, const void *b) {
@ -40,11 +56,17 @@ int hooks_sort_priority(const void *a, const void *b) {
int hook_run(struct path *p, struct sample *smps[], size_t cnt, int when)
{
struct hook_info i = {
.path = p,
.smps = smps,
.cnt = cnt
};
list_foreach(struct hook *h, &p->hooks) {
if (h->type & when) {
debug(DBG_HOOK | 22, "Running hook when=%u '%s' prio=%u, cnt=%zu", when, h->name, h->priority, cnt);
cnt = h->cb(p, h, when, smps, cnt);
cnt = h->cb(h, when, &i);
if (cnt == 0)
break;
}
@ -60,7 +82,7 @@ void * hook_storage(struct hook *h, int when, size_t len)
h->_vd = alloc(len);
break;
case HOOK_DEINIT:
case HOOK_DESTROY:
free(h->_vd);
h->_vd = NULL;
break;

View file

@ -13,75 +13,78 @@
#include "utils.h"
REGISTER_HOOK("fix_ts", "Update timestamps of sample if not set", 0, 0, hook_fix_ts, HOOK_INTERNAL | HOOK_READ)
int hook_fix_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_fix_ts(struct hook *h, int when, struct hook_info *j)
{
struct timespec now = time_now();
assert(j->smps);
for (int i = 0; i < cnt; i++) {
for (int i = 0; i < j->cnt; i++) {
/* Check for missing receive timestamp
* Usually node_type::read() should update the receive timestamp.
* An example would be to use hardware timestamp capabilities of
* modern NICs.
*/
if ((smps[i]->ts.received.tv_sec == 0 && smps[i]->ts.received.tv_nsec == 0) ||
(smps[i]->ts.received.tv_sec == -1 && smps[i]->ts.received.tv_nsec == -1))
smps[i]->ts.received = now;
if ((j->smps[i]->ts.received.tv_sec == 0 && j->smps[i]->ts.received.tv_nsec == 0) ||
(j->smps[i]->ts.received.tv_sec == -1 && j->smps[i]->ts.received.tv_nsec == -1))
j->smps[i]->ts.received = now;
/* Check for missing origin timestamp */
if ((smps[i]->ts.origin.tv_sec == 0 && smps[i]->ts.origin.tv_nsec == 0) ||
(smps[i]->ts.origin.tv_sec == -1 && smps[i]->ts.origin.tv_nsec == -1))
smps[i]->ts.origin = now;
if ((j->smps[i]->ts.origin.tv_sec == 0 && j->smps[i]->ts.origin.tv_nsec == 0) ||
(j->smps[i]->ts.origin.tv_sec == -1 && j->smps[i]->ts.origin.tv_nsec == -1))
j->smps[i]->ts.origin = now;
}
return cnt;
return j->cnt;
}
REGISTER_HOOK("restart", "Call restart hooks for current path", 1, 1, hook_restart, HOOK_INTERNAL | HOOK_READ)
int hook_restart(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_restart(struct hook *h, int when, struct hook_info *j)
{
for (int i = 0; i < cnt; i++) {
h->last = smps[i];
assert(j->smps);
assert(j->path);
for (int i = 0; i < j->cnt; i++) {
h->last = j->smps[i];
if (h->prev) {
if (h->last->sequence == 0 &&
h->prev->sequence <= UINT32_MAX - 32) {
warn("Simulation for path %s restarted (prev->seq=%u, current->seq=%u)",
path_name(p), h->prev->sequence, h->last->sequence);
path_name(j->path), h->prev->sequence, h->last->sequence);
p->invalid =
p->skipped =
p->dropped = 0;
hook_run(p, &smps[i], cnt - i, HOOK_PATH_RESTART);
hook_run(j->path, &j->smps[i], j->cnt - i, HOOK_PATH_RESTART);
}
}
h->prev = h->last;
}
return cnt;
return j->cnt;
}
REGISTER_HOOK("drop", "Drop messages with reordered sequence numbers", 3, 1, hook_drop, HOOK_INTERNAL | HOOK_READ)
int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_drop(struct hook *h, int when, struct hook_info *j)
{
int i, ok, dist;
assert(j->smps);
for (i = 0, ok = 0; i < cnt; i++) {
h->last = smps[i];
for (i = 0, ok = 0; i < j->cnt; i++) {
h->last = j->smps[i];
if (h->prev) {
dist = h->last->sequence - (int32_t) h->prev->sequence;
if (dist <= 0) {
p->dropped++;
j->path->stats->counter.dropped++;
warn("Dropped sample: dist = %d, i = %d", dist, i);
}
else {
struct sample *tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
tmp = j->smps[i];
j->smps[i] = j->smps[ok];
j->smps[ok++] = tmp;
}
/* To discard the first X samples in 'smps[]' we must
@ -93,11 +96,11 @@ int hook_drop(struct path *p, struct hook *h, int when, struct sample *smps[], s
else {
struct sample *tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
tmp = j->smps[i];
j->smps[i] = j->smps[ok];
j->smps[ok++] = tmp;
}
h->prev = h->last;
}

View file

@ -17,25 +17,29 @@
#include "sample.h"
REGISTER_HOOK("print", "Print the message to stdout", 99, 0, hook_print, HOOK_READ)
int hook_print(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_print(struct hook *h, int when, struct hook_info *j)
{
for (int i = 0; i < cnt; i++)
sample_fprint(stdout, smps[i], SAMPLE_ALL);
assert(j->smps);
for (int i = 0; i < j->cnt; i++)
sample_fprint(stdout, j->smps[i], SAMPLE_ALL);
return cnt;
return j->cnt;
}
REGISTER_HOOK("ts", "Update timestamp of message with current time", 99, 0, hook_ts, HOOK_READ)
int hook_ts(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_ts(struct hook *h, int when, struct hook_info *j)
{
for (int i = 0; i < cnt; i++)
smps[i]->ts.origin = smps[i]->ts.received;
assert(j->smps);
return cnt;
for (int i = 0; i < j->cnt; i++)
j->smps[i]->ts.origin = j->smps[i]->ts.received;
return j->cnt;
}
REGISTER_HOOK("convert", "Convert message from / to floating-point / integer", 99, 0, hook_convert, HOOK_STORAGE | HOOK_PARSE | HOOK_READ)
int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
REGISTER_HOOK("convert", "Convert message from / to floating-point / integer", 99, 0, hook_convert, HOOK_STORAGE | HOOK_DESTROY | HOOK_READ)
int hook_convert(struct hook *h, int when, struct hook_info *k)
{
struct {
enum {
@ -45,7 +49,7 @@ int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[]
} *private = hook_storage(h, when, sizeof(*private));
switch (when) {
case HOOK_PARSE:
case HOOK_DESTROY:
if (!h->parameter)
error("Missing parameter for hook: '%s'", h->name);
@ -58,30 +62,29 @@ int hook_convert(struct path *p, struct hook *h, int when, struct sample *smps[]
break;
case HOOK_READ:
for (int i = 0; i < cnt; i++) {
for (int j = 0; j < smps[0]->length; j++) {
for (int i = 0; i < k->cnt; i++) {
for (int j = 0; j < k->smps[0]->length; j++) {
switch (private->mode) {
case TO_FIXED: smps[i]->data[j].i = smps[i]->data[j].f * 1e3; break;
case TO_FLOAT: smps[i]->data[j].f = smps[i]->data[j].i; break;
case TO_FIXED: k->smps[i]->data[j].i = k->smps[i]->data[j].f * 1e3; break;
case TO_FLOAT: k->smps[i]->data[j].f = k->smps[i]->data[j].i; break;
}
}
}
break;
return k->cnt;
}
return 0;
}
REGISTER_HOOK("decimate", "Downsamping by integer factor", 99, 0, hook_decimate, HOOK_STORAGE | HOOK_PARSE | HOOK_READ)
int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
REGISTER_HOOK("decimate", "Downsamping by integer factor", 99, 0, hook_decimate, HOOK_STORAGE | HOOK_DESTROY | HOOK_READ)
int hook_decimate(struct hook *h, int when, struct hook_info *j)
{
struct {
unsigned ratio;
unsigned counter;
} *private = hook_storage(h, when, sizeof(*private));
int ok;
switch (when) {
case HOOK_PARSE:
if (!h->parameter)
@ -95,24 +98,27 @@ int hook_decimate(struct path *p, struct hook *h, int when, struct sample *smps[
break;
case HOOK_READ:
ok = 0;
for (int i = 0; i < cnt; i++) {
assert(j->smps);
int i, ok;
for (i = 0, ok = 0; i < j->cnt; i++) {
if (private->counter++ % private->ratio == 0) {
struct sample *tmp;
tmp = smps[ok];
smps[ok++] = smps[i];
smps[i] = tmp;
tmp = j->smps[ok];
j->smps[ok++] = j->smps[i];
j->smps[i] = tmp;
}
}
return ok;
}
return cnt;
return 0;
}
REGISTER_HOOK("skip_first", "Skip the first samples", 99, 0, hook_skip_first, HOOK_STORAGE | HOOK_PARSE | HOOK_READ | HOOK_PATH)
int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_skip_first(struct hook *h, int when, struct hook_info *j)
{
struct {
struct timespec skip; /**< Time to wait until first message is not skipped */
@ -121,7 +127,6 @@ int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smp
char *endptr;
double wait;
int i, ok;
switch (when) {
case HOOK_PARSE:
@ -135,19 +140,22 @@ int hook_skip_first(struct path *p, struct hook *h, int when, struct sample *smp
private->skip = time_from_double(wait);
break;
case HOOK_PATH_START:
case HOOK_PATH_RESTART:
case HOOK_PATH_STOP:
private->until = time_add(&smps[0]->ts.received, &private->skip);
private->until = time_add(&j->smps[0]->ts.received, &private->skip);
break;
case HOOK_READ:
for (i = 0, ok = 0; i < cnt; i++) {
if (time_delta(&private->until, &smps[i]->ts.received) > 0) {
assert(j->smps);
int i, ok;
for (i = 0, ok = 0; i < j->cnt; i++) {
if (time_delta(&private->until, &j->smps[i]->ts.received) > 0) {
struct sample *tmp;
tmp = smps[i];
smps[i] = smps[ok];
smps[ok++] = tmp;
tmp = j->smps[i];
j->smps[i] = j->smps[ok];
j->smps[ok++] = tmp;
}

View file

@ -10,88 +10,51 @@
#include "sample.h"
#include "path.h"
#include "utils.h"
#include "timing.h"
#include "stats.h"
extern struct list *hook_nodes;
void hook_stats_header()
{
#define UNIT(u) "(" YEL(u) ")"
stats("%-40s|%19s|%19s|%19s|%19s|%19s|", "Source " MAG("=>") " Destination",
"OWD" UNIT("S") " ",
"Rate" UNIT("p/S") " ",
"Drop" UNIT("p") " ",
"Skip" UNIT("p") " ",
"Inval" UNIT("p") " "
);
line();
}
REGISTER_HOOK("stats", "Collect statistics for the current path", 2, 1, hook_stats, HOOK_STATS)
int hook_stats(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
int hook_stats(struct hook *h, int when, struct hook_info *j)
{
struct stats *s = hook_storage(h, when, sizeof(struct stats));
switch (when) {
case HOOK_INIT:
/** @todo Allow configurable bounds for histograms */
hist_create(&p->hist.owd, 0, 1, 100e-3);
hist_create(&p->hist.gap_msg, 90e-3, 110e-3, 1e-3);
hist_create(&p->hist.gap_recv, 90e-3, 110e-3, 1e-3);
hist_create(&p->hist.gap_seq, -HIST_SEQ, +HIST_SEQ, 1);
stats_init(s);
break;
case HOOK_READ:
for (int i = 0; i < cnt; i++) {
h->last = smps[i];
if (h->prev) {
int gap_seq = h->last->sequence - (int32_t) h->prev->sequence;
double owd = time_delta(&h->last->ts.origin, &h->last->ts.received);
double gap = time_delta(&h->prev->ts.origin, &h->last->ts.origin);
double gap_recv = time_delta(&h->prev->ts.received, &h->last->ts.received);
hist_put(&p->hist.gap_msg, gap);
hist_put(&p->hist.gap_seq, gap_seq);
hist_put(&p->hist.owd, owd);
hist_put(&p->hist.gap_recv, gap_recv);
}
h->prev = h->last;
}
assert(j->smps);
stats_collect(s, j->smps, j->cnt);
break;
case HOOK_PATH_STOP:
if (p->hist.owd.total) { info("One-way delay:"); hist_print(&p->hist.owd); }
if (p->hist.gap_recv.total){ info("Inter-message arrival time:"); hist_print(&p->hist.gap_recv); }
if (p->hist.gap_msg.total) { info("Inter-message ts gap:"); hist_print(&p->hist.gap_msg); }
if (p->hist.gap_seq.total) { info("Inter-message sequence number gaps:"); hist_print(&p->hist.gap_seq); }
stats_print(s);
break;
case HOOK_DEINIT:
hist_destroy(&p->hist.owd);
hist_destroy(&p->hist.gap_msg);
hist_destroy(&p->hist.gap_recv);
hist_destroy(&p->hist.gap_seq);
case HOOK_DESTROY:
stats_destroy(s);
break;
case HOOK_PATH_RESTART:
hist_reset(&p->hist.owd);
hist_reset(&p->hist.gap_seq);
hist_reset(&p->hist.gap_msg);
hist_reset(&p->hist.gap_recv);
stats_reset(s);
break;
case HOOK_PERIODIC:
stats("%-40.40s|%10s|%10s|%10ju|%10ju|%10ju|", path_name(p), "", "",
p->dropped, p->skipped, p->invalid);
assert(j->path);
stats_print_periodic(s, j->path);
break;
}
return cnt;
return j->cnt;
}
REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_PARSE | HOOK_PERIODIC | HOOK_PATH)
int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smps[], size_t cnt)
#if 0 /* currently broken */
REGISTER_HOOK("stats_send", "Send path statistics to another node", 99, 0, hook_stats_send, HOOK_STORAGE | HOOK_DESTROY | HOOK_PERIODIC | HOOK_PATH)
int hook_stats_send(struct hook *h, int when, struct hook_info *i)
{
struct private {
struct node *dest;
@ -99,7 +62,7 @@ int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smp
} *private = hook_storage(h, when, sizeof(*private));
switch (when) {
case HOOK_PARSE:
case HOOK_DESTROY:
if (!h->parameter)
error("Missing parameter for hook '%s'", h->name);
@ -115,23 +78,11 @@ int hook_stats_send(struct path *p, struct hook *h, int when, struct sample *smp
break;
case HOOK_PERIODIC: {
int i;
char buf[SAMPLE_LEN(16)];
struct sample *smp = (struct sample *) buf;
i = 0;
smp->data[i++].f = p->invalid;
smp->data[i++].f = p->skipped;
smp->data[i++].f = p->dropped;
smp->data[i++].f = p->hist.owd.last,
smp->data[i++].f = 1.0 / p->hist.gap_msg.last;
smp->data[i++].f = 1.0 / p->hist.gap_recv.last;
smp->length = i;
node_write(private->dest, &smp, 1); /* Send single message with statistics to destination node */
stats_send(s, node);
break;
}
}
return 0;
}
}
#endif

142
lib/stats.c Normal file
View file

@ -0,0 +1,142 @@
/** Statistic collection.
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*/
#include "stats.h"
#include "hist.h"
#include "timing.h"
#include "path.h"
#include "sample.h"
#include "log.h"
int stats_init(struct stats *s)
{
/** @todo Allow configurable bounds for histograms */
hist_create(&s->histogram.owd, 0, 1, 100e-3);
hist_create(&s->histogram.gap_msg, 90e-3, 110e-3, 1e-3);
hist_create(&s->histogram.gap_recv, 90e-3, 110e-3, 1e-3);
hist_create(&s->histogram.gap_seq, -HIST_SEQ, +HIST_SEQ, 1);
return 0;
}
void stats_destroy(struct stats *s)
{
hist_destroy(&s->histogram.owd);
hist_destroy(&s->histogram.gap_msg);
hist_destroy(&s->histogram.gap_recv);
hist_destroy(&s->histogram.gap_seq);
}
void stats_collect(struct stats *s, struct sample *smps[], size_t cnt)
{
for (int i = 0; i < cnt; i++) {
if (s->last) {
int gap_seq = smps[i]->sequence - (int32_t) s->last->sequence;
double owd = time_delta(&smps[i]->ts.origin, &smps[i]->ts.received);
double gap = time_delta(&s->last->ts.origin, &smps[i]->ts.origin);
double gap_recv = time_delta(&s->last->ts.received, &smps[i]->ts.received);
hist_put(&s->histogram.owd, owd);
hist_put(&s->histogram.gap_msg, gap);
hist_put(&s->histogram.gap_seq, gap_seq);
hist_put(&s->histogram.gap_recv, gap_recv);
}
if (i == 0 && s->last)
sample_put(s->last);
if (i == cnt - 1)
sample_get(smps[i]);
s->last = smps[i];
}
}
#ifdef WITH_JANSSON
json_t * stats_json(struct stats *s)
{
return json_pack("{ s: { s: i, s: i, s: i }, s: { s: o, s: o, s: o } }",
"counter",
"dropped", s->counter.dropped,
"invalid", s->counter.invalid,
"skipped", s->counter.skipped,
"histogram",
"owd", hist_json(&s->histogram.owd),
"gap_msg", hist_json(&s->histogram.gap_msg),
"gap_recv",hist_json(&s->histogram.gap_recv),
"gap_seq", hist_json(&s->histogram.gap_seq)
);
}
#endif
void stats_reset(struct stats *s)
{
s->counter.invalid =
s->counter.skipped =
s->counter.dropped = 0;
hist_reset(&s->histogram.owd);
hist_reset(&s->histogram.gap_seq);
hist_reset(&s->histogram.gap_msg);
hist_reset(&s->histogram.gap_recv);
}
void stats_print_header()
{
#define UNIT(u) "(" YEL(u) ")"
stats("%-40s|%19s|%19s|%19s|%19s|%19s|", "Source " MAG("=>") " Destination",
"OWD" UNIT("S") " ",
"Rate" UNIT("p/S") " ",
"Drop" UNIT("p") " ",
"Skip" UNIT("p") " ",
"Inval" UNIT("p") " "
);
line();
}
void stats_print_periodic(struct stats *s, struct path *p)
{
stats("%-40.40s|%10s|%10s|%10ju|%10ju|%10ju|", path_name(p), "", "",
s->counter.dropped, s->counter.skipped, s->counter.invalid);
}
void stats_print(struct stats *s)
{
stats("Dropped samples: %ju", s->counter.dropped);
stats("Skipped samples: %ju", s->counter.skipped);
stats("Invalid samples: %ju", s->counter.invalid);
stats("One-way delay:");
hist_print(&s->histogram.owd);
stats("Inter-message arrival time:");
hist_print(&s->histogram.gap_recv);
stats("Inter-message ts gap:");
hist_print(&s->histogram.gap_msg);
stats("Inter-message sequence number gaps:");
hist_print(&s->histogram.gap_seq);
}
void stats_send(struct stats *s, struct node *n)
{
char buf[SAMPLE_LEN(16)];
struct sample *smp = (struct sample *) buf;
int i = 0;
smp->data[i++].f = s->counter.invalid; /**< Use integer here? */
smp->data[i++].f = s->counter.skipped;
smp->data[i++].f = s->counter.dropped;
smp->data[i++].f = s->histogram.owd.last,
smp->data[i++].f = 1.0 / s->histogram.gap_msg.last;
smp->data[i++].f = 1.0 / s->histogram.gap_recv.last;
smp->length = i;
node_write(n, &smp, 1); /* Send single message with statistics to destination node */
}