From 9e354e73825a295940b89de83454b26698a78899 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 4 Sep 2017 23:03:00 +0200 Subject: [PATCH] stats: fixes for new stats node type --- include/villas/nodes/stats.h | 8 +++-- lib/io/villas_human.c | 4 +-- lib/nodes/Makefile.inc | 7 ++++ lib/nodes/stats.c | 60 +++++++++++++++++++++-------------- lib/path.c | 3 ++ lib/sample.c | 36 +++++++++++++++++---- src/test-cmp.c | 53 ++++++++++++++++--------------- tests/integration/test-cmp.sh | 30 ++++++++++-------- 8 files changed, 130 insertions(+), 71 deletions(-) diff --git a/include/villas/nodes/stats.h b/include/villas/nodes/stats.h index 23c4c82e1..8bb5235a7 100644 --- a/include/villas/nodes/stats.h +++ b/include/villas/nodes/stats.h @@ -29,16 +29,20 @@ #pragma once +#include + #include "task.h" /* Forward declarations */ struct node; -struct path; struct sample; +struct super_node; struct stats_node { - struct task task; double rate; + char *node_str; + + struct task task; struct node *node; }; diff --git a/lib/io/villas_human.c b/lib/io/villas_human.c index 7bdba26e5..aa179fac9 100644 --- a/lib/io/villas_human.c +++ b/lib/io/villas_human.c @@ -45,7 +45,7 @@ size_t villas_human_sprint_single(char *buf, size_t len, struct sample *s, int f off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec); } - if (flags & SAMPLE_OFFSET) + if (flags & SAMPLE_RECEIVED) off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received)); if (flags & SAMPLE_SEQUENCE) @@ -53,7 +53,7 @@ size_t villas_human_sprint_single(char *buf, size_t len, struct sample *s, int f if (flags & SAMPLE_VALUES) { for (int i = 0; i < s->length; i++) { - switch ((s->format >> i) & 0x1) { + switch (sample_get_data_format(s, i)) { case SAMPLE_DATA_FORMAT_FLOAT: off += snprintf(buf + off, len - off, "\t%.6lf", s->data[i].f); break; diff --git a/lib/nodes/Makefile.inc b/lib/nodes/Makefile.inc index 8f4a6984a..e7f5f6a6a 100644 --- a/lib/nodes/Makefile.inc +++ b/lib/nodes/Makefile.inc @@ -35,6 +35,13 @@ WITH_SOCKET ?= 1 WITH_ZEROMQ ?= 1 WITH_NANOMSG ?= 1 WITH_SHMEM ?= 1 +WITH_STATS ?= 1 + +# Enable stats node-type +ifeq ($(WITH_STATS),1) + LIB_SRCS += lib/nodes/stats.c + LIB_CFLAGS += -DWITH_STATS +endif # Enable file node-type ifeq ($(WITH_FILE),1) diff --git a/lib/nodes/stats.c b/lib/nodes/stats.c index 167faf445..0ad71d180 100644 --- a/lib/nodes/stats.c +++ b/lib/nodes/stats.c @@ -24,6 +24,7 @@ * @{ */ +#include "nodes/stats.h" #include "hook.h" #include "plugin.h" #include "stats.h" @@ -48,20 +49,18 @@ int stats_node_start(struct node *n) struct stats_node *s = n->_vd; int ret; - if (!s->node->stats) - return -1; - - if (s->node->stats->state != STATE_INITIALIZED) - return -2; - ret = task_init(&s->task, s->rate, CLOCK_MONOTONIC); if (ret) serror("Failed to create task"); + s->node = list_lookup(nodes, s->node_str); + if (!s->node) + error("Invalid reference node %s for setting 'node' of node %s", s->node_str, node_name(n)); + return 0; } -int stats_node_start(struct node *n) +int stats_node_stop(struct node *n) { struct stats_node *s = n->_vd; int ret; @@ -75,14 +74,14 @@ int stats_node_start(struct node *n) char * stats_node_print(struct node *n) { - struct stats_node *p = h->_vd; + struct stats_node *s = n->_vd; - return strf("node=%s, rate=%f", node_name_short(s->node), s->rate); + return strf("node=%s, rate=%f", s->node_str, s->rate); } int stats_node_parse(struct node *n, json_t *cfg) { - struct stats_node *p = h->_vd; + struct stats_node *s = n->_vd; int ret; json_error_t err; @@ -99,25 +98,39 @@ int stats_node_parse(struct node *n, json_t *cfg) if (s->rate <= 0) error("Setting 'rate' of node %s must be positive", node_name(n)); + s->node_str = strdup(node); + + n->samplelen = STATS_COUNT * 7; + return 0; } -int stats_node_read(struct hook *h, struct sample *smps[], unsigned *cnt) +int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt) { - struct stats_node *sn = h->_vd; + struct stats_node *sn = n->_vd; struct stats *s = sn->node->stats; - if (*cnt == 0) + if (!cnt) return 0; - for (int i = 0; j < MIN(STATS_COUNT, smps[0]->capacity); i++) { - smps[0]->data[i++].f = hist_last(&s->histograms[i]); - smps[0]->data[i++].f = hist_highest(&s->histograms[i]); - smps[0]->data[i++].f = hist_lowest(&s->histograms[i]); - smps[0]->data[i++].f = hist_mean(&s->histograms[i]); - smps[0]->data[i++].f = hist_var(&s->histograms[i]); + if (!sn->node->stats) + return 0; - smps[0]->length = i; + task_wait_until_next_period(&sn->task); + + smps[0]->length = MIN(STATS_COUNT * 7, smps[0]->capacity); + smps[0]->has = SAMPLE_VALUES; + + for (int i = 0; i < 6 && i*7+5 < smps[0]->length; i++) { + int tot = hist_total(&s->histograms[i]); + + smps[0]->data[i*7+0].f = tot ? hist_total(&s->histograms[i]) : 0; + smps[0]->data[i*7+1].f = tot ? hist_last(&s->histograms[i]) : 0; + smps[0]->data[i*7+2].f = tot ? hist_last(&s->histograms[i]) : 0; + smps[0]->data[i*7+3].f = tot ? hist_highest(&s->histograms[i]) : 0; + smps[0]->data[i*7+4].f = tot ? hist_lowest(&s->histograms[i]) : 0; + smps[0]->data[i*7+5].f = tot ? hist_mean(&s->histograms[i]) : 0; + smps[0]->data[i*7+6].f = tot ? hist_var(&s->histograms[i]) : 0; } return 1; @@ -125,7 +138,7 @@ int stats_node_read(struct hook *h, struct sample *smps[], unsigned *cnt) int stats_node_fd(struct node *n) { - struct stats_node *p = h->_vd; + struct stats_node *s = n->_vd; return task_fd(&s->task); } @@ -134,8 +147,8 @@ static struct plugin p = { .name = "stats", .description = "Send statistics to another node", .type = PLUGIN_TYPE_NODE, - .hook = { - .vecotrize = 1, + .node = { + .vectorize = 1, .size = sizeof(struct stats_node), .init = stats_node_init, .parse = stats_node_parse, @@ -148,5 +161,6 @@ static struct plugin p = { }; REGISTER_PLUGIN(&p) +LIST_INIT_STATIC(&p.node.instances) /** @} */ diff --git a/lib/path.c b/lib/path.c index 3148dd6f1..14cc69ec3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -292,6 +292,9 @@ int path_init2(struct path *p) } } + if (!p->samplelen) + p->samplelen = DEFAULT_SAMPLELEN; + ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage); if (ret) return ret; diff --git a/lib/sample.c b/lib/sample.c index 45d492daf..580f04159 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -123,7 +123,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) if (flags & SAMPLE_SEQUENCE) { if (a->sequence != b->sequence) { printf("sequence no: %d != %d\n", a->sequence, b->sequence); - return -2; + return 2; } } @@ -131,7 +131,15 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) if (flags & SAMPLE_ORIGIN) { if (time_delta(&a->ts.origin, &b->ts.origin) > epsilon) { printf("ts.origin: %f != %f\n", time_to_double(&a->ts.origin), time_to_double(&b->ts.origin)); - return -3; + return 3; + } + } + + /* Compare ID */ + if (flags & SAMPLE_SOURCE) { + if (a->id != b->id) { + printf("id: %d != %d\n", a->id, b->id); + return 7; } } @@ -139,13 +147,29 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags) if (flags & SAMPLE_VALUES) { if (a->length != b->length) { printf("length: %d != %d\n", a->length, b->length); - return -4; + return 4; + } + + if (a->format != b->format) { + printf("format: %#lx != %#lx\n", a->format, b->format); + return 6; } for (int i = 0; i < a->length; i++) { - if (fabs(a->data[i].f - b->data[i].f) > epsilon) { - printf("data[%d]: %f != %f\n", i, a->data[i].f, b->data[i].f); - return -5; + switch (sample_get_data_format(a, i)) { + case SAMPLE_DATA_FORMAT_FLOAT: + if (fabs(a->data[i].f - b->data[i].f) > epsilon) { + printf("data[%d].f: %f != %f\n", i, a->data[i].f, b->data[i].f); + return 5; + } + break; + + case SAMPLE_DATA_FORMAT_INT: + if (a->data[i].i != b->data[i].i) { + printf("data[%d].i: %ld != %ld\n", i, a->data[i].i, b->data[i].i); + return 5; + } + break; } } } diff --git a/src/test-cmp.c b/src/test-cmp.c index 06b75679b..c8698cf5e 100644 --- a/src/test-cmp.c +++ b/src/test-cmp.c @@ -47,8 +47,7 @@ struct side { void usage() { printf("Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn\n"); - printf(" FILE1 first file to compare\n"); - printf(" FILE2 second file to compare against\n"); + printf(" FILE a list of files to compare\n"); printf(" OPTIONS is one or more of the following options:\n"); printf(" -h print this usage information\n"); printf(" -d LVL adjust the debug level\n"); @@ -56,7 +55,7 @@ void usage() printf(" -v ignore data values\n"); printf(" -t ignore timestamp\n"); printf(" -s ignore sequence no\n"); - printf(" -f file format for all files\n"); + printf(" -f FMT file format for all files\n"); printf("\n"); printf("Return codes:\n"); printf(" 0 files are equal\n"); @@ -133,7 +132,7 @@ check: if (optarg == endptr) if (!s[i].fmt) error("Invalid IO format: %s", s[i].format); - ret = io_init(&s[i].io, s[i].fmt, IO_NONBLOCK); + ret = io_init(&s[i].io, s[i].fmt, 0); if (ret) error("Failed to initialize IO"); @@ -146,46 +145,50 @@ check: if (optarg == endptr) error("Failed to allocate samples"); } - int line = 0; + int eofs, line, failed; + + line = 0; for (;;) { /* Read next sample from all files */ - int fails = 0; +retry: eofs = 0; for (int i = 0; i < n; i++) { ret = io_eof(&s[i].io); - if (ret) { - fails++; - continue; + if (ret) + eofs++; + } + + if (eofs) { + if (eofs == n) + ret = 0; + else { + printf("length unequal\n"); + ret = 1; } + goto out; + } + + failed = 0; + for (int i = 0; i < n; i++) { ret = io_scan(&s[i].io, &s[i].sample, 1); - if (ret <= 0) { - fails++; - continue; - } + if (ret <= 0) + failed++; } - if (fails == n) { - ret = 0; - goto fail; - } - else if (fails != n) { - printf("fails = %d at line %d\n", fails, line); - ret = -1; - goto fail; - } + if (failed) + goto retry; /* We compare all files against the first one */ for (int i = 1; i < n; i++) { ret = sample_cmp(s[0].sample, s[i].sample, epsilon, flags); if (ret) - goto fail; + goto out; } line++; } -fail: - for (int i = 0; i < n; i++) { +out: for (int i = 0; i < n; i++) { io_close(&s[i].io); io_destroy(&s[i].io); sample_put(s[i].sample); diff --git a/tests/integration/test-cmp.sh b/tests/integration/test-cmp.sh index fbd38b95b..a3be92987 100755 --- a/tests/integration/test-cmp.sh +++ b/tests/integration/test-cmp.sh @@ -23,9 +23,10 @@ ################################################################################## INPUT_FILE=$(mktemp) +TEMP_FILE=$(mktemp) function fail() { - rm ${INPUT_FILE} + rm ${INPUT_FILE} ${TEMP_FILE} exit $1; } @@ -45,25 +46,28 @@ EOF villas-test-cmp ${INPUT_FILE} ${INPUT_FILE} (( $? == 0 )) || fail 1 -villas-test-cmp ${INPUT_FILE} <(head -n5 ${INPUT_FILE}; sleep 1; tail -n5 ${INPUT_FILE}) -(( $? == 0 )) || fail 8 - -villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}) +head -n-1 ${INPUT_FILE} > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} (( $? == 1 )) || fail 2 -villas-test-cmp ${INPUT_FILE} <(cat ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785") -(( $? == 1 )) || fail 7 - -villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(55) -0.587785") +( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(55) -0.587785" ) > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} (( $? == 2 )) || fail 3 -villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095598.545159701(9) -0.587785") +( head -n-1 ${INPUT_FILE}; echo "1491095598.545159701(9) -0.587785" ) > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} (( $? == 3 )) || fail 4 -villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785 -0.587785") +( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785 -0.587785" ) > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} (( $? == 4 )) || fail 5 -villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -1.587785") +( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -1.587785" ) > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} (( $? == 5 )) || fail 6 -rm ${INPUT_FILE} \ No newline at end of file +( cat ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785" ) > ${TEMP_FILE} +villas-test-cmp ${INPUT_FILE} ${TEMP_FILE} +(( $? == 1 )) || fail 7 + +rm ${INPUT_FILE} ${TEMP_FILE}