diff --git a/etc/examples/nodes/test_rtt.conf b/etc/examples/nodes/test_rtt.conf index 60c7ce910..5fba2e525 100644 --- a/etc/examples/nodes/test_rtt.conf +++ b/etc/examples/nodes/test_rtt.conf @@ -20,6 +20,10 @@ nodes = { # The output format of the result files format = "villas.human", + # Shutdown the process after the cooldown phase of the + # last test case has been completed. + shutdown = true; + # The list of test cases # Each test case can specify a single or an array of rates and values # If arrays are used, we will generate multiple test cases with all diff --git a/include/villas/nodes/test_rtt.hpp b/include/villas/nodes/test_rtt.hpp index c3e58f461..dc3f98d81 100644 --- a/include/villas/nodes/test_rtt.hpp +++ b/include/villas/nodes/test_rtt.hpp @@ -29,16 +29,34 @@ protected: int id; double rate; + double + warmup; // Number of seconds to wait between before recording samples. + double cooldown; // Number of seconds to wait between tests. unsigned values; - unsigned limit; // The number of samples we take per test. + unsigned missed; + + unsigned limit; // The number of samples we send per test. + unsigned sent; + unsigned received; + + unsigned limit_warmup; // The number of samples we send during warmup. + unsigned sent_warmup; + unsigned received_warmup; + + struct timespec started; + struct timespec stopped; std::string filename; std::string filename_formatted; + json_t *getMetadata(); + public: - Case(TestRTT *n, int id, int rate, int values, int limit, - const std::string &filename) - : node(n), id(id), rate(rate), values(values), limit(limit), + Case(TestRTT *n, int id, int rate, float warmup, float cooldown, int values, + int limit, int limit_warmup, const std::string &filename) + : node(n), id(id), rate(rate), warmup(warmup), cooldown(cooldown), + values(values), missed(0), limit(limit), sent(0), received(0), + limit_warmup(limit_warmup), sent_warmup(0), received_warmup(0), filename(filename){}; int start(); @@ -49,16 +67,14 @@ protected: Format::Ptr formatter; // The format of the output file FILE *stream; - double cooldown; // Number of seconds to wait between tests. - - unsigned counter; - std::list cases; // List of test cases - std::list::iterator current_case; + std::list::iterator current; std::string output; // The directory where we place the results. std::string prefix; // An optional prefix in the filename. + bool shutdown; + virtual int _read(struct Sample *smps[], unsigned cnt); virtual int _write(struct Sample *smps[], unsigned cnt); @@ -66,7 +82,7 @@ protected: public: TestRTT(const uuid_t &id = {}, const std::string &name = "") : Node(id, name), task(CLOCK_MONOTONIC), formatter(nullptr), - stream(nullptr), cooldown(0), counter(-1) {} + stream(nullptr), shutdown(false) {} virtual ~TestRTT(){}; @@ -83,5 +99,33 @@ public: virtual const std::string &getDetails(); }; +class TestRTTNodeFactory : public NodeFactory { + +public: + using NodeFactory::NodeFactory; + + virtual Node *make(const uuid_t &id = {}, const std::string &nme = "") { + auto *n = new TestRTT(id, nme); + + init(n); + + return n; + } + + virtual int getFlags() const { + return (int)NodeFactory::Flags::SUPPORTS_READ | + (int)NodeFactory::Flags::SUPPORTS_WRITE | + (int)NodeFactory::Flags::SUPPORTS_POLL; + } + + virtual std::string getName() const { return "test_rtt"; } + + virtual std::string getDescription() const { + return "Test round-trip time with loopback"; + } + + virtual int start(SuperNode *sn); +}; + } // namespace node } // namespace villas diff --git a/lib/nodes/test_rtt.cpp b/lib/nodes/test_rtt.cpp index 78d485728..ffabd02fb 100644 --- a/lib/nodes/test_rtt.cpp +++ b/lib/nodes/test_rtt.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -21,16 +22,23 @@ using namespace villas; using namespace villas::node; using namespace villas::utils; +static SuperNode *sn = nullptr; + int TestRTT::Case::start() { - node->logger->info( - "Starting case #{}/{}: filename={}, rate={}, values={}, limit={}", id + 1, - node->cases.size(), filename_formatted, rate, values, limit); + node->logger->info("Starting case #{}/{}: filename={}, rate={}/s, values={}, " + "limit={}smps, warmup={}s, cooldown={}s", + id + 1, node->cases.size(), filename_formatted, rate, + values, limit, warmup, cooldown); // Open file node->stream = fopen(filename_formatted.c_str(), "a+"); if (!node->stream) return -1; + started = time_now(); + + node->formatter->reset(); + // Start timer node->task.setRate(rate); @@ -40,18 +48,37 @@ int TestRTT::Case::start() { int TestRTT::Case::stop() { int ret; + stopped = time_now(); + // Stop timer node->task.stop(); + node->formatter->printMetadata(node->stream, getMetadata()); + ret = fclose(node->stream); if (ret) throw SystemError("Failed to close file"); - node->logger->info("Stopping case #{}/{}", id + 1, node->cases.size()); + node->logger->info("Stopping case #{}/{}: sent={}, received={}, duration={:.2}", id + 1, node->cases.size(), sent, received, time_delta(&started, &stopped)); return 0; } +json_t *TestRTT::Case::getMetadata() { + json_t *json_warmup = nullptr; + + if (limit_warmup > 0) { + json_warmup = json_pack("{ s: i, s: i, s: i }", "limit", limit_warmup, + "sent", sent_warmup, "received", received_warmup); + } + + return json_pack( + "{ s: i, s: f, s: i, s: f, s: f, s: i, s: i, s: i, s: i, s: o* }", "id", + id, "rate", rate, "values", values, "started", time_to_double(&started), + "stopped", time_to_double(&stopped), "missed", missed, "limit", limit, + "sent", sent, "received", received, "warmup", json_warmup); +} + int TestRTT::prepare() { unsigned max_values = 0; @@ -83,24 +110,31 @@ int TestRTT::parse(json_t *json) { const char *output_str = "."; const char *prefix_str = nullptr; + double warmup_default = 0; + double cooldown_default = 0; + int shutdown_ = -1; size_t i; json_t *json_cases, *json_case, *json_val, *json_format = nullptr; - json_t *json_rates = nullptr, *json_values = nullptr; + json_t *json_rates_default = nullptr, *json_values_default = nullptr; json_error_t err; - cooldown = 0; - - ret = - json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }", - "prefix", &prefix_str, "output", &output_str, "format", - &json_format, "cooldown", &cooldown, "cases", &json_cases); + ret = json_unpack_ex( + json, &err, 0, + "{ s?: s, s?: s, s?: o, s?: F, s?: F, s?: o, s?: o, s: o, s?: b }", + "prefix", &prefix_str, "output", &output_str, "format", &json_format, + "cooldown", &cooldown_default, "warmup", &warmup_default, "values", + &json_values_default, "rates", &json_rates_default, "cases", &json_cases, + "shutdown", &shutdown_); if (ret) throw ConfigError(json, err, "node-config-node-test-rtt"); output = output_str; prefix = prefix_str ? prefix_str : getNameShort(); + if (shutdown_ > 0) + shutdown = shutdown_ > 0; + // Formatter auto *fmt = json_format ? FormatFactory::make(json_format) : FormatFactory::make("villas.human"); @@ -110,6 +144,18 @@ int TestRTT::parse(json_t *json) { throw ConfigError(json_format, "node-config-node-test-rtt-format", "Invalid format configuration"); + if (json_rates_default && !json_is_array(json_rates_default) && + !json_is_number(json_rates_default)) + throw ConfigError( + json_rates_default, "node-config-node-test-rtt-rates", + "The 'rates' setting must be a real or an array of real numbers"); + + if (json_values_default && !json_is_array(json_values_default) && + !json_is_integer(json_values_default)) + throw ConfigError( + json_values_default, "node-config-node-test-rtt-values", + "The 'values' setting must be an integer or an array of integers"); + // Construct List of test cases if (!json_is_array(json_cases)) throw ConfigError(json_cases, "node-config-node-test-rtt-format", @@ -118,13 +164,19 @@ int TestRTT::parse(json_t *json) { int id = 0; json_array_foreach(json_cases, i, json_case) { int limit = -1; - double duration = -1; // in secs + double duration = -1; // in secs + double cooldown = cooldown_default; // in secs + double warmup = warmup_default; // in secs std::vector rates; std::vector values; - ret = json_unpack_ex(json_case, &err, 0, "{ s: o, s: o, s?: i, s?: F }", + json_t *json_rates = json_rates_default; + json_t *json_values = json_values_default; + + ret = json_unpack_ex(json_case, &err, 0, "{ s?: o, s?: o, s?: i, s?: F }", "rates", &json_rates, "values", &json_values, "limit", - &limit, "duration", &duration); + &limit, "duration", &duration, "warmup", &warmup, + "cooldown", &cooldown); if (limit > 0 && duration > 0) throw ConfigError( @@ -169,7 +221,7 @@ int TestRTT::parse(json_t *json) { for (int rate : rates) { for (int value : values) { - int lim; + int lim, lim_warmup; if (limit > 0) lim = limit; else if (duration > 0) @@ -177,10 +229,13 @@ int TestRTT::parse(json_t *json) { else lim = 1000; // Default value + lim_warmup = warmup * rate; + auto filename = fmt::format("{}/{}_values{}_rate{}.log", output, prefix, value, rate); - cases.emplace_back(this, id++, rate, value, lim, filename); + cases.emplace_back(this, id++, rate, warmup, cooldown, value, lim, + lim_warmup, filename); } } } @@ -190,8 +245,8 @@ int TestRTT::parse(json_t *json) { const std::string &TestRTT::getDetails() { if (details.empty()) { - details = fmt::format("output={}, prefix={}, cooldown={}, #cases={}", - output, prefix, cooldown, cases.size()); + details = fmt::format("output={}, prefix={}, #cases={}, shutdown={}", output, prefix, + cases.size(), shutdown); } return details; @@ -211,10 +266,9 @@ int TestRTT::start() { formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_DATA); - current_case = cases.begin(); - counter = 0; + current = cases.begin(); - task.setRate(current_case->rate); + task.setRate(current->rate); ret = Node::start(); if (!ret) @@ -226,8 +280,8 @@ int TestRTT::start() { int TestRTT::stop() { int ret; - if (counter > 0 && current_case != cases.end()) { - ret = current_case->stop(); + if (current != cases.end() && current->received > 0) { + ret = current->stop(); if (ret) return ret; } @@ -242,31 +296,40 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) { auto steps = task.wait(); if (steps > 1) { logger->warn("Skipped {} steps", steps - 1); + current->missed += steps - 1; } // Cooldown of case completed.. - if (counter >= current_case->limit) { - ret = current_case->stop(); + if (current->sent + current->sent_warmup >= + current->limit + current->limit_warmup) { + ret = current->stop(); if (ret) return ret; - if (++current_case == cases.end()) { + if (++current == cases.end()) { logger->info("This was the last case."); setState(State::STOPPING); + if (sn && shutdown) + sn->setState(State::STOPPING); + return -1; } - - counter = 0; } // Handle start/stop of new cases - if (counter == 0) { - ret = current_case->start(); + if (current->sent + current->sent_warmup == 0) { + ret = current->start(); if (ret) return ret; - } + + if (current->limit_warmup > 0) + logger->info("Starting warmup phase. Sending {} samples...", + current->limit_warmup); + } else if (current->sent == current->limit_warmup) + logger->info("Completed warmup phase. Sending {} samples...", + current->limit); auto now = time_now(); @@ -275,20 +338,25 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) { for (i = 0; i < cnt; i++) { auto smp = smps[i]; - smp->length = current_case->values; - smp->sequence = counter; + smp->length = current->values; + smp->sequence = current->sent + current->sent_warmup; smp->ts.origin = now; smp->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_SEQUENCE | (int)SampleFlags::HAS_TS_ORIGIN; smp->signals = getInputSignals(false); - counter++; + if (current->sent_warmup < current->limit_warmup) + current->sent_warmup++; + else + current->sent++; } - if (counter >= current_case->limit) { - if (cooldown) { - logger->info("Entering cooldown phase. Waiting {} seconds...", cooldown); - task.setTimeout(cooldown); + if (current->sent + current->sent_warmup >= + current->limit + current->limit_warmup) { + if (current->cooldown > 0) { + logger->info("Entering cooldown phase. Waiting {} seconds...", + current->cooldown); + task.setTimeout(current->cooldown); } else { task.setTimeout(0); // Start next case immediately } @@ -298,19 +366,28 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) { } int TestRTT::_write(struct Sample *smps[], unsigned cnt) { - if (current_case == cases.end()) + if (current == cases.end()) return 0; unsigned i; for (i = 0; i < cnt; i++) { - if (smps[i]->length != current_case->values) { + auto *smp = smps[i]; + + if (smp->length != current->values) { logger->warn("Discarding invalid sample due to mismatching length: " "expecting={}, has={}", - current_case->values, smps[i]->length); + current->values, smp->length); + continue; + } + + if (smp->sequence < current->limit_warmup) { + // Skip samples from warmup phase + current->received_warmup++; continue; } formatter->print(stream, smps[i]); + current->received++; } return i; @@ -318,11 +395,10 @@ int TestRTT::_write(struct Sample *smps[], unsigned cnt) { std::vector TestRTT::getPollFDs() { return {task.getFD()}; } -// Register node -static char n[] = "test_rtt"; -static char d[] = "Test round-trip time with loopback"; -static NodePlugin - p; +int TestRTTNodeFactory::start(SuperNode *sn_) { + sn = sn_; + + return 0; +} + +static TestRTTNodeFactory p;