From d2300e7397b4ad04fbba8b54909ebf8a932f9346 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Thu, 6 Jul 2017 23:48:19 +0200 Subject: [PATCH] temp node test_rtt --- etc/example.conf | 3 +- include/villas/nodes/test_rtt.h | 72 +++++++++ lib/Makefile.villas.inc | 2 +- lib/hooks/stats_collect.c | 16 +- lib/nodes/test_rtt.c | 261 ++++++++++++++++++++++++++++++++ lib/path.c | 10 +- 6 files changed, 349 insertions(+), 15 deletions(-) create mode 100644 include/villas/nodes/test_rtt.h create mode 100644 lib/nodes/test_rtt.c diff --git a/etc/example.conf b/etc/example.conf index 944b7d408..78e1a546d 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -193,7 +193,7 @@ nodes = { signal_node = { type = "signal", - signal = "sine", # One of "sine", "ramp", "triangle", "random", "mixed" + signal = "sine", # One of "sine", "square", "ramp", "triangle", "random", "mixed" values = 4, # Number of values per sample amplitude = 2.3, # Amplitude of generated signals frequency = 10, # Frequency of generated signals @@ -204,6 +204,7 @@ nodes = { type = "loopback", # A loopback node will receive exactly the same data which has been sent to it. # The internal implementation is based on queue. queuelen = 1024 # The queue length of the internal queue which buffers the samples. + samplelen = 64 # Each buffered sample can contain up to 64 values. } }; diff --git a/include/villas/nodes/test_rtt.h b/include/villas/nodes/test_rtt.h new file mode 100644 index 000000000..db7469241 --- /dev/null +++ b/include/villas/nodes/test_rtt.h @@ -0,0 +1,72 @@ +/** Node type: Node-type for testing Round-trip Time. + * + * @file + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +/** + * @addtogroup test-rtt Node-type for testing Round-trip Time. + * @ingroup node + * @{ + */ + +#pragma once + +#include "node.h" +#include "list.h" +#include "io.h" + +struct test_rtt_case { + int tfd; + double rate; + int values; + int counter; + int limit; /**< The number of samples we take per test. */ +}; + +struct test_rtt { + double cooldown; /**< Number of seconds to wait beween tests. */ + int current; /**< Index of current test in test_rtt::cases */ + + struct io io; /**< The format of the output file */ + struct list cases; /**< List of test cases */ + + const char *output; /**< The directory where we place the results. */ +}; + +/** @see node_type::print */ +char * test_rtt_print(struct node *n); + +/** @see node_type::parse */ +int test_rtt_parse(struct node *n, config_setting_t *cfg); + +/** @see node_type::open */ +int test_rtt_start(struct node *n); + +/** @see node_type::close */ +int test_rtt_stop(struct node *n); + +/** @see node_type::read */ +int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt); + +/** @see node_type::write */ +int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt); + +/** @} */ diff --git a/lib/Makefile.villas.inc b/lib/Makefile.villas.inc index 58f5754e0..6f1a0ded3 100644 --- a/lib/Makefile.villas.inc +++ b/lib/Makefile.villas.inc @@ -34,7 +34,7 @@ LIB_SRCS += $(addprefix lib/kernel/, kernel.c rt.c) \ utils.c super_node.c hist.c timing.c pool.c list.c queue.c \ queue_signalled.c memory.c advio.c plugin.c node_type.c stats.c \ mapping.c sample_io.c shmem.c config_helper.c crypt.c compat.c \ - log_table.c log_helper.c \ + log_table.c log_helper.c ) LIB_LDFLAGS = -shared diff --git a/lib/hooks/stats_collect.c b/lib/hooks/stats_collect.c index 0d2c03105..da55787e8 100644 --- a/lib/hooks/stats_collect.c +++ b/lib/hooks/stats_collect.c @@ -25,6 +25,7 @@ */ #include "common.h" +#include "advio.h" #include "hook.h" #include "plugin.h" #include "stats.h" @@ -38,7 +39,7 @@ struct stats_collect { int warmup; int buckets; - FILE *output; + AFILE *output; const char *uri; }; @@ -58,8 +59,7 @@ static int stats_collect_init(struct hook *h) p->warmup = 500; p->buckets = 20; p->uri = NULL; - p->output = stdout; - + return 0; } @@ -75,11 +75,11 @@ static int stats_collect_start(struct hook *h) struct stats_collect *p = h->_vd; if (p->uri) { - p->output = fopen(p->uri, "w+"); + p->output = afopen(p->uri, "w+"); if (!p->output) error("Failed to open file %s for writing", p->uri); } - + return stats_init(&p->stats, p->buckets, p->warmup); } @@ -87,10 +87,10 @@ static int stats_collect_stop(struct hook *h) { struct stats_collect *p = h->_vd; - stats_print(&p->stats, p->output, p->format, p->verbose); + stats_print(&p->stats, p->uri ? p->output->file : stdout, p->format, p->verbose); if (p->uri) - fclose(p->output); + afclose(p->output); return 0; } @@ -108,7 +108,7 @@ static int stats_collect_periodic(struct hook *h) { struct stats_collect *p = h->_vd; - stats_print_periodic(&p->stats, p->output, p->format, p->verbose, h->path); + stats_print_periodic(&p->stats, p->uri ? p->output->file : stdout, p->format, p->verbose, h->path); return 0; } diff --git a/lib/nodes/test_rtt.c b/lib/nodes/test_rtt.c new file mode 100644 index 000000000..3a4a6e8e3 --- /dev/null +++ b/lib/nodes/test_rtt.c @@ -0,0 +1,261 @@ +/** Node type: Node-type for testing Round-trip Time. + * + * @author Steffen Vogel + * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include "timing.h" +#include "plugin.h" +#include "nodes/test_rtt.h" + +static int test_rtt_case_start(struct test_rtt *t) +{ + int ret; + char fn[512]; + + /* Open file */ + snprintf(fn, sizeof(fn), "%s/test_rtt_%d_%.0f.log", t->output, c->values, c->rate); + ret = io_init(&t->io, NULL, IO_FORMAT_ALL & ~IO_FORMAT_VALUES); + if (ret) + return ret; + + /* Start timer. */ + c->tfd = timerfd_create_rate(c->rate); + if (c->tfd < 0) + serror("Failed to create timer"); + + return 0; +} + +static int test_rtt_case_stop(struct test_rtt_case *c) +{ + /* Close file */ + io_close(&t->io); + + /* Stop timer. */ + close(c->tfd); + + return 0; +} + +int test_rtt_parse(struct node *n, config_setting_t *cfg) +{ + struct test_rtt *t = n->_vd; + + int limit, ret; + int numrates = 0, numvalues = 0; + int *rates, *values; + const char *format; + + config_setting_t *cfg_rates, *cfg_values, *cfg_elem; + + if (!config_setting_lookup_int(cfg, "limit", &limit)) + limit = 1000; + + if (!config_setting_lookup_string(cfg, "output", &t->output)) + t->output = "."; + + if (!config_setting_lookup_string(cfg, "format", &format)) + format = "villas"; + + if (!config_setting_lookup_float(cfg, "cooldown", &t->cooldown)) + t->cooldown = 1.0; + + cfg_rates = config_setting_get_member(cfg, "rates"); + if (cfg_rates) { + if (!config_setting_is_array(cfg_rates) || !config_setting_length(cfg_rates)) + cerror(cfg_rates, "The 'rates' setting must be an array of integers with at least one element."); + + numrates = config_setting_length(cfg_rates); + rates = alloc(sizeof(rates[0]) * numrates); + + for (int i = 0; i < numrates; i++) { + cfg_elem = config_setting_get_elem(cfg_rates, i); + + if (config_setting_type(cfg_elem) != CONFIG_TYPE_INT) + cerror(cfg_elem, "The 'rates' setting must be an array of strings"); + + rates[i] = config_setting_get_int(cfg_elem); + } + } + else + cerror(cfg, "Node %s requires setting 'rates' which must be a list of integers", node_name(n)); + + cfg_values = config_setting_get_member(cfg, "values"); + if (cfg_values) { + if (!config_setting_is_array(cfg_values) || !config_setting_length(cfg_values)) + cerror(cfg_values, "The 'values' setting must be an array of integers with at least one element."); + + numvalues = config_setting_length(cfg_values); + values = alloc(sizeof(values[0]) * numvalues); + + for (int i = 0; i < numvalues; i++) { + cfg_elem = config_setting_get_elem(cfg_values, i); + + if (config_setting_type(cfg_elem) != CONFIG_TYPE_INT) + cerror(cfg_elem, "The 'values' setting must be an array of strings"); + + values[i] = config_setting_get_int(cfg_elem); + } + } + else + cerror(cfg, "Node %s requires setting 'values' which must be a list of integers", node_name(n)); + + /* Initialize IO module */ + struct plugin *p; + + p = plugin_lookup(PLUGIN_TYPE_FORMAT, format); + if (!p) + cerror(cfg, "Invalid value for setting 'format'"); + + ret = io_init(&t->io, &p->io, IO_FORMAT_ALL & ~IO_FORMAT_VALUES); + if (ret) + return ret; + + /* Generate list of test cases */ + list_init(&t->cases); + + for (int i = 0; i < numrates; i++) { + for (int j = 0; j < numvalues; j++) { + struct test_rtt_case *c = alloc(sizeof(struct test_rtt_case)); + + c->rate = rates[i]; + c->values = values[j]; + c->limit = limit; + + list_push(&t->cases, c); + } + } + + return 0; +} + +char * test_rtt_print(struct node *n) +{ + struct test_rtt *t = n->_vd; + + return strf("output=%s, cooldown=%f, #cases=%zu", t->output, t->cooldown, list_length(&t->cases)); +} + +int test_rtt_start(struct node *n) +{ + struct test_rtt *t = n->_vd; + + t->current = 0; + + test_rtt_case_start(t); + + return 0; +} + +int test_rtt_stop(struct node *n) +{ + int ret; + struct test_rtt *t = n->_vd; + + ret = test_rtt_case_stop(t->current); + if (ret) + error("Failed to stop test case"); + + list_destroy(&t->cases, NULL, true); + + return 0; +} + +int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt) +{ + int ret, i; + uint64_t steps; + + struct test_rtt *t = n->_vd; + struct test_rtt_case *c = t->current; + + /* Wait */ + ret = read(c->tfd, &steps, sizeof(steps)); + if (ret != sizeof(steps)) + return -1; + + if (steps > 1) + warn("Skipped %zu samples", steps - 1); + + struct timespec now = time_now(); + + /* Prepare samples. */ + for (i = 0; i < cnt; i++) { + if (c->counter >= c->limit) { + info("Reached limit. Terminating."); + killme(SIGTERM); + pause(); + } + + int values = c->values; + if (smps[i]->capacity < MAX(2, values)) { + values = smps[i]->capacity; + warn("Sample capacity too small. Limiting to %d values.", values); + } + + smps[i]->data[0].i = c->values; + smps[i]->data[1].f = c->rate; + + smps[i]->length = values; + smps[i]->sequence = c->counter++; + smps[i]->ts.origin = now; + } + + return i; +} + +int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt) +{ + struct test_rtt *t = n->_vd; + + int i; + + for (i = 0; i < cnt; i++) { + if (smps[i]->length != c->values) { + warn("Discarding invalid sample"); + continue; + } + + io_print(&t->io, smps[i], 1); + } + + return i; +} + +static struct plugin p = { + .name = "test_rtt", + .description = "Test round-trip time with loopback", + .type = PLUGIN_TYPE_NODE, + .node = { + .vectorize = 0, + .size = sizeof(struct test_rtt), + .parse = test_rtt_parse, + .print = test_rtt_print, + .start = test_rtt_start, + .stop = test_rtt_stop, + .read = test_rtt_read, + .write = test_rtt_write + } +}; + +REGISTER_PLUGIN(&p) +LIST_INIT_STATIC(&p.node.instances) diff --git a/lib/path.c b/lib/path.c index 50fd6b6d9..276b5796c 100644 --- a/lib/path.c +++ b/lib/path.c @@ -63,7 +63,7 @@ static void path_read(struct path *p) if (recv < 0) error("Failed to receive message from node %s", node_name(ps->node)); else if (recv < ready) - warn("Partial read for path %s: read=%u expected=%u", path_name(p), recv, ready); + warn("Partial read for path %s: read=%u, expected=%u", path_name(p), recv, ready); /* Run preprocessing hooks for vector of samples */ enqueue = hook_read_list(&p->hooks, smps, recv); @@ -73,7 +73,7 @@ static void path_read(struct path *p) if (p->stats) stats_update(p->stats->delta, STATS_SKIPPED, recv - enqueue); } - + /* Keep track of the lowest index that wasn't enqueued; * all following samples must be freed here */ for (size_t i = 0; i < list_length(&p->destinations); i++) { @@ -123,7 +123,7 @@ static void path_write(struct path *p) if (sent < 0) error("Failed to sent %u samples to node %s", cnt, node_name(pd->node)); else if (sent < tosend) - warn("Partial write to node %s", node_name(pd->node)); + warn("Partial write to node %s: written=%d, expected=%d", node_name(pd->node), sent, tosend); released = sample_put_many(smps, sent); @@ -235,7 +235,7 @@ int path_parse(struct path *p, config_setting_t *cfg, struct list *nodes) struct node *n = list_at(&destinations, i); struct path_destination *pd = alloc(sizeof(struct path_destination)); - + pd->node = n; pd->queuelen = p->queuelen; @@ -281,7 +281,7 @@ int path_init2(struct path *p) if (vt->builtin) { struct hook *h = alloc(sizeof(struct hook)); - + ret = hook_init(h, vt, p); if (ret) { free(h);