mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
stats: refactor stats system and allow for more flexible configuration of stats node
This commit is contained in:
parent
b1028e9829
commit
2936cd3ddc
11 changed files with 293 additions and 252 deletions
|
@ -74,7 +74,7 @@ struct mapping_entry {
|
|||
} data;
|
||||
|
||||
struct {
|
||||
enum stats_id id;
|
||||
enum stats_metric metric;
|
||||
enum stats_type type;
|
||||
} stats;
|
||||
|
||||
|
|
|
@ -31,7 +31,9 @@
|
|||
|
||||
#include <jansson.h>
|
||||
|
||||
#include <villas/stats.h>
|
||||
#include <villas/task.h>
|
||||
#include <villas/list.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -42,13 +44,20 @@ struct node;
|
|||
struct sample;
|
||||
struct super_node;
|
||||
|
||||
struct stats_node_signal {
|
||||
struct node *node;
|
||||
char *node_str;
|
||||
|
||||
enum stats_metric metric;
|
||||
enum stats_type type;
|
||||
};
|
||||
|
||||
struct stats_node {
|
||||
double rate;
|
||||
char *node_str;
|
||||
|
||||
struct task task;
|
||||
|
||||
struct node *node;
|
||||
struct vlist signals; /** List of type struct stats_node_signal */
|
||||
};
|
||||
|
||||
/** @see node_type::print */
|
||||
|
@ -60,6 +69,8 @@ char *stats_node_print(struct node *n);
|
|||
/** @see node_type::parse */
|
||||
int stats_node_parse(struct node *n, json_t *cfg);
|
||||
|
||||
int stats_node_parse_signal(struct stats_node_signal *s, json_t *cfg);
|
||||
|
||||
/** @see node_type::start */
|
||||
int stats_node_start(struct node *n);
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include <jansson.h>
|
||||
|
||||
#include <villas/hist.h>
|
||||
#include <villas/signal.h>
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -42,13 +43,13 @@ enum stats_format {
|
|||
STATS_FORMAT_MATLAB
|
||||
};
|
||||
|
||||
enum stats_id {
|
||||
STATS_SKIPPED, /**< Counter for skipped samples due to hooks. */
|
||||
STATS_REORDERED, /**< Counter for reordered samples. */
|
||||
STATS_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */
|
||||
STATS_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */
|
||||
STATS_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */
|
||||
STATS_COUNT /**< Just here to have an updated number of statistics. */
|
||||
enum stats_metric {
|
||||
STATS_METRIC_SKIPPED, /**< Counter for skipped samples due to hooks. */
|
||||
STATS_METRIC_REORDERED, /**< Counter for reordered samples. */
|
||||
STATS_METRIC_GAP_SAMPLE, /**< Histogram for inter sample timestamps (as sent by remote). */
|
||||
STATS_METRIC_GAP_RECEIVED, /**< Histogram for inter sample arrival time (as seen by this instance). */
|
||||
STATS_METRIC_OWD, /**< Histogram for one-way-delay (OWD) of received samples. */
|
||||
STATS_METRIC_COUNT /**< Just here to have an updated number of statistics. */
|
||||
};
|
||||
|
||||
enum stats_type {
|
||||
|
@ -58,38 +59,50 @@ enum stats_type {
|
|||
STATS_TYPE_MEAN,
|
||||
STATS_TYPE_VAR,
|
||||
STATS_TYPE_STDDEV,
|
||||
STATS_TYPE_TOTAL
|
||||
STATS_TYPE_TOTAL,
|
||||
STATS_TYPE_COUNT
|
||||
};
|
||||
|
||||
struct stats_desc {
|
||||
struct stats_metric_description {
|
||||
const char *name;
|
||||
enum stats_metric metric;
|
||||
const char *unit;
|
||||
const char *desc;
|
||||
int hist_buckets;
|
||||
};
|
||||
|
||||
struct stats_delta {
|
||||
double values[STATS_COUNT];
|
||||
struct stats_type_description {
|
||||
const char *name;
|
||||
enum stats_type type;
|
||||
enum signal_type signal_type;
|
||||
};
|
||||
|
||||
int update; /**< Bitmask of stats_id. Only those which are masked will be updated */
|
||||
struct stats_delta {
|
||||
double values[STATS_METRIC_COUNT];
|
||||
|
||||
int update; /**< Bitmask of stats_metric. Only those which are masked will be updated */
|
||||
};
|
||||
|
||||
struct stats {
|
||||
struct hist histograms[STATS_COUNT];
|
||||
struct hist histograms[STATS_METRIC_COUNT];
|
||||
|
||||
struct stats_delta *delta;
|
||||
};
|
||||
|
||||
extern
|
||||
struct stats_desc stats_metrics[];
|
||||
extern struct stats_metric_description stats_metrics[];
|
||||
extern struct stats_type_description stats_types[];
|
||||
|
||||
int stats_lookup_format(const char *str);
|
||||
|
||||
enum stats_metric stats_lookup_metric(const char *str);
|
||||
|
||||
enum stats_type stats_lookup_type(const char *str);
|
||||
|
||||
int stats_init(struct stats *s, int buckets, int warmup);
|
||||
|
||||
int stats_destroy(struct stats *s);
|
||||
|
||||
void stats_update(struct stats *s, enum stats_id id, double val);
|
||||
void stats_update(struct stats *s, enum stats_metric id, double val);
|
||||
|
||||
void stats_collect(struct stats *s, struct sample *smps[], size_t *cnt);
|
||||
|
||||
|
@ -105,7 +118,7 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v
|
|||
|
||||
void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose);
|
||||
|
||||
enum stats_id stats_lookup_id(const char *name);
|
||||
union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -168,18 +168,18 @@ static int stats_collect_process(struct hook *h, struct sample *smps[], unsigned
|
|||
|
||||
if (previous) {
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_TS_RECEIVED)
|
||||
stats_update(s, STATS_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received));
|
||||
stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&previous->ts.received, ¤t->ts.received));
|
||||
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_TS_ORIGIN)
|
||||
stats_update(s, STATS_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin));
|
||||
stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&previous->ts.origin, ¤t->ts.origin));
|
||||
|
||||
if ((current->flags & SAMPLE_HAS_TS_ORIGIN) && (current->flags & SAMPLE_HAS_TS_RECEIVED))
|
||||
stats_update(s, STATS_OWD, time_delta(¤t->ts.origin, ¤t->ts.received));
|
||||
stats_update(s, STATS_METRIC_OWD, time_delta(¤t->ts.origin, ¤t->ts.received));
|
||||
|
||||
if (current->flags & previous->flags & SAMPLE_HAS_SEQUENCE) {
|
||||
dist = current->sequence - (int32_t) previous->sequence;
|
||||
if (dist != 1)
|
||||
stats_update(s, STATS_REORDERED, dist);
|
||||
stats_update(s, STATS_METRIC_REORDERED, dist);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
112
lib/mapping.c
112
lib/mapping.c
|
@ -32,8 +32,7 @@
|
|||
|
||||
int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *nodes)
|
||||
{
|
||||
int id;
|
||||
char *cpy, *node, *type, *field, *subfield, *end;
|
||||
char *cpy, *node, *type, *field, *end;
|
||||
|
||||
cpy = strdup(str);
|
||||
if (!cpy)
|
||||
|
@ -68,44 +67,21 @@ int mapping_parse_str(struct mapping_entry *me, const char *str, struct vlist *n
|
|||
me->type = MAPPING_TYPE_STATS;
|
||||
me->length = 1;
|
||||
|
||||
field = strtok(NULL, ".");
|
||||
if (!field) {
|
||||
warning("Missing stats type");
|
||||
char *metric = strtok(NULL, ".");
|
||||
if (!metric)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
subfield = strtok(NULL, ".");
|
||||
if (!subfield) {
|
||||
warning("Missing stats sub-type");
|
||||
type = strtok(NULL, ".");
|
||||
if (!type)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
id = stats_lookup_id(field);
|
||||
if (id < 0) {
|
||||
warning("Invalid stats type");
|
||||
me->stats.metric = stats_lookup_metric(metric);
|
||||
if (me->stats.metric < 0)
|
||||
goto invalid_format;
|
||||
}
|
||||
|
||||
me->stats.id = id;
|
||||
|
||||
if (!strcmp(subfield, "total"))
|
||||
me->stats.type = STATS_TYPE_TOTAL;
|
||||
else if (!strcmp(subfield, "last"))
|
||||
me->stats.type = STATS_TYPE_LAST;
|
||||
else if (!strcmp(subfield, "lowest"))
|
||||
me->stats.type = STATS_TYPE_LOWEST;
|
||||
else if (!strcmp(subfield, "highest"))
|
||||
me->stats.type = STATS_TYPE_HIGHEST;
|
||||
else if (!strcmp(subfield, "mean"))
|
||||
me->stats.type = STATS_TYPE_MEAN;
|
||||
else if (!strcmp(subfield, "var"))
|
||||
me->stats.type = STATS_TYPE_VAR;
|
||||
else if (!strcmp(subfield, "stddev"))
|
||||
me->stats.type = STATS_TYPE_STDDEV;
|
||||
else {
|
||||
warning("Invalid stats sub-type");
|
||||
me->stats.type = stats_lookup_type(type);
|
||||
if (me->stats.type < 0)
|
||||
goto invalid_format;
|
||||
}
|
||||
}
|
||||
else if (!strcmp(type, "hdr")) {
|
||||
me->type = MAPPING_TYPE_HEADER;
|
||||
|
@ -276,35 +252,9 @@ int mapping_update(const struct mapping_entry *me, struct sample *remapped, cons
|
|||
return -1;
|
||||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS: {
|
||||
const struct hist *h = &s->histograms[me->stats.id];
|
||||
|
||||
switch (me->stats.type) {
|
||||
case STATS_TYPE_TOTAL:
|
||||
remapped->data[off++].i = h->total;
|
||||
break;
|
||||
case STATS_TYPE_LAST:
|
||||
remapped->data[off++].f = h->last;
|
||||
break;
|
||||
case STATS_TYPE_HIGHEST:
|
||||
remapped->data[off++].f = h->highest;
|
||||
break;
|
||||
case STATS_TYPE_LOWEST:
|
||||
remapped->data[off++].f = h->lowest;
|
||||
break;
|
||||
case STATS_TYPE_MEAN:
|
||||
remapped->data[off++].f = hist_mean(h);
|
||||
break;
|
||||
case STATS_TYPE_STDDEV:
|
||||
remapped->data[off++].f = hist_stddev(h);
|
||||
break;
|
||||
case STATS_TYPE_VAR:
|
||||
remapped->data[off++].f = hist_var(h);
|
||||
break;
|
||||
default:
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
case MAPPING_TYPE_STATS:
|
||||
remapped->data[off++] = stats_get_value(s, me->stats.metric, me->stats.type);
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_TIMESTAMP: {
|
||||
const struct timespec *ts;
|
||||
|
@ -380,40 +330,10 @@ int mapping_to_str(const struct mapping_entry *me, unsigned index, char **str)
|
|||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS:
|
||||
switch (me->stats.type) {
|
||||
case STATS_TYPE_TOTAL:
|
||||
type = "total";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LAST:
|
||||
type = "last";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LOWEST:
|
||||
type = "lowest";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_HIGHEST:
|
||||
type = "highest";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_MEAN:
|
||||
type = "mean";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_VAR:
|
||||
type = "var";
|
||||
break;
|
||||
|
||||
case STATS_TYPE_STDDEV:
|
||||
type = "stddev";
|
||||
break;
|
||||
|
||||
default:
|
||||
type = NULL;
|
||||
}
|
||||
|
||||
strcatf(str, "stats.%s", type);
|
||||
strcatf(str, "stats.%s.%s",
|
||||
stats_metrics[me->stats.metric].name,
|
||||
stats_types[me->stats.type].name
|
||||
);
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_HEADER:
|
||||
|
|
|
@ -578,7 +578,7 @@ int node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rel
|
|||
int skipped = nread - rread;
|
||||
|
||||
if (skipped > 0 && n->stats != NULL) {
|
||||
stats_update(n->stats, STATS_SKIPPED, skipped);
|
||||
stats_update(n->stats, STATS_METRIC_SKIPPED, skipped);
|
||||
}
|
||||
|
||||
debug(LOG_NODE | 5, "Received %u samples from node %s of which %d have been skipped", nread, node_name(n), skipped);
|
||||
|
|
|
@ -38,55 +38,57 @@
|
|||
|
||||
static struct vlist *nodes; /** The global list of nodes */
|
||||
|
||||
static void stats_init_signals(struct node *n)
|
||||
int stats_node_signal_destroy(struct stats_node_signal *s)
|
||||
{
|
||||
struct stats_desc *desc;
|
||||
struct signal *sig;
|
||||
free(s->node_str);
|
||||
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
desc = &stats_metrics[i];
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Total */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "total");
|
||||
sig->type = SIGNAL_TYPE_INTEGER;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
int stats_node_signal_parse(struct stats_node_signal *s, json_t *cfg)
|
||||
{
|
||||
json_error_t err;
|
||||
|
||||
/* Last */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "last");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
int ret;
|
||||
const char *stats;
|
||||
char *metric, *type, *node, *cpy;
|
||||
|
||||
/* Highest */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "highest");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s }",
|
||||
"stats", &stats
|
||||
);
|
||||
if (ret)
|
||||
return -1;
|
||||
|
||||
/* Lowest */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "lowest");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
cpy = strdup(stats);
|
||||
|
||||
/* Mean */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "mean");
|
||||
sig->unit = strdup(desc->unit);
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
node = strtok(cpy, ".");
|
||||
if (!node)
|
||||
goto invalid_format;
|
||||
|
||||
/* Variance */
|
||||
sig = alloc(sizeof(struct signal));
|
||||
sig->name = strf("%s.%s", desc->name, "var");
|
||||
sig->unit = strf("%s^2", desc->unit); // variance has squared unit of variable
|
||||
sig->type = SIGNAL_TYPE_FLOAT;
|
||||
vlist_push(&n->in.signals, sig);
|
||||
}
|
||||
metric = strtok(NULL, ".");
|
||||
if (!metric)
|
||||
goto invalid_format;
|
||||
|
||||
type = strtok(NULL, ".");
|
||||
if (!type)
|
||||
goto invalid_format;
|
||||
|
||||
s->metric = stats_lookup_metric(metric);
|
||||
if (s->metric < 0)
|
||||
goto invalid_format;
|
||||
|
||||
s->type = stats_lookup_type(type);
|
||||
if (s->type < 0)
|
||||
goto invalid_format;
|
||||
|
||||
s->node_str = strdup(node);
|
||||
|
||||
free(cpy);
|
||||
return 0;
|
||||
|
||||
invalid_format:
|
||||
free(cpy);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int stats_node_type_start(struct super_node *sn)
|
||||
|
@ -105,9 +107,13 @@ int stats_node_start(struct node *n)
|
|||
if (ret)
|
||||
serror("Failed to create task");
|
||||
|
||||
s->node = vlist_lookup(nodes, s->node_str);
|
||||
if (!s->node)
|
||||
error("Invalid reference node %s for setting 'node' of node %s", s->node_str, node_name(n));
|
||||
for (size_t i = 0; i < vlist_length(&s->signals); i++) {
|
||||
struct stats_node_signal *stats_sig = (struct stats_node_signal *) vlist_at(&s->signals, i);
|
||||
|
||||
stats_sig->node = vlist_lookup(nodes, stats_sig->node_str);
|
||||
if (!stats_sig->node)
|
||||
error("Invalid reference node %s for setting 'node' of node %s", stats_sig->node_str, node_name(n));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -128,7 +134,31 @@ char * stats_node_print(struct node *n)
|
|||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
return strf("node=%s, rate=%f", s->node_str, s->rate);
|
||||
return strf("rate=%f", s->rate);
|
||||
}
|
||||
|
||||
int stats_node_init(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
ret = vlist_init(&s->signals);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_destroy(struct node *n)
|
||||
{
|
||||
int ret;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
ret = vlist_destroy(&s->signals, (dtor_cb_t) stats_node_signal_destroy, true);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_parse(struct node *n, json_t *cfg)
|
||||
|
@ -136,13 +166,14 @@ int stats_node_parse(struct node *n, json_t *cfg)
|
|||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
int ret;
|
||||
size_t i;
|
||||
json_error_t err;
|
||||
json_t *json_signals, *json_signal;
|
||||
|
||||
const char *node;
|
||||
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: s, s: F }",
|
||||
"node", &node,
|
||||
"rate", &s->rate
|
||||
ret = json_unpack_ex(cfg, &err, 0, "{ s: F, s: { s: o } }",
|
||||
"rate", &s->rate,
|
||||
"in",
|
||||
"signals", &json_signals
|
||||
);
|
||||
if (ret)
|
||||
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
|
||||
|
@ -150,50 +181,68 @@ int stats_node_parse(struct node *n, json_t *cfg)
|
|||
if (s->rate <= 0)
|
||||
error("Setting 'rate' of node %s must be positive", node_name(n));
|
||||
|
||||
s->node_str = strdup(node);
|
||||
if (!json_is_array(json_signals))
|
||||
error("Setting 'in.signals' of node %s must be an array", node_name(n));
|
||||
|
||||
stats_init_signals(n);
|
||||
json_array_foreach(json_signals, i, json_signal) {
|
||||
struct signal *sig = (struct signal *) vlist_at(&n->in.signals, i);
|
||||
struct stats_node_signal *stats_sig;
|
||||
|
||||
return 0;
|
||||
}
|
||||
stats_sig = alloc(sizeof(struct stats_node_signal));
|
||||
if (!stats_sig)
|
||||
return -1;
|
||||
|
||||
int stats_node_destroy(struct node *n)
|
||||
{
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
ret = stats_node_signal_parse(stats_sig, json_signal);
|
||||
if (ret)
|
||||
error("Failed to parse signal definition of node %s", node_name(n));
|
||||
|
||||
if (s->node_str)
|
||||
free(s->node_str);
|
||||
if (!sig->name) {
|
||||
const char *metric = stats_metrics[stats_sig->metric].name;
|
||||
const char *type = stats_types[stats_sig->type].name;
|
||||
|
||||
sig->name = strf("%s.%s.%s", stats_sig->node_str, metric, type);
|
||||
}
|
||||
|
||||
if (!sig->unit)
|
||||
sig->unit = strdup(stats_metrics[stats_sig->metric].unit);
|
||||
|
||||
if (sig->type == SIGNAL_TYPE_AUTO)
|
||||
sig->type = stats_types[stats_sig->type].signal_type;
|
||||
else if (sig->type != stats_types[stats_sig->type].signal_type)
|
||||
error("Invalid type for signal %zu in node %s", i, node_name(n));
|
||||
|
||||
vlist_push(&s->signals, stats_sig);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)
|
||||
{
|
||||
struct stats_node *sn = (struct stats_node *) n->_vd;
|
||||
struct stats *s = sn->node->stats;
|
||||
struct stats_node *s = (struct stats_node *) n->_vd;
|
||||
|
||||
if (!cnt)
|
||||
return 0;
|
||||
|
||||
if (!sn->node->stats)
|
||||
return -1;
|
||||
task_wait(&s->task);
|
||||
|
||||
task_wait(&sn->task);
|
||||
int len = MIN(vlist_length(&s->signals), smps[0]->capacity);
|
||||
|
||||
smps[0]->length = MIN(STATS_COUNT * 6, smps[0]->capacity);
|
||||
smps[0]->flags = SAMPLE_HAS_DATA;
|
||||
for (size_t i = 0; i < len; i++) {
|
||||
struct stats *st;
|
||||
struct stats_node_signal *sig = (struct stats_node_signal *) vlist_at(&s->signals, i);
|
||||
|
||||
for (int i = 0; i < 6 && (i+1)*STATS_METRICS <= smps[0]->length; i++) {
|
||||
int tot = hist_total(&s->histograms[i]);
|
||||
st = sig->node->stats;
|
||||
if (!st)
|
||||
return -1;
|
||||
|
||||
smps[0]->data[i*STATS_METRICS+0].f = tot ? hist_total(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+1].f = tot ? hist_last(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+2].f = tot ? hist_highest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+3].f = tot ? hist_lowest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+4].f = tot ? hist_mean(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*STATS_METRICS+5].f = tot ? hist_var(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i] = stats_get_value(st, sig->metric, sig->type);
|
||||
}
|
||||
|
||||
smps[0]->length = len;
|
||||
smps[0]->flags = SAMPLE_HAS_DATA;
|
||||
smps[0]->signals = &n->in.signals;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
@ -212,10 +261,11 @@ static struct plugin p = {
|
|||
.type = PLUGIN_TYPE_NODE,
|
||||
.node = {
|
||||
.vectorize = 1,
|
||||
.flags = NODE_TYPE_PROVIDES_SIGNALS,
|
||||
.flags = 0,
|
||||
.size = sizeof(struct stats_node),
|
||||
.type.start = stats_node_type_start,
|
||||
.parse = stats_node_parse,
|
||||
.init = stats_node_init,
|
||||
.destroy = stats_node_destroy,
|
||||
.print = stats_node_print,
|
||||
.start = stats_node_start,
|
||||
|
|
|
@ -261,8 +261,6 @@ static void * path_run_single(void *arg)
|
|||
struct path *p = arg;
|
||||
struct path_source *ps = (struct path_source *) vlist_at(&p->sources, 0);
|
||||
|
||||
debug(1, "Started path %s in single mode", path_name(p));
|
||||
|
||||
while (p->state == STATE_STARTED) {
|
||||
pthread_testcancel();
|
||||
|
||||
|
@ -286,8 +284,6 @@ static void * path_run_poll(void *arg)
|
|||
int ret;
|
||||
struct path *p = arg;
|
||||
|
||||
debug(1, "Started path %s in polling mode", path_name(p));
|
||||
|
||||
while (p->state == STATE_STARTED) {
|
||||
ret = poll(p->reader.pfds, p->reader.nfds, -1);
|
||||
if (ret < 0)
|
||||
|
|
15
lib/signal.c
15
lib/signal.c
|
@ -55,20 +55,7 @@ int signal_init_from_mapping(struct signal *s, const struct mapping_entry *me, u
|
|||
|
||||
switch (me->type) {
|
||||
case MAPPING_TYPE_STATS:
|
||||
switch (me->stats.type) {
|
||||
case STATS_TYPE_TOTAL:
|
||||
s->type = SIGNAL_TYPE_INTEGER;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LAST:
|
||||
case STATS_TYPE_LOWEST:
|
||||
case STATS_TYPE_HIGHEST:
|
||||
case STATS_TYPE_MEAN:
|
||||
case STATS_TYPE_VAR:
|
||||
case STATS_TYPE_STDDEV:
|
||||
s->type = SIGNAL_TYPE_FLOAT;
|
||||
break;
|
||||
}
|
||||
s->type = stats_types[me->stats.type].signal_type;
|
||||
break;
|
||||
|
||||
case MAPPING_TYPE_HEADER:
|
||||
|
|
132
lib/stats.c
132
lib/stats.c
|
@ -31,12 +31,22 @@
|
|||
#include <villas/node.h>
|
||||
#include <villas/table.h>
|
||||
|
||||
struct stats_desc stats_metrics[] = {
|
||||
{ "skipped", "samples", "Skipped samples and the distance between them", 25 },
|
||||
{ "reordered", "samples", "Reordered samples and the distance between them", 25 },
|
||||
{ "gap_sent", "seconds", "Inter-message timestamps (as sent by remote)", 25 },
|
||||
{ "gap_received", "seconds", "Inter-message arrival time (as received by this instance)", 25 },
|
||||
{ "owd", "seconds", "One-way-delay (OWD) of received messages", 25 }
|
||||
struct stats_metric_description stats_metrics[] = {
|
||||
{ "skipped", STATS_METRIC_SKIPPED, "samples", "Skipped samples and the distance between them", 25 },
|
||||
{ "reordered", STATS_METRIC_REORDERED, "samples", "Reordered samples and the distance between them", 25 },
|
||||
{ "gap_sent", STATS_METRIC_GAP_SAMPLE, "seconds", "Inter-message timestamps (as sent by remote)", 25 },
|
||||
{ "gap_received", STATS_METRIC_GAP_RECEIVED, "seconds", "Inter-message arrival time (as received by this instance)", 25 },
|
||||
{ "owd", STATS_METRIC_OWD, "seconds", "One-way-delay (OWD) of received messages", 25 }
|
||||
};
|
||||
|
||||
struct stats_type_description stats_types[] = {
|
||||
{ "last", STATS_TYPE_LAST, SIGNAL_TYPE_FLOAT },
|
||||
{ "highest", STATS_TYPE_HIGHEST, SIGNAL_TYPE_FLOAT },
|
||||
{ "lowest", STATS_TYPE_LOWEST, SIGNAL_TYPE_FLOAT },
|
||||
{ "mean", STATS_TYPE_MEAN, SIGNAL_TYPE_FLOAT },
|
||||
{ "var", STATS_TYPE_VAR, SIGNAL_TYPE_FLOAT },
|
||||
{ "stddev", STATS_TYPE_STDDEV, SIGNAL_TYPE_FLOAT },
|
||||
{ "total", STATS_TYPE_TOTAL, SIGNAL_TYPE_INTEGER }
|
||||
};
|
||||
|
||||
int stats_lookup_format(const char *str)
|
||||
|
@ -51,9 +61,33 @@ int stats_lookup_format(const char *str)
|
|||
return -1;
|
||||
}
|
||||
|
||||
enum stats_metric stats_lookup_metric(const char *str)
|
||||
{
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
|
||||
if (!strcmp(str, d->name))
|
||||
return d->metric;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
enum stats_type stats_lookup_type(const char *str)
|
||||
{
|
||||
for (int i = 0; i < STATS_TYPE_COUNT; i++) {
|
||||
struct stats_type_description *d = &stats_types[i];
|
||||
|
||||
if (!strcmp(str, d->name))
|
||||
return d->type;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
int stats_init(struct stats *s, int buckets, int warmup)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_init(&s->histograms[i], buckets, warmup);
|
||||
|
||||
s->delta = alloc(sizeof(struct stats_delta));
|
||||
|
@ -63,7 +97,7 @@ int stats_init(struct stats *s, int buckets, int warmup)
|
|||
|
||||
int stats_destroy(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_destroy(&s->histograms[i]);
|
||||
|
||||
free(s->delta);
|
||||
|
@ -71,7 +105,7 @@ int stats_destroy(struct stats *s)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void stats_update(struct stats *s, enum stats_id id, double val)
|
||||
void stats_update(struct stats *s, enum stats_metric id, double val)
|
||||
{
|
||||
s->delta->values[id] = val;
|
||||
s->delta->update |= 1 << id;
|
||||
|
@ -79,7 +113,7 @@ void stats_update(struct stats *s, enum stats_id id, double val)
|
|||
|
||||
int stats_commit(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
if (s->delta->update & 1 << i) {
|
||||
hist_put(&s->histograms[i], s->delta->values[i]);
|
||||
s->delta->update &= ~(1 << i);
|
||||
|
@ -93,8 +127,8 @@ json_t * stats_json(struct stats *s)
|
|||
{
|
||||
json_t *obj = json_object();
|
||||
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *d = &stats_metrics[i];
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
struct hist *h = &s->histograms[i];
|
||||
|
||||
json_object_set_new(obj, d->name, hist_json(h));
|
||||
|
@ -107,17 +141,17 @@ json_t * stats_json_periodic(struct stats *s, struct node *n)
|
|||
{
|
||||
return json_pack("{ s: s, s: i, s: f, s: f, s: i, s: i }",
|
||||
"node", node_name(n),
|
||||
"processed", hist_total(&s->histograms[STATS_OWD]),
|
||||
"owd", hist_last(&s->histograms[STATS_OWD]),
|
||||
"rate", 1.0 / hist_last(&s->histograms[STATS_GAP_SAMPLE]),
|
||||
"dropped", hist_total(&s->histograms[STATS_REORDERED]),
|
||||
"skipped", hist_total(&s->histograms[STATS_SKIPPED])
|
||||
"processed", hist_total(&s->histograms[STATS_METRIC_OWD]),
|
||||
"owd", hist_last(&s->histograms[STATS_METRIC_OWD]),
|
||||
"rate", 1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_SAMPLE]),
|
||||
"dropped", hist_total(&s->histograms[STATS_METRIC_REORDERED]),
|
||||
"skipped", hist_total(&s->histograms[STATS_METRIC_SKIPPED])
|
||||
);
|
||||
}
|
||||
|
||||
void stats_reset(struct stats *s)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++)
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++)
|
||||
hist_reset(&s->histograms[i]);
|
||||
}
|
||||
|
||||
|
@ -155,13 +189,13 @@ void stats_print_periodic(struct stats *s, FILE *f, enum stats_format fmt, int v
|
|||
case STATS_FORMAT_HUMAN:
|
||||
table_row(&stats_table,
|
||||
node_name_short(n),
|
||||
hist_total(&s->histograms[STATS_OWD]),
|
||||
hist_last(&s->histograms[STATS_OWD]),
|
||||
hist_mean(&s->histograms[STATS_OWD]),
|
||||
1.0 / hist_last(&s->histograms[STATS_GAP_RECEIVED]),
|
||||
1.0 / hist_mean(&s->histograms[STATS_GAP_RECEIVED]),
|
||||
hist_total(&s->histograms[STATS_REORDERED]),
|
||||
hist_total(&s->histograms[STATS_SKIPPED])
|
||||
hist_total(&s->histograms[STATS_METRIC_OWD]),
|
||||
hist_last(&s->histograms[STATS_METRIC_OWD]),
|
||||
hist_mean(&s->histograms[STATS_METRIC_OWD]),
|
||||
1.0 / hist_last(&s->histograms[STATS_METRIC_GAP_RECEIVED]),
|
||||
1.0 / hist_mean(&s->histograms[STATS_METRIC_GAP_RECEIVED]),
|
||||
hist_total(&s->histograms[STATS_METRIC_REORDERED]),
|
||||
hist_total(&s->histograms[STATS_METRIC_SKIPPED])
|
||||
);
|
||||
break;
|
||||
|
||||
|
@ -179,10 +213,10 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose)
|
|||
{
|
||||
switch (fmt) {
|
||||
case STATS_FORMAT_HUMAN:
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *desc = &stats_metrics[i];
|
||||
for (int i = 0; i < STATS_METRIC_COUNT; i++) {
|
||||
struct stats_metric_description *d = &stats_metrics[i];
|
||||
|
||||
info("%s: %s", desc->name, desc->desc);
|
||||
info("%s: %s", d->name, d->desc);
|
||||
hist_print(&s->histograms[i], verbose);
|
||||
}
|
||||
break;
|
||||
|
@ -198,14 +232,44 @@ void stats_print(struct stats *s, FILE *f, enum stats_format fmt, int verbose)
|
|||
}
|
||||
}
|
||||
|
||||
enum stats_id stats_lookup_id(const char *name)
|
||||
union signal_data stats_get_value(const struct stats *s, enum stats_metric sm, enum stats_type st)
|
||||
{
|
||||
for (int i = 0; i < STATS_COUNT; i++) {
|
||||
struct stats_desc *desc = &stats_metrics[i];
|
||||
const struct hist *h = &s->histograms[sm];
|
||||
|
||||
if (!strcmp(desc->name, name))
|
||||
return i;
|
||||
union signal_data d;
|
||||
|
||||
switch (st) {
|
||||
case STATS_TYPE_TOTAL:
|
||||
d.i = h->total;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LAST:
|
||||
d.f = h->last;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_HIGHEST:
|
||||
d.f = h->highest;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_LOWEST:
|
||||
d.f = h->lowest;
|
||||
break;
|
||||
|
||||
case STATS_TYPE_MEAN:
|
||||
d.f = hist_mean(h);
|
||||
break;
|
||||
|
||||
case STATS_TYPE_STDDEV:
|
||||
d.f = hist_stddev(h);
|
||||
break;
|
||||
|
||||
case STATS_TYPE_VAR:
|
||||
d.f = hist_var(h);
|
||||
break;
|
||||
|
||||
default:
|
||||
d.f = -1;
|
||||
}
|
||||
|
||||
return -1;
|
||||
return d;
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ Test(mapping, parse_nodes)
|
|||
cr_assert_eq(ret, 0);
|
||||
cr_assert_eq(m.node, vlist_lookup(&nodes, "cherry"));
|
||||
cr_assert_eq(m.type, MAPPING_TYPE_STATS);
|
||||
cr_assert_eq(m.stats.id, STATS_OWD);
|
||||
cr_assert_eq(m.stats.metric, STATS_METRIC_OWD);
|
||||
cr_assert_eq(m.stats.type, STATS_TYPE_MEAN);
|
||||
|
||||
ret = mapping_parse_str(&m, "carrot.data[1-2]", &nodes);
|
||||
|
@ -126,7 +126,7 @@ Test(mapping, parse)
|
|||
ret = mapping_parse_str(&m, "stats.owd.mean", nullptr);
|
||||
cr_assert_eq(ret, 0);
|
||||
cr_assert_eq(m.type, MAPPING_TYPE_STATS);
|
||||
cr_assert_eq(m.stats.id, STATS_OWD);
|
||||
cr_assert_eq(m.stats.metric, STATS_METRIC_OWD);
|
||||
cr_assert_eq(m.stats.type, STATS_TYPE_MEAN);
|
||||
|
||||
ret = mapping_parse_str(&m, "data[1-2]", nullptr);
|
||||
|
@ -171,10 +171,10 @@ Test(mapping, parse)
|
|||
cr_assert_neq(ret, 0);
|
||||
|
||||
/* Check for superfluous chars at the end */
|
||||
ret = mapping_parse_str(&m, "stats.ts.origin.bla", nullptr);
|
||||
ret = mapping_parse_str(&m, "hdr.ts.origin.bla", nullptr);
|
||||
cr_assert_neq(ret, 0);
|
||||
|
||||
ret = mapping_parse_str(&m, "stats.ts.origin.", nullptr);
|
||||
ret = mapping_parse_str(&m, "hdr.ts.origin.", nullptr);
|
||||
cr_assert_neq(ret, 0);
|
||||
|
||||
ret = mapping_parse_str(&m, "data[1-2]bla", nullptr);
|
||||
|
|
Loading…
Add table
Reference in a new issue