1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

enable new node-type test_rtt

This commit is contained in:
Steffen Vogel 2017-09-16 15:34:16 +02:00
parent d9d85ce2e8
commit d0901dd268
3 changed files with 233 additions and 102 deletions

View file

@ -29,27 +29,37 @@
#pragma once
#include "node.h"
#include "list.h"
#include "io.h"
#include "task.h"
/* Forward declarations */
struct test_rtt;
struct node;
struct sample;
struct test_rtt_case {
struct task task;
double rate;
int values;
int counter;
int limit; /**< The number of samples we take per test. */
char *filename;
};
struct test_rtt {
double cooldown; /**< Number of seconds to wait beween tests. */
int current; /**< Index of current test in test_rtt::cases */
struct task task; /**< The periodic task for test_rtt_read() */
struct io io; /**< The format of the output file */
struct io_format *format;
double cooldown; /**< Number of seconds to wait beween tests. */
int current; /**< Index of current test in test_rtt::cases */
int counter;
struct list cases; /**< List of test cases */
const char *output; /**< The directory where we place the results. */
char *output; /**< The directory where we place the results. */
char *prefix; /**< An optional prefix in the filename. */
};
/** @see node_type::print */

View file

@ -26,7 +26,7 @@ ifeq ($(PLATFORM),Linux)
WITH_LOOPBACK ?= 1
endif
WITH_TEST_RTT ?= 0
WITH_TEST_RTT ?= 1
WITH_FILE ?= 1
WITH_SIGNAL ?= 1
WITH_NGSI ?= 1

View file

