1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

test_rtt: Port to C++

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel 2024-04-09 23:39:13 +02:00 committed by pipeacosta
parent 176e969c6a
commit 55238f58b9
2 changed files with 199 additions and 291 deletions

View file

@ -9,63 +9,78 @@
#include <villas/format.hpp>
#include <villas/list.hpp>
#include <villas/node.hpp>
#include <villas/task.hpp>
namespace villas {
namespace node {
// Forward declarations
class NodeCompat;
struct Sample;
struct test_rtt;
class TestRTT : public Node {
struct test_rtt_case {
double rate;
unsigned values;
unsigned limit; // The number of samples we take per test.
protected:
class Case {
friend TestRTT;
char *filename;
char *filename_formatted;
protected:
TestRTT *node;
NodeCompat *node;
};
int id;
double rate;
unsigned values;
unsigned limit; // The number of samples we take per test.
struct test_rtt {
struct Task task; // The periodic task for test_rtt_read()
Format *formatter; // The format of the output file
std::string filename;
std::string filename_formatted;
public:
Case(TestRTT *n, int id, int rate, int values, int limit,
std::string filename)
: node(n), id(id), rate(rate), values(values), limit(limit),
filename(filename){};
int start();
int stop();
};
Task task; // The periodic task for test_rtt_read()
Format::Ptr formatter; // The format of the output file
FILE *stream;
double cooldown; // Number of seconds to wait between tests.
int current; // Index of current test in test_rtt::cases
int counter;
struct List cases; // List of test cases
std::list<Case> cases; // List of test cases
std::list<Case>::iterator current_case;
char *output; // The directory where we place the results.
char *prefix; // An optional prefix in the filename.
std::string output; // The directory where we place the results.
std::string prefix; // An optional prefix in the filename.
virtual int _read(struct Sample *smps[], unsigned cnt);
virtual int _write(struct Sample *smps[], unsigned cnt);
public:
TestRTT(const uuid_t &id = {}, const std::string &name = "")
: Node(id, name), task(CLOCK_MONOTONIC), formatter(nullptr) {}
virtual ~TestRTT(){};
virtual int prepare();
virtual int parse(json_t *json);
virtual int start();
virtual int stop();
virtual std::vector<int> getPollFDs();
virtual const std::string &getDetails();
};
char *test_rtt_print(NodeCompat *n);
int test_rtt_parse(NodeCompat *n, json_t *json);
int test_rtt_prepare(NodeCompat *n);
int test_rtt_init(NodeCompat *n);
int test_rtt_destroy(NodeCompat *n);
int test_rtt_start(NodeCompat *n);
int test_rtt_stop(NodeCompat *n);
int test_rtt_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt);
int test_rtt_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt);
int test_rtt_poll_fds(NodeCompat *n, int fds[]);
} // namespace node
} // namespace villas

View file

