diff --git a/include/villas/nodes/test_rtt.hpp b/include/villas/nodes/test_rtt.hpp index e1cce1715..c6eecf26c 100644 --- a/include/villas/nodes/test_rtt.hpp +++ b/include/villas/nodes/test_rtt.hpp @@ -9,63 +9,78 @@ #include #include +#include #include namespace villas { namespace node { // Forward declarations -class NodeCompat; struct Sample; -struct test_rtt; +class TestRTT : public Node { -struct test_rtt_case { - double rate; - unsigned values; - unsigned limit; // The number of samples we take per test. +protected: + class Case { + friend TestRTT; - char *filename; - char *filename_formatted; + protected: + TestRTT *node; - NodeCompat *node; -}; + int id; + double rate; + unsigned values; + unsigned limit; // The number of samples we take per test. -struct test_rtt { - struct Task task; // The periodic task for test_rtt_read() - Format *formatter; // The format of the output file + std::string filename; + std::string filename_formatted; + + public: + Case(TestRTT *n, int id, int rate, int values, int limit, + std::string filename) + : node(n), id(id), rate(rate), values(values), limit(limit), + filename(filename){}; + + int start(); + int stop(); + }; + + Task task; // The periodic task for test_rtt_read() + Format::Ptr formatter; // The format of the output file FILE *stream; double cooldown; // Number of seconds to wait between tests. - int current; // Index of current test in test_rtt::cases int counter; - struct List cases; // List of test cases + std::list cases; // List of test cases + std::list::iterator current_case; - char *output; // The directory where we place the results. - char *prefix; // An optional prefix in the filename. + std::string output; // The directory where we place the results. + std::string prefix; // An optional prefix in the filename. + + virtual int _read(struct Sample *smps[], unsigned cnt); + + virtual int _write(struct Sample *smps[], unsigned cnt); + +public: + TestRTT(const uuid_t &id = {}, const std::string &name = "") + : Node(id, name), task(CLOCK_MONOTONIC), formatter(nullptr) {} + + virtual ~TestRTT(){}; + + virtual int prepare(); + + virtual int parse(json_t *json); + + virtual int start(); + + virtual int stop(); + + virtual std::vector getPollFDs(); + + virtual const std::string &getDetails(); }; -char *test_rtt_print(NodeCompat *n); - -int test_rtt_parse(NodeCompat *n, json_t *json); - -int test_rtt_prepare(NodeCompat *n); - -int test_rtt_init(NodeCompat *n); - -int test_rtt_destroy(NodeCompat *n); - -int test_rtt_start(NodeCompat *n); - -int test_rtt_stop(NodeCompat *n); - -int test_rtt_read(NodeCompat *n, struct Sample *const smps[], unsigned cnt); - -int test_rtt_write(NodeCompat *n, struct Sample *const smps[], unsigned cnt); - -int test_rtt_poll_fds(NodeCompat *n, int fds[]); - } // namespace node } // namespace villas diff --git a/lib/nodes/test_rtt.cpp b/lib/nodes/test_rtt.cpp index 3325955f0..2fdaab4a9 100644 --- a/lib/nodes/test_rtt.cpp +++ b/lib/nodes/test_rtt.cpp @@ -12,7 +12,6 @@ #include #include -#include #include #include #include @@ -22,59 +21,40 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; -static NodeCompatType p; - -static int test_rtt_case_start(NodeCompat *n, int id) { - auto *t = n->getData(); - struct test_rtt_case *c = (struct test_rtt_case *)list_at(&t->cases, id); - - n->logger->info( - "Starting case #{}/{}: filename={}, rate={}, values={}, limit={}", - t->current, list_length(&t->cases), c->filename_formatted, c->rate, c->values, c->limit); +int TestRTT::Case::start() { + node->logger->info( + "Starting case #{}/{}: filename={}, rate={}, values={}, limit={}", id, + node->cases.size(), filename_formatted, rate, values, limit); // Open file - t->stream = fopen(c->filename_formatted, "a+"); - if (!t->stream) + node->stream = fopen(filename_formatted.c_str(), "a+"); + if (!node->stream) return -1; // Start timer - t->task.setRate(c->rate); + node->task.setRate(rate); - t->counter = 0; - t->current = id; + node->counter = 0; return 0; } -static int test_rtt_case_stop(NodeCompat *n, int id) { +int TestRTT::Case::stop() { int ret; - auto *t = n->getData(); // Stop timer - t->task.stop(); + node->task.stop(); - ret = fclose(t->stream); + ret = fclose(node->stream); if (ret) throw SystemError("Failed to close file"); - n->logger->info("Stopping case #{}", id); + node->logger->info("Stopping case #{}/{}", id, node->cases.size()); return 0; } -static int test_rtt_case_destroy(struct test_rtt_case *c) { - if (c->filename) - free(c->filename); - - if (c->filename_formatted) - free(c->filename_formatted); - - return 0; -} - -int villas::node::test_rtt_prepare(NodeCompat *n) { - auto *t = n->getData(); - +int TestRTT::prepare() { unsigned max_values = 0; // Take current for time for test case prefix @@ -82,73 +62,67 @@ int villas::node::test_rtt_prepare(NodeCompat *n) { struct tm tm; gmtime_r(&ts, &tm); - for (size_t i = 0; i < list_length(&t->cases); i++) { - struct test_rtt_case *c = (struct test_rtt_case *)list_at(&t->cases, i); + for (auto &c : cases) { + if (c.values > max_values) + max_values = c.values; - if (c->values > max_values) - max_values = c->values; - - c->filename_formatted = new char[NAME_MAX]; - if (!c->filename_formatted) + auto filename_formatted = new char[NAME_MAX]; + if (!filename_formatted) throw MemoryAllocationError(); - strftime(c->filename_formatted, NAME_MAX, c->filename, &tm); + strftime(filename_formatted, NAME_MAX, c.filename.c_str(), &tm); + + c.filename_formatted = filename_formatted; } - n->in.signals = std::make_shared(max_values, SignalType::FLOAT); + in.signals = std::make_shared(max_values, SignalType::FLOAT); - return 0; + return Node::prepare(); } -int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) { +int TestRTT::parse(json_t *json) { int ret; - auto *t = n->getData(); - const char *output = "."; - const char *prefix = nullptr; - - std::vector rates; - std::vector values; + const char *output_str = "."; + const char *prefix_str = nullptr; size_t i; json_t *json_cases, *json_case, *json_val, *json_format = nullptr; json_t *json_rates = nullptr, *json_values = nullptr; json_error_t err; - t->cooldown = 0; + cooldown = 0; - // Generate list of test cases - ret = list_init(&t->cases); - if (ret) - return ret; - - ret = json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }", - "prefix", &prefix, "output", &output, "format", - &json_format, "cooldown", &t->cooldown, "cases", - &json_cases); + ret = + json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }", + "prefix", &prefix_str, "output", &output_str, "format", + &json_format, "cooldown", &cooldown, "cases", &json_cases); if (ret) throw ConfigError(json, err, "node-config-node-test-rtt"); - t->output = strdup(output); - t->prefix = strdup(prefix ? prefix : n->getNameShort().c_str()); + output = output_str; + prefix = prefix_str ? prefix_str : getNameShort(); - // Initialize IO module - if (!json_format) - json_format = json_string("villas.binary"); + // Formatter + auto *fmt = json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.human"); - t->formatter = FormatFactory::make(json_format); - if (!t->formatter) - throw ConfigError(json, "node-config-node-test-rtt-format", - "Invalid value for setting 'format'"); + formatter = Format::Ptr(fmt); + if (!formatter) + throw ConfigError(json_format, "node-config-node-exec-format", + "Invalid format configuration"); // Construct List of test cases if (!json_is_array(json_cases)) throw ConfigError(json_cases, "node-config-node-test-rtt-format", "The 'cases' setting must be an array."); + int id = 0; json_array_foreach(json_cases, i, json_case) { int limit = -1; double duration = -1; // in secs + std::vector rates; + std::vector values; ret = json_unpack_ex(json_case, &err, 0, "{ s: o, s: o, s?: i, s?: F }", "rates", &json_rates, "values", &json_values, "limit", @@ -169,9 +143,6 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) { json_case, "node-config-node-test-rtt-values", "The 'values' setting must be an integer or an array of integers"); - values.clear(); - rates.clear(); - if (json_is_array(json_rates)) { size_t j; json_array_foreach(json_rates, j, json_val) { @@ -200,28 +171,18 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) { for (int rate : rates) { for (int value : values) { - auto *c = new struct test_rtt_case; - if (!c) - throw MemoryAllocationError(); - - c->filename = nullptr; - c->filename_formatted = nullptr; - c->node = n; - - c->rate = rate; - c->values = value; - + int lim; if (limit > 0) - c->limit = limit; + lim = limit; else if (duration > 0) - c->limit = duration * c->rate; + lim = duration * rate; else - c->limit = 1000; // default value + lim = 1000; // Default value - c->filename = strf("%s/%s_values%d_rate%.0f.log", t->output, t->prefix, - c->values, c->rate); + auto filename = fmt::format("{}/{}_values{}_rate{}.log", output, prefix, + value, rate); - list_push(&t->cases, c); + cases.emplace_back(this, id++, rate, value, lim, filename); } } } @@ -229,205 +190,137 @@ int villas::node::test_rtt_parse(NodeCompat *n, json_t *json) { return 0; } -int villas::node::test_rtt_init(NodeCompat *n) { - auto *t = n->getData(); - - new (&t->task) Task(CLOCK_MONOTONIC); - - t->formatter = nullptr; - - return 0; -} - -int villas::node::test_rtt_destroy(NodeCompat *n) { - int ret; - auto *t = n->getData(); - - ret = list_destroy(&t->cases, (dtor_cb_t)test_rtt_case_destroy, true); - if (ret) - return ret; - - t->task.~Task(); - - if (t->output) - free(t->output); - - if (t->prefix) - free(t->prefix); - - if (t->formatter) - delete t->formatter; - - return 0; -} - -char *villas::node::test_rtt_print(NodeCompat *n) { - auto *t = n->getData(); - - return strf("output=%s, prefix=%s, cooldown=%f, #cases=%zu", t->output, - t->prefix, t->cooldown, list_length(&t->cases)); -} - -int villas::node::test_rtt_start(NodeCompat *n) { - int ret; - struct stat st; - auto *t = n->getData(); - struct test_rtt_case *c = (struct test_rtt_case *)list_first(&t->cases); - - // Create folder for results if not present - ret = stat(t->output, &st); - if (ret || !S_ISDIR(st.st_mode)) { - ret = mkdir(t->output, 0777); - if (ret) - throw SystemError("Failed to create output directory: {}", t->output); +const std::string &TestRTT::getDetails() { + if (details.empty()) { + details = fmt::format("output={}, prefix={}, cooldown={}, #cases={}", + output, prefix, cooldown, cases.size()); } - t->formatter->start(n->getInputSignals(false), ~(int)SampleFlags::HAS_DATA); + return details; +} - t->task.setRate(c->rate); +int TestRTT::start() { + int ret; + struct stat st; - t->current = -1; - t->counter = -1; + // Create output folder for results if not present + ret = stat(output.c_str(), &st); + if (ret || !S_ISDIR(st.st_mode)) { + ret = mkdir(output.c_str(), 0777); + if (ret) + throw SystemError("Failed to create output directory: {}", output); + } + + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_DATA); + + current_case = cases.begin(); + counter = -1; + + task.setRate(current_case->rate); + + ret = Node::start(); + if (!ret) + state = State::STARTED; return 0; } -int villas::node::test_rtt_stop(NodeCompat *n) { +int TestRTT::stop() { int ret; - auto *t = n->getData(); - if (t->counter >= 0) { - ret = test_rtt_case_stop(n, t->current); + if (counter >= 0 && current_case != cases.end()) { + ret = current_case->stop(); if (ret) return ret; } - delete t->formatter; - return 0; } -int villas::node::test_rtt_read(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - int ret; - unsigned i; - uint64_t steps; +int TestRTT::_read(struct Sample *smps[], unsigned cnt) { + // Wait for next sample or cooldown + auto steps = task.wait(); + if (steps > 1) { + logger->warn("Skipped {} steps", steps - 1); + } - auto *t = n->getData(); + // Cooldown of last case completed. Terminating.. + if (current_case == cases.end()) { + logger->info("This was the last case."); + + setState(State::STOPPING); + + return -1; + } // Handle start/stop of new cases - if (t->counter == -1) { - if (t->current < 0) { - t->current = 0; - } else { - ret = test_rtt_case_stop(n, t->current); - if (ret) - return ret; - - t->current++; - } - - if ((unsigned)t->current >= list_length(&t->cases)) { - n->logger->info("This was the last case."); - - n->setState(State::STOPPING); - - return -1; - } else { - ret = test_rtt_case_start(n, t->current); - if (ret) - return ret; - } + if (counter < 0) { + int ret = current_case->start(); + if (ret) + return ret; } - struct test_rtt_case *c = - (struct test_rtt_case *)list_at(&t->cases, t->current); - - // Wait - steps = t->task.wait(); - if (steps > 1) - n->logger->warn("Skipped {} steps", steps - 1); - - if ((unsigned)t->counter >= c->limit) { - n->logger->info("Stopping case #{}", t->current); - - t->counter = -1; - - if (t->cooldown) { - n->logger->info("Entering cooldown phase. Waiting {} seconds...", - t->cooldown); - t->task.setTimeout(t->cooldown); - } - - return 0; - } else { - struct timespec now = time_now(); - - // Prepare samples - for (i = 0; i < cnt; i++) { - smps[i]->length = c->values; - smps[i]->sequence = t->counter; - smps[i]->ts.origin = now; - smps[i]->flags = (int)SampleFlags::HAS_DATA | - (int)SampleFlags::HAS_SEQUENCE | - (int)SampleFlags::HAS_TS_ORIGIN; - smps[i]->signals = n->getInputSignals(false); - - t->counter++; - } - - return i; - } -} - -int villas::node::test_rtt_write(NodeCompat *n, struct Sample *const smps[], - unsigned cnt) { - auto *t = n->getData(); - - if (t->current < 0) - return 0; - - struct test_rtt_case *c = - (struct test_rtt_case *)list_at(&t->cases, t->current); + auto now = time_now(); + // Prepare samples unsigned i; for (i = 0; i < cnt; i++) { - if (smps[i]->length != c->values) { - n->logger->warn("Discarding invalid sample due to mismatching length: " - "expecting={}, has={}", - c->values, smps[i]->length); - continue; + auto smp = smps[i]; + + smp->length = current_case->values; + smp->sequence = counter; + smp->ts.origin = now; + smp->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_SEQUENCE | + (int)SampleFlags::HAS_TS_ORIGIN; + smp->signals = getInputSignals(false); + + counter++; + } + + if ((unsigned)counter >= current_case->limit) { + logger->info("Stopping case #{}", current_case->id); + + counter = -1; + + if (cooldown) { + logger->info("Entering cooldown phase. Waiting {} seconds...", cooldown); + task.setTimeout(cooldown); } - t->formatter->print(t->stream, smps[i]); + current_case++; + + return 0; } return i; } -int villas::node::test_rtt_poll_fds(NodeCompat *n, int fds[]) { - auto *t = n->getData(); +int TestRTT::_write(struct Sample *smps[], unsigned cnt) { + if (current_case == cases.end()) + return 0; - fds[0] = t->task.getFD(); + unsigned i; + for (i = 0; i < cnt; i++) { + if (smps[i]->length != current_case->values) { + logger->warn("Discarding invalid sample due to mismatching length: " + "expecting={}, has={}", + current_case->values, smps[i]->length); + continue; + } - return 1; + formatter->print(stream, smps[i]); + } + + return i; } -__attribute__((constructor(110))) static void register_plugin() { - p.name = "test_rtt"; - p.description = "Test round-trip time with loopback"; - p.vectorize = 0; - p.flags = (int)NodeFactory::Flags::PROVIDES_SIGNALS; - p.size = sizeof(struct test_rtt); - p.parse = test_rtt_parse; - p.prepare = test_rtt_prepare; - p.init = test_rtt_init; - p.destroy = test_rtt_destroy; - p.print = test_rtt_print; - p.start = test_rtt_start; - p.stop = test_rtt_stop; - p.read = test_rtt_read; - p.write = test_rtt_write; +std::vector TestRTT::getPollFDs() { return {task.getFD()}; } - static NodeCompatFactory ncp(&p); -} +// Register node +static char n[] = "test_rtt"; +static char d[] = "Test round-trip time with loopback"; +static NodePlugin + p;