@ -22,125 +22,205 @@
#include <stdio.h>
#include "node.h"
#include "sample.h"
#include "timing.h"
#include "plugin.h"
#include "nodes/test_rtt.h"
static int test_rtt_case_start(struct test_rtt *t, int caseno)
static int test_rtt_case_start(struct test_rtt *t, int id)
{
int ret;
char fn[512];
struct test_rtt_case *c = list_at(&t->cases, caseno);
struct test_rtt_case *c = list_at(&t->cases, id);
/* Open file */
snprintf(fn, sizeof(fn), "%s/test_rtt_%d_%.0f.log", t->output, c->values, c->rate);
ret = io_init(&t->io, NULL, FORMAT_ALL & ~FORMAT_VALUES);
ret = io_open(&t->io, c->filename);
if (ret)
return ret;
/* Start timer. */
ret = task_init(&t->timer, c->rate);
ret = task_set_rate(&t->task, c->rate);
if (ret)
serror("Failed to create timer");
return ret;
t->counter = 0;
t->current = id;
return 0;
}
static int test_rtt_case_stop(struct test_rtt_case *c)
static int test_rtt_case_stop(struct test_rtt *t, int id)
{
/* Close file */
io_close(&c->io);
int ret;
/* Stop timer. */
task_destroy(&c->task);
/* Stop timer */
ret = task_set_rate(&t->task, 0);
if (ret)
return ret;
/* Close file */
ret = io_close(&t->io);
if (ret)
return ret;
return 0;
}
int test_rtt_parse(struct node *n, json_t *cfg)
{
int ret;
struct test_rtt *t = n->_vd;
struct plugin *p;
int ret, numrates, numvalues, limit = 1000;
int *rates, *values;
const char *format = "villas-human";
const char *output = ".";
const char *prefix = node_name_short(n);
int *rates = NULL;
int *values = NULL;
int numrates = 0;
int numvalues = 0;
size_t index;
json_t *cfg_rates, *cfg_values, *cfg_val;
json_t *cfg_cases, *cfg_case, *cfg_val;
json_t *cfg_rates = NULL, *cfg_values = NULL;
json_error_t err;
t->cooldown = 1.0;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: i, s?: s, s?: s, s: F, s: o, s: o }",
"limit", &limit,
/* Generate list of test cases */
list_init(&t->cases);
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: F, s: o }",
"prefix", &prefix,
"output", &output,
"format", &format,
"cooldown", &t->cooldown,
"rates", &cfg_rates,
"values", &cfg_values
"cases", &cfg_cases
);
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
t->output = strdup(output);
if (cfg_rates) {
if (!json_is_array(cfg_rates) || json_array_size(cfg_rates) < 1)
error("The 'rates' setting of node %s must be an array of integers with at least one element.", node_name(n));
numrates = json_array_size(cfg_rates);
rates = alloc(sizeof(rates[0]) * numrates);
json_array_foreach(cfg_rates, index, cfg_val) {
if (!json_is_integer(cfg_val))
error("The 'rates' setting of node %s must be an array of integers", node_name(n));
rates[index] = json_integer_value(cfg_val);
}
}
if (cfg_values) {
if (!json_is_array(cfg_values) || json_array_size(cfg_values) < 1)
error("The 'values' setting of node %s must be an array of integers with at least one element.", node_name(n));
numvalues = json_array_size(cfg_values);
rates = alloc(sizeof(rates[0]) * numvalues);
json_array_foreach(cfg_values, index, cfg_val) {
if (!json_is_integer(cfg_val))
error("The 'values' setting of node %s must be an array of integers", node_name(n));
values[index] = json_integer_value(cfg_val);
}
}
t->prefix = strdup(prefix);
/* Initialize IO module */
p = plugin_lookup(PLUGIN_TYPE_FORMAT, format);
if (!p)
t->format = io_format_lookup(format);
if (!t->format)
error("Invalid value for setting 'format' of node %s", node_name(n));
ret = io_init(&t->io, &p->io, FORMAT_ALL & ~FORMAT_VALUES);
/* Construct list of test cases */
if (!json_is_array(cfg_cases))
error("The 'cases' setting of node %s must be an array.", node_name(n));
json_array_foreach(cfg_cases, index, cfg_case) {
int limit = -1;
double duration = -1; /* in secs */
ret = json_unpack_ex(cfg_case, &err, 0, "{ s: o, s: o, s?: i, s?: F }",
"rates", &cfg_rates,
"values", &cfg_values,
"limit", &limit,
"duration", &duration
);
if (limit > 0 && duration > 0)
error("The settings 'duration' and 'limit' of node %s must be used exclusively", node_name(n));
if (json_is_array(cfg_rates))
numrates = json_array_size(cfg_rates);
else if (json_is_number(cfg_rates))
numrates = 1;
else
error("The 'rates' setting of node %s must be a real or an array of real numbers", node_name(n));
if (json_is_array(cfg_values))
numvalues = json_array_size(cfg_values);
else if (json_is_integer(cfg_values))
numvalues = 1;
else
error("The 'values' setting of node %s must be an integer or an array of integers", node_name(n));
rates = realloc(rates, sizeof(rates[0]) * numrates);
values = realloc(values, sizeof(values[0]) * numvalues);
if (json_is_array(cfg_rates)) {
json_array_foreach(cfg_rates, index, cfg_val) {
if (!json_is_number(cfg_val))
error("The 'rates' setting of node %s must be an array of real numbers", node_name(n));
rates[index] = json_integer_value(cfg_val);
}
}
else
rates[0] = json_number_value(cfg_rates);
if (json_is_array(cfg_values)) {
json_array_foreach(cfg_values, index, cfg_val) {
if (!json_is_integer(cfg_val))
error("The 'values' setting of node %s must be an array of integers", node_name(n));
values[index] = json_integer_value(cfg_val);
if (values[index] < 2)
error("Each 'values' entry must be at least 2 or larger");
}
}
else {
values[0] = json_integer_value(cfg_values);
if (values[0] <= 2)
error("Each 'values' entry must be at least 2 or larger");
}
for (int i = 0; i < numrates; i++) {
for (int j = 0; j < numvalues; j++) {
struct test_rtt_case *c = alloc(sizeof(struct test_rtt_case));
c->rate = rates[i];
c->values = values[j];
if (limit > 0)
c->limit = limit;
else if (duration > 0)
c->limit = duration * c->rate;
else
c->limit = 1000; /* default value */
asprintf(&c->filename, "%s/%s_%d_%.0f.log", t->output, t->prefix, c->values, c->rate);
list_push(&t->cases, c);
}
}
}
free(values);
free(rates);
return 0;
}
int test_rtt_destroy(struct node *n)
{
int ret;
struct test_rtt *t = n->_vd;
ret = list_destroy(&t->cases, NULL, true);
if (ret)
return ret;
/* Generate list of test cases */
list_init(&t->cases);
ret = task_destroy(&t->task);
if (ret)
return ret;
for (int i = 0; i < numrates; i++) {
for (int j = 0; j < numvalues; j++) {
struct test_rtt_case *c = alloc(sizeof(struct test_rtt_case));
ret = io_destroy(&t->io);
if (ret)
return ret;
c->rate = rates[i];
c->values = values[j];
c->limit = limit;
if (t->output)
free(t->output);
list_push(&t->cases, c);
}
}
if (t->prefix)
free(t->prefix);
return 0;
}
@ -149,16 +229,25 @@ char * test_rtt_print(struct node *n)
{
struct test_rtt *t = n->_vd;
return strf("output=%s, cooldown=%f, #cases=%zu", t->output, t->cooldown, list_length(&t->cases));
return strf("output=%s, prefix=%s, cooldown=%f, #cases=%zu", t->output, t->prefix, t->cooldown, list_length(&t->cases));
}
int test_rtt_start(struct node *n)
{
int ret;
struct test_rtt *t = n->_vd;
struct test_rtt_case *c = list_first(&t->cases);
t->current = 0;
ret = io_init(&t->io, t->format, SAMPLE_HAS_ALL & ~SAMPLE_HAS_VALUES);
if (ret)
return ret;
test_rtt_case_start(t);
ret = task_init(&t->task, c->rate, CLOCK_MONOTONIC);
if (ret)
return ret;
t->current = -1;
t->counter = -1;
return 0;
}
@ -168,50 +257,81 @@ int test_rtt_stop(struct node *n)
int ret;
struct test_rtt *t = n->_vd;
ret = test_rtt_case_stop(t->current);
if (ret)
error("Failed to stop test case");
list_destroy(&t->cases, NULL, true);
if (t->counter != -1) {
ret = test_rtt_case_stop(t, t->current);
if (ret)
return ret;
}
return 0;
}
int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt)
{
int ret, i;
int i, ret, values;
uint64_t steps;
struct test_rtt *t = n->_vd;
struct test_rtt_case *c = t->current;
/* Wait */
steps = task_wait_until_next_period(&c->task);
if (steps > 1)
warn("Skipped %zu samples", steps - 1);
if (t->counter == -1) {
if (t->current == -1) {
t->current = 0;
}
else {
ret = test_rtt_case_stop(t, t->current);
if (ret)
return ret;
struct timespec now = time_now();
t->current++;
}
/* Prepare samples. */
for (i = 0; i < cnt; i++) {
if (c->counter >= c->limit) {
info("Reached limit. Terminating.");
if (t->current >= list_length(&t->cases)) {
info("This was the last case. Terminating.");
killme(SIGTERM);
pause();
}
else {
struct test_rtt_case *c = list_at(&t->cases, t->current);
info("Starting case #%d: filename=%s, rate=%f, values=%d, limit=%d", t->current, c->filename, c->rate, c->values, c->limit);
ret = test_rtt_case_start(t, t->current);
if (ret)
return ret;
}
}
int values = c->values;
if (smps[i]->capacity < MAX(2, values)) {
struct test_rtt_case *c = list_at(&t->cases, t->current);
/* Wait */
steps = task_wait(&t->task);
if (steps > 1)
warn("Skipped %zu steps", steps - 1);
struct timespec now = time_now();
/* Prepare samples */
for (i = 0; i < cnt; i++) {
values = c->values;
if (smps[i]->capacity < values) {
values = smps[i]->capacity;
warn("Sample capacity too small. Limiting to %d values.", values);
}
smps[i]->data[0].i = c->values;
smps[i]->data[1].f = c->rate;
smps[i]->length = values;
smps[i]->sequence = c->counter++;
smps[i]->sequence = t->counter;
smps[i]->ts.origin = now;
smps[i]->flags = SAMPLE_HAS_VALUES | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_ORIGIN;
t->counter++;
}
if (t->counter >= c->limit) {
info("Entering cooldown phase. Waiting %f seconds...", t->cooldown);
t->counter = -1;
ret = task_set_timeout(&t->task, t->cooldown);
if (ret < 0)
return ret;
}
return i;
@ -220,16 +340,16 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt)
int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct test_rtt *t = n->_vd;
struct test_rtt_case *c = list_at(&t->cases, t->current);
int i;
for (i = 0; i < cnt; i++) {
if (smps[i]->length != c->values) {
warn("Discarding invalid sample");
warn("Discarding invalid sample due to mismatching length: expecting=%d, has=%d", c->values, smps[i]->length);
continue;
}
io_print(&t->io, smps[i], 1);
io_print(&t->io, &smps[i], 1);
}
return i;
@ -250,6 +370,7 @@ static struct plugin p = {
.vectorize = 0,
.size = sizeof(struct test_rtt),
.parse = test_rtt_parse,
.destroy = test_rtt_destroy,
.print = test_rtt_print,
.start = test_rtt_start,
.stop = test_rtt_stop,