@ -12,7 +12,6 @@
#include <villas/exceptions.hpp>
#include <villas/format.hpp>
#include <villas/node_compat.hpp>
#include <villas/nodes/test_rtt.hpp>
#include <villas/sample.hpp>
#include <villas/timing.hpp>
@ -22,59 +21,40 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
static NodeCompatType p;
static int test_rtt_case_start(NodeCompat *n, int id) {
auto *t = n->getData<struct test_rtt>();
struct test_rtt_case *c = (struct test_rtt_case *)list_at(&t->cases, id);
n->logger->info(
"Starting case #{}/{}: filename={}, rate={}, values={}, limit={}",
t->current, list_length(&t->cases), c->filename_formatted, c->rate, c->values, c->limit);
int TestRTT::Case::start() {
node->logger->info(
"Starting case #{}/{}: filename={}, rate={}, values={}, limit={}", id,
node->cases.size(), filename_formatted, rate, values, limit);
// Open file
t->stream = fopen(c->filename_formatted, "a+");
if (!t->stream)
node->stream = fopen(filename_formatted.c_str(), "a+");
if (!node->stream)
return -1;
// Start timer
t->task.setRate(c->rate);
node->task.setRate(rate);
t->counter = 0;
t->current = id;
node->counter = 0;
return 0;
}
static int test_rtt_case_stop(NodeCompat *n, int id) {
int TestRTT::Case::stop() {
int ret;
auto *t = n->getData<struct test_rtt>();
// Stop timer
t->task.stop();
node->task.stop();
ret = fclose(t->stream);
ret = fclose(node->stream);
if (ret)
throw SystemError("Failed to close file");
n->logger->info("Stopping case #{}", id);
node->logger->info("Stopping case #{}/{}", id, node->cases.size());
return 0;
}
static int test_rtt_case_destroy(struct test_rtt_case *c) {
if (c->filename)
free(c->filename);
if (c->filename_formatted)
free(c->filename_formatted);
return 0;
}
int villas::node::test_rtt_prepare(NodeCompat *n) {
auto *t = n->getData<struct test_rtt>();
int TestRTT::prepare() {
unsigned max_values = 0;
// Take current for time for test case prefix
@ -82,73 +62,67 @@ int villas::node::test_rtt_prepare(NodeCompat *n) {
struct tm tm;
gmtime_r(&ts, &tm);
for (size_t i = 0; i < list_length(&t->cases); i++) {
struct test_rtt_case *c = (struct test_rtt_case *)list_at(&t->cases, i);
for (auto &c : cases) {
if (c.values > max_values)
max_values = c.values;
if (c->values > max_values)
max_values = c->values;
c->filename_formatted = new char[NAME_MAX];
if (!c->filename_formatted)
auto filename_formatted = new char[NAME_MAX];
if (!filename_formatted)
throw MemoryAllocationError();
strftime(c->filename_formatted, NAME_MAX, c->filename, &tm);
strftime(filename_formatted, NAME_MAX, c.filename.c_str(), &tm);
c.filename_formatted = filename_formatted;
}
n->in.signals = std::make_shared<SignalList>(max_values, SignalType::FLOAT);
in.signals = std::make_shared<SignalList>(max_values, SignalType::FLOAT);
return 0;
return Node::prepare();
}
int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) {
int TestRTT::parse(json_t *json) {
int ret;
auto *t = n->getData<struct test_rtt>();
const char *output = ".";
const char *prefix = nullptr;
std::vector<int> rates;
std::vector<int> values;
const char *output_str = ".";
const char *prefix_str = nullptr;
size_t i;
json_t *json_cases, *json_case, *json_val, *json_format = nullptr;
json_t *json_rates = nullptr, *json_values = nullptr;
json_error_t err;
t->cooldown = 0;
cooldown = 0;
// Generate list of test cases
ret = list_init(&t->cases);
if (ret)
return ret;
ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }",
"prefix", &prefix, "output", &output, "format",
&json_format, "cooldown", &t->cooldown, "cases",
&json_cases);
ret =
json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }",
"prefix", &prefix_str, "output", &output_str, "format",
&json_format, "cooldown", &cooldown, "cases", &json_cases);
if (ret)
throw ConfigError(json, err, "node-config-node-test-rtt");
t->output = strdup(output);
t->prefix = strdup(prefix ? prefix : n->getNameShort().c_str());
output = output_str;
prefix = prefix_str ? prefix_str : getNameShort();
// Initialize IO module
if (!json_format)
json_format = json_string("villas.binary");
// Formatter
auto *fmt = json_format ? FormatFactory::make(json_format)
: FormatFactory::make("villas.human");
t->formatter = FormatFactory::make(json_format);
if (!t->formatter)
throw ConfigError(json, "node-config-node-test-rtt-format",
"Invalid value for setting 'format'");
formatter = Format::Ptr(fmt);
if (!formatter)
throw ConfigError(json_format, "node-config-node-exec-format",
"Invalid format configuration");
// Construct List of test cases
if (!json_is_array(json_cases))
throw ConfigError(json_cases, "node-config-node-test-rtt-format",
"The 'cases' setting must be an array.");
int id = 0;
json_array_foreach(json_cases, i, json_case) {
int limit = -1;
double duration = -1; // in secs
std::vector<int> rates;
std::vector<int> values;
ret = json_unpack_ex(json_case, &err, 0, "{ s: o, s: o, s?: i, s?: F }",
"rates", &json_rates, "values", &json_values, "limit",
@ -169,9 +143,6 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) {
json_case, "node-config-node-test-rtt-values",
"The 'values' setting must be an integer or an array of integers");
values.clear();
rates.clear();
if (json_is_array(json_rates)) {
size_t j;
json_array_foreach(json_rates, j, json_val) {
@ -200,28 +171,18 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) {
for (int rate : rates) {
for (int value : values) {
auto *c = new struct test_rtt_case;
if (!c)
throw MemoryAllocationError();
c->filename = nullptr;
c->filename_formatted = nullptr;
c->node = n;
c->rate = rate;
c->values = value;
int lim;
if (limit > 0)
c->limit = limit;
lim = limit;
else if (duration > 0)
c->limit = duration * c->rate;
lim = duration * rate;
else
c->limit = 1000; // default value
lim = 1000; // Default value
c->filename = strf("%s/%s_values%d_rate%.0f.log", t->output, t->prefix,
c->values, c->rate);
auto filename = fmt::format("{}/{}_values{}_rate{}.log", output, prefix,
value, rate);
list_push(&t->cases, c);
cases.emplace_back(this, id++, rate, value, lim, filename);
}
}
}
@ -229,205 +190,137 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) {
return 0;
}
int villas::node::test_rtt_init(NodeCompat *n) {
auto *t = n->getData<struct test_rtt>();
new (&t->task) Task(CLOCK_MONOTONIC);
t->formatter = nullptr;
return 0;
}
int villas::node::test_rtt_destroy(NodeCompat *n) {
int ret;
auto *t = n->getData<struct test_rtt>();
ret = list_destroy(&t->cases, (dtor_cb_t)test_rtt_case_destroy, true);
if (ret)
return ret;
t->task.~Task();
if (t->output)
free(t->output);
if (t->prefix)
free(t->prefix);
if (t->formatter)
delete t->formatter;
return 0;
}
char *villas::node::test_rtt_print(NodeCompat *n) {
auto *t = n->getData<struct test_rtt>();
return strf("output=%s, prefix=%s, cooldown=%f, #cases=%zu", t->output,
t->prefix, t->cooldown, list_length(&t->cases));
}
int villas::node::test_rtt_start(NodeCompat *n) {
int ret;
struct stat st;
auto *t = n->getData<struct test_rtt>();
struct test_rtt_case *c = (struct test_rtt_case *)list_first(&t->cases);
// Create folder for results if not present
ret = stat(t->output, &st);
if (ret || !S_ISDIR(st.st_mode)) {
ret = mkdir(t->output, 0777);
if (ret)
throw SystemError("Failed to create output directory: {}", t->output);
const std::string &TestRTT::getDetails() {
if (details.empty()) {
details = fmt::format("output={}, prefix={}, cooldown={}, #cases={}",
output, prefix, cooldown, cases.size());
}
t->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_DATA);
return details;
}
t->task.setRate(c->rate);
int TestRTT::start() {
int ret;
struct stat st;
t->current = -1;
t->counter = -1;
// Create output folder for results if not present
ret = stat(output.c_str(), &st);
if (ret || !S_ISDIR(st.st_mode)) {
ret = mkdir(output.c_str(), 0777);
if (ret)
throw SystemError("Failed to create output directory: {}", output);
}
formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_DATA);
current_case = cases.begin();
counter = -1;
task.setRate(current_case->rate);
ret = Node::start();
if (!ret)
state = State::STARTED;
return 0;
}
int villas::node::test_rtt_stop(NodeCompat *n) {
int TestRTT::stop() {
int ret;
auto *t = n->getData<struct test_rtt>();
if (t->counter >= 0) {
ret = test_rtt_case_stop(n, t->current);
if (counter >= 0 && current_case != cases.end()) {
ret = current_case->stop();
if (ret)
return ret;
}
delete t->formatter;
return 0;
}
int villas::node::test_rtt_read(NodeCompat *n, struct Sample *const smps[],
unsigned cnt) {
int ret;
unsigned i;
uint64_t steps;
int TestRTT::_read(struct Sample *smps[], unsigned cnt) {
// Wait for next sample or cooldown
auto steps = task.wait();
if (steps > 1) {
logger->warn("Skipped {} steps", steps - 1);
}
auto *t = n->getData<struct test_rtt>();
// Cooldown of last case completed. Terminating..
if (current_case == cases.end()) {
logger->info("This was the last case.");
setState(State::STOPPING);
return -1;
}
// Handle start/stop of new cases
if (t->counter == -1) {
if (t->current < 0) {
t->current = 0;
} else {
ret = test_rtt_case_stop(n, t->current);
if (ret)
return ret;
t->current++;
}
if ((unsigned)t->current >= list_length(&t->cases)) {
n->logger->info("This was the last case.");
n->setState(State::STOPPING);
return -1;
} else {
ret = test_rtt_case_start(n, t->current);
if (ret)
return ret;
}
if (counter < 0) {
int ret = current_case->start();
if (ret)
return ret;
}
struct test_rtt_case *c =
(struct test_rtt_case *)list_at(&t->cases, t->current);
// Wait
steps = t->task.wait();
if (steps > 1)
n->logger->warn("Skipped {} steps", steps - 1);
if ((unsigned)t->counter >= c->limit) {
n->logger->info("Stopping case #{}", t->current);
t->counter = -1;
if (t->cooldown) {
n->logger->info("Entering cooldown phase. Waiting {} seconds...",
t->cooldown);
t->task.setTimeout(t->cooldown);
}
return 0;
} else {
struct timespec now = time_now();
// Prepare samples
for (i = 0; i < cnt; i++) {
smps[i]->length = c->values;
smps[i]->sequence = t->counter;
smps[i]->ts.origin = now;
smps[i]->flags = (int)SampleFlags::HAS_DATA |
(int)SampleFlags::HAS_SEQUENCE |
(int)SampleFlags::HAS_TS_ORIGIN;
smps[i]->signals = n->getInputSignals(false);
t->counter++;
}
return i;
}
}
int villas::node::test_rtt_write(NodeCompat *n, struct Sample *const smps[],
unsigned cnt) {
auto *t = n->getData<struct test_rtt>();
if (t->current < 0)
return 0;
struct test_rtt_case *c =
(struct test_rtt_case *)list_at(&t->cases, t->current);
auto now = time_now();
// Prepare samples
unsigned i;
for (i = 0; i < cnt; i++) {
if (smps[i]->length != c->values) {
n->logger->warn("Discarding invalid sample due to mismatching length: "
"expecting={}, has={}",
c->values, smps[i]->length);
continue;
auto smp = smps[i];
smp->length = current_case->values;
smp->sequence = counter;
smp->ts.origin = now;
smp->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_SEQUENCE |
(int)SampleFlags::HAS_TS_ORIGIN;
smp->signals = getInputSignals(false);
counter++;
}
if ((unsigned)counter >= current_case->limit) {
logger->info("Stopping case #{}", current_case->id);
counter = -1;
if (cooldown) {
logger->info("Entering cooldown phase. Waiting {} seconds...", cooldown);
task.setTimeout(cooldown);
}
t->formatter->print(t->stream, smps[i]);
current_case++;
return 0;
}
return i;
}
int villas::node::test_rtt_poll_fds(NodeCompat *n, int fds[]) {
auto *t = n->getData<struct test_rtt>();
int TestRTT::_write(struct Sample *smps[], unsigned cnt) {
if (current_case == cases.end())
return 0;
fds[0] = t->task.getFD();
unsigned i;
for (i = 0; i < cnt; i++) {
if (smps[i]->length != current_case->values) {
logger->warn("Discarding invalid sample due to mismatching length: "
"expecting={}, has={}",
current_case->values, smps[i]->length);
continue;
}
return 1;
formatter->print(stream, smps[i]);
}
return i;
}
__attribute__((constructor(110))) static void register_plugin() {
p.name = "test_rtt";
p.description = "Test round-trip time with loopback";
p.vectorize = 0;
p.flags = (int)NodeFactory::Flags::PROVIDES_SIGNALS;
p.size = sizeof(struct test_rtt);
p.parse = test_rtt_parse;
p.prepare = test_rtt_prepare;
p.init = test_rtt_init;
p.destroy = test_rtt_destroy;
p.print = test_rtt_print;
p.start = test_rtt_start;
p.stop = test_rtt_stop;
p.read = test_rtt_read;
p.write = test_rtt_write;
std::vector<int> TestRTT::getPollFDs() { return {task.getFD()}; }
static NodeCompatFactory ncp(&p);
}
// Register node
static char n[] = "test_rtt";
static char d[] = "Test round-trip time with loopback";
static NodePlugin<TestRTT, n, d,
(int)NodeFactory::Flags::SUPPORTS_READ |
(int)NodeFactory::Flags::SUPPORTS_WRITE |
(int)NodeFactory::Flags::SUPPORTS_POLL>
p;