mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
stats: fixes for new stats node type
This commit is contained in:
parent
9193829dcc
commit
9e354e7382
8 changed files with 130 additions and 71 deletions
|
@ -29,16 +29,20 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <jansson.h>
|
||||
|
||||
#include "task.h"
|
||||
|
||||
/* Forward declarations */
|
||||
struct node;
|
||||
struct path;
|
||||
struct sample;
|
||||
struct super_node;
|
||||
|
||||
struct stats_node {
|
||||
struct task task;
|
||||
double rate;
|
||||
char *node_str;
|
||||
|
||||
struct task task;
|
||||
|
||||
struct node *node;
|
||||
};
|
||||
|
|
|
@ -45,7 +45,7 @@ size_t villas_human_sprint_single(char *buf, size_t len, struct sample *s, int f
|
|||
off += snprintf(buf + off, len - off, ".%09llu", (unsigned long long) s->ts.origin.tv_nsec);
|
||||
}
|
||||
|
||||
if (flags & SAMPLE_OFFSET)
|
||||
if (flags & SAMPLE_RECEIVED)
|
||||
off += snprintf(buf + off, len - off, "%+e", time_delta(&s->ts.origin, &s->ts.received));
|
||||
|
||||
if (flags & SAMPLE_SEQUENCE)
|
||||
|
@ -53,7 +53,7 @@ size_t villas_human_sprint_single(char *buf, size_t len, struct sample *s, int f
|
|||
|
||||
if (flags & SAMPLE_VALUES) {
|
||||
for (int i = 0; i < s->length; i++) {
|
||||
switch ((s->format >> i) & 0x1) {
|
||||
switch (sample_get_data_format(s, i)) {
|
||||
case SAMPLE_DATA_FORMAT_FLOAT:
|
||||
off += snprintf(buf + off, len - off, "\t%.6lf", s->data[i].f);
|
||||
break;
|
||||
|
|
|
@ -35,6 +35,13 @@ WITH_SOCKET ?= 1
|
|||
WITH_ZEROMQ ?= 1
|
||||
WITH_NANOMSG ?= 1
|
||||
WITH_SHMEM ?= 1
|
||||
WITH_STATS ?= 1
|
||||
|
||||
# Enable stats node-type
|
||||
ifeq ($(WITH_STATS),1)
|
||||
LIB_SRCS += lib/nodes/stats.c
|
||||
LIB_CFLAGS += -DWITH_STATS
|
||||
endif
|
||||
|
||||
# Enable file node-type
|
||||
ifeq ($(WITH_FILE),1)
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
* @{
|
||||
*/
|
||||
|
||||
#include "nodes/stats.h"
|
||||
#include "hook.h"
|
||||
#include "plugin.h"
|
||||
#include "stats.h"
|
||||
|
@ -48,20 +49,18 @@ int stats_node_start(struct node *n)
|
|||
struct stats_node *s = n->_vd;
|
||||
int ret;
|
||||
|
||||
if (!s->node->stats)
|
||||
return -1;
|
||||
|
||||
if (s->node->stats->state != STATE_INITIALIZED)
|
||||
return -2;
|
||||
|
||||
ret = task_init(&s->task, s->rate, CLOCK_MONOTONIC);
|
||||
if (ret)
|
||||
serror("Failed to create task");
|
||||
|
||||
s->node = list_lookup(nodes, s->node_str);
|
||||
if (!s->node)
|
||||
error("Invalid reference node %s for setting 'node' of node %s", s->node_str, node_name(n));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_start(struct node *n)
|
||||
int stats_node_stop(struct node *n)
|
||||
{
|
||||
struct stats_node *s = n->_vd;
|
||||
int ret;
|
||||
|
@ -75,14 +74,14 @@ int stats_node_start(struct node *n)
|
|||
|
||||
char * stats_node_print(struct node *n)
|
||||
{
|
||||
struct stats_node *p = h->_vd;
|
||||
struct stats_node *s = n->_vd;
|
||||
|
||||
return strf("node=%s, rate=%f", node_name_short(s->node), s->rate);
|
||||
return strf("node=%s, rate=%f", s->node_str, s->rate);
|
||||
}
|
||||
|
||||
int stats_node_parse(struct node *n, json_t *cfg)
|
||||
{
|
||||
struct stats_node *p = h->_vd;
|
||||
struct stats_node *s = n->_vd;
|
||||
|
||||
int ret;
|
||||
json_error_t err;
|
||||
|
@ -99,25 +98,39 @@ int stats_node_parse(struct node *n, json_t *cfg)
|
|||
if (s->rate <= 0)
|
||||
error("Setting 'rate' of node %s must be positive", node_name(n));
|
||||
|
||||
s->node_str = strdup(node);
|
||||
|
||||
n->samplelen = STATS_COUNT * 7;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int stats_node_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
||||
int stats_node_read(struct node *n, struct sample *smps[], unsigned cnt)
|
||||
{
|
||||
struct stats_node *sn = h->_vd;
|
||||
struct stats_node *sn = n->_vd;
|
||||
struct stats *s = sn->node->stats;
|
||||
|
||||
if (*cnt == 0)
|
||||
if (!cnt)
|
||||
return 0;
|
||||
|
||||
for (int i = 0; j < MIN(STATS_COUNT, smps[0]->capacity); i++) {
|
||||
smps[0]->data[i++].f = hist_last(&s->histograms[i]);
|
||||
smps[0]->data[i++].f = hist_highest(&s->histograms[i]);
|
||||
smps[0]->data[i++].f = hist_lowest(&s->histograms[i]);
|
||||
smps[0]->data[i++].f = hist_mean(&s->histograms[i]);
|
||||
smps[0]->data[i++].f = hist_var(&s->histograms[i]);
|
||||
if (!sn->node->stats)
|
||||
return 0;
|
||||
|
||||
smps[0]->length = i;
|
||||
task_wait_until_next_period(&sn->task);
|
||||
|
||||
smps[0]->length = MIN(STATS_COUNT * 7, smps[0]->capacity);
|
||||
smps[0]->has = SAMPLE_VALUES;
|
||||
|
||||
for (int i = 0; i < 6 && i*7+5 < smps[0]->length; i++) {
|
||||
int tot = hist_total(&s->histograms[i]);
|
||||
|
||||
smps[0]->data[i*7+0].f = tot ? hist_total(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+1].f = tot ? hist_last(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+2].f = tot ? hist_last(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+3].f = tot ? hist_highest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+4].f = tot ? hist_lowest(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+5].f = tot ? hist_mean(&s->histograms[i]) : 0;
|
||||
smps[0]->data[i*7+6].f = tot ? hist_var(&s->histograms[i]) : 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
|
@ -125,7 +138,7 @@ int stats_node_read(struct hook *h, struct sample *smps[], unsigned *cnt)
|
|||
|
||||
int stats_node_fd(struct node *n)
|
||||
{
|
||||
struct stats_node *p = h->_vd;
|
||||
struct stats_node *s = n->_vd;
|
||||
|
||||
return task_fd(&s->task);
|
||||
}
|
||||
|
@ -134,8 +147,8 @@ static struct plugin p = {
|
|||
.name = "stats",
|
||||
.description = "Send statistics to another node",
|
||||
.type = PLUGIN_TYPE_NODE,
|
||||
.hook = {
|
||||
.vecotrize = 1,
|
||||
.node = {
|
||||
.vectorize = 1,
|
||||
.size = sizeof(struct stats_node),
|
||||
.init = stats_node_init,
|
||||
.parse = stats_node_parse,
|
||||
|
@ -148,5 +161,6 @@ static struct plugin p = {
|
|||
};
|
||||
|
||||
REGISTER_PLUGIN(&p)
|
||||
LIST_INIT_STATIC(&p.node.instances)
|
||||
|
||||
/** @} */
|
||||
|
|
|
@ -292,6 +292,9 @@ int path_init2(struct path *p)
|
|||
}
|
||||
}
|
||||
|
||||
if (!p->samplelen)
|
||||
p->samplelen = DEFAULT_SAMPLELEN;
|
||||
|
||||
ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage);
|
||||
if (ret)
|
||||
return ret;
|
||||
|
|
36
lib/sample.c
36
lib/sample.c
|
@ -123,7 +123,7 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
|
|||
if (flags & SAMPLE_SEQUENCE) {
|
||||
if (a->sequence != b->sequence) {
|
||||
printf("sequence no: %d != %d\n", a->sequence, b->sequence);
|
||||
return -2;
|
||||
return 2;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -131,7 +131,15 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
|
|||
if (flags & SAMPLE_ORIGIN) {
|
||||
if (time_delta(&a->ts.origin, &b->ts.origin) > epsilon) {
|
||||
printf("ts.origin: %f != %f\n", time_to_double(&a->ts.origin), time_to_double(&b->ts.origin));
|
||||
return -3;
|
||||
return 3;
|
||||
}
|
||||
}
|
||||
|
||||
/* Compare ID */
|
||||
if (flags & SAMPLE_SOURCE) {
|
||||
if (a->id != b->id) {
|
||||
printf("id: %d != %d\n", a->id, b->id);
|
||||
return 7;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -139,13 +147,29 @@ int sample_cmp(struct sample *a, struct sample *b, double epsilon, int flags)
|
|||
if (flags & SAMPLE_VALUES) {
|
||||
if (a->length != b->length) {
|
||||
printf("length: %d != %d\n", a->length, b->length);
|
||||
return -4;
|
||||
return 4;
|
||||
}
|
||||
|
||||
if (a->format != b->format) {
|
||||
printf("format: %#lx != %#lx\n", a->format, b->format);
|
||||
return 6;
|
||||
}
|
||||
|
||||
for (int i = 0; i < a->length; i++) {
|
||||
if (fabs(a->data[i].f - b->data[i].f) > epsilon) {
|
||||
printf("data[%d]: %f != %f\n", i, a->data[i].f, b->data[i].f);
|
||||
return -5;
|
||||
switch (sample_get_data_format(a, i)) {
|
||||
case SAMPLE_DATA_FORMAT_FLOAT:
|
||||
if (fabs(a->data[i].f - b->data[i].f) > epsilon) {
|
||||
printf("data[%d].f: %f != %f\n", i, a->data[i].f, b->data[i].f);
|
||||
return 5;
|
||||
}
|
||||
break;
|
||||
|
||||
case SAMPLE_DATA_FORMAT_INT:
|
||||
if (a->data[i].i != b->data[i].i) {
|
||||
printf("data[%d].i: %ld != %ld\n", i, a->data[i].i, b->data[i].i);
|
||||
return 5;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,8 +47,7 @@ struct side {
|
|||
void usage()
|
||||
{
|
||||
printf("Usage: villas-test-cmp [OPTIONS] FILE1 FILE2 ... FILEn\n");
|
||||
printf(" FILE1 first file to compare\n");
|
||||
printf(" FILE2 second file to compare against\n");
|
||||
printf(" FILE a list of files to compare\n");
|
||||
printf(" OPTIONS is one or more of the following options:\n");
|
||||
printf(" -h print this usage information\n");
|
||||
printf(" -d LVL adjust the debug level\n");
|
||||
|
@ -56,7 +55,7 @@ void usage()
|
|||
printf(" -v ignore data values\n");
|
||||
printf(" -t ignore timestamp\n");
|
||||
printf(" -s ignore sequence no\n");
|
||||
printf(" -f file format for all files\n");
|
||||
printf(" -f FMT file format for all files\n");
|
||||
printf("\n");
|
||||
printf("Return codes:\n");
|
||||
printf(" 0 files are equal\n");
|
||||
|
@ -133,7 +132,7 @@ check: if (optarg == endptr)
|
|||
if (!s[i].fmt)
|
||||
error("Invalid IO format: %s", s[i].format);
|
||||
|
||||
ret = io_init(&s[i].io, s[i].fmt, IO_NONBLOCK);
|
||||
ret = io_init(&s[i].io, s[i].fmt, 0);
|
||||
if (ret)
|
||||
error("Failed to initialize IO");
|
||||
|
||||
|
@ -146,46 +145,50 @@ check: if (optarg == endptr)
|
|||
error("Failed to allocate samples");
|
||||
}
|
||||
|
||||
int line = 0;
|
||||
int eofs, line, failed;
|
||||
|
||||
line = 0;
|
||||
for (;;) {
|
||||
/* Read next sample from all files */
|
||||
int fails = 0;
|
||||
retry: eofs = 0;
|
||||
for (int i = 0; i < n; i++) {
|
||||
ret = io_eof(&s[i].io);
|
||||
if (ret) {
|
||||
fails++;
|
||||
continue;
|
||||
if (ret)
|
||||
eofs++;
|
||||
}
|
||||
|
||||
if (eofs) {
|
||||
if (eofs == n)
|
||||
ret = 0;
|
||||
else {
|
||||
printf("length unequal\n");
|
||||
ret = 1;
|
||||
}
|
||||
|
||||
goto out;
|
||||
}
|
||||
|
||||
failed = 0;
|
||||
for (int i = 0; i < n; i++) {
|
||||
ret = io_scan(&s[i].io, &s[i].sample, 1);
|
||||
if (ret <= 0) {
|
||||
fails++;
|
||||
continue;
|
||||
}
|
||||
if (ret <= 0)
|
||||
failed++;
|
||||
}
|
||||
|
||||
if (fails == n) {
|
||||
ret = 0;
|
||||
goto fail;
|
||||
}
|
||||
else if (fails != n) {
|
||||
printf("fails = %d at line %d\n", fails, line);
|
||||
ret = -1;
|
||||
goto fail;
|
||||
}
|
||||
if (failed)
|
||||
goto retry;
|
||||
|
||||
/* We compare all files against the first one */
|
||||
for (int i = 1; i < n; i++) {
|
||||
ret = sample_cmp(s[0].sample, s[i].sample, epsilon, flags);
|
||||
if (ret)
|
||||
goto fail;
|
||||
goto out;
|
||||
}
|
||||
|
||||
line++;
|
||||
}
|
||||
|
||||
fail:
|
||||
for (int i = 0; i < n; i++) {
|
||||
out: for (int i = 0; i < n; i++) {
|
||||
io_close(&s[i].io);
|
||||
io_destroy(&s[i].io);
|
||||
sample_put(s[i].sample);
|
||||
|
|
|
@ -23,9 +23,10 @@
|
|||
##################################################################################
|
||||
|
||||
INPUT_FILE=$(mktemp)
|
||||
TEMP_FILE=$(mktemp)
|
||||
|
||||
function fail() {
|
||||
rm ${INPUT_FILE}
|
||||
rm ${INPUT_FILE} ${TEMP_FILE}
|
||||
exit $1;
|
||||
}
|
||||
|
||||
|
@ -45,25 +46,28 @@ EOF
|
|||
villas-test-cmp ${INPUT_FILE} ${INPUT_FILE}
|
||||
(( $? == 0 )) || fail 1
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n5 ${INPUT_FILE}; sleep 1; tail -n5 ${INPUT_FILE})
|
||||
(( $? == 0 )) || fail 8
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE})
|
||||
head -n-1 ${INPUT_FILE} > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 1 )) || fail 2
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(cat ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785")
|
||||
(( $? == 1 )) || fail 7
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(55) -0.587785")
|
||||
( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(55) -0.587785" ) > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 2 )) || fail 3
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095598.545159701(9) -0.587785")
|
||||
( head -n-1 ${INPUT_FILE}; echo "1491095598.545159701(9) -0.587785" ) > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 3 )) || fail 4
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785 -0.587785")
|
||||
( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785 -0.587785" ) > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 4 )) || fail 5
|
||||
|
||||
villas-test-cmp ${INPUT_FILE} <(head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -1.587785")
|
||||
( head -n-1 ${INPUT_FILE}; echo "1491095597.545159701(9) -1.587785" ) > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 5 )) || fail 6
|
||||
|
||||
rm ${INPUT_FILE}
|
||||
( cat ${INPUT_FILE}; echo "1491095597.545159701(9) -0.587785" ) > ${TEMP_FILE}
|
||||
villas-test-cmp ${INPUT_FILE} ${TEMP_FILE}
|
||||
(( $? == 1 )) || fail 7
|
||||
|
||||
rm ${INPUT_FILE} ${TEMP_FILE}
|
||||
|
|
Loading…
Add table
Reference in a new issue