1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

test_rtt: Another round of new features

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel 2024-05-27 12:50:48 +02:00 committed by pipeacosta
parent bae0a37526
commit 6efaa9e55b
3 changed files with 183 additions and 59 deletions

View file

@ -20,6 +20,10 @@ nodes = {
# The output format of the result files
format = "villas.human",
# Shutdown the process after the cooldown phase of the
# last test case has been completed.
shutdown = true;
# The list of test cases
# Each test case can specify a single or an array of rates and values
# If arrays are used, we will generate multiple test cases with all

View file

@ -29,16 +29,34 @@ protected:
int id;
double rate;
double
warmup; // Number of seconds to wait between before recording samples.
double cooldown; // Number of seconds to wait between tests.
unsigned values;
unsigned limit; // The number of samples we take per test.
unsigned missed;
unsigned limit; // The number of samples we send per test.
unsigned sent;
unsigned received;
unsigned limit_warmup; // The number of samples we send during warmup.
unsigned sent_warmup;
unsigned received_warmup;
struct timespec started;
struct timespec stopped;
std::string filename;
std::string filename_formatted;
json_t *getMetadata();
public:
Case(TestRTT *n, int id, int rate, int values, int limit,
const std::string &filename)
: node(n), id(id), rate(rate), values(values), limit(limit),
Case(TestRTT *n, int id, int rate, float warmup, float cooldown, int values,
int limit, int limit_warmup, const std::string &filename)
: node(n), id(id), rate(rate), warmup(warmup), cooldown(cooldown),
values(values), missed(0), limit(limit), sent(0), received(0),
limit_warmup(limit_warmup), sent_warmup(0), received_warmup(0),
filename(filename){};
int start();
@ -49,16 +67,14 @@ protected:
Format::Ptr formatter; // The format of the output file
FILE *stream;
double cooldown; // Number of seconds to wait between tests.
unsigned counter;
std::list<Case> cases; // List of test cases
std::list<Case>::iterator current_case;
std::list<Case>::iterator current;
std::string output; // The directory where we place the results.
std::string prefix; // An optional prefix in the filename.
bool shutdown;
virtual int _read(struct Sample *smps[], unsigned cnt);
virtual int _write(struct Sample *smps[], unsigned cnt);
@ -66,7 +82,7 @@ protected:
public:
TestRTT(const uuid_t &id = {}, const std::string &name = "")
: Node(id, name), task(CLOCK_MONOTONIC), formatter(nullptr),
stream(nullptr), cooldown(0), counter(-1) {}
stream(nullptr), shutdown(false) {}
virtual ~TestRTT(){};
@ -83,5 +99,33 @@ public:
virtual const std::string &getDetails();
};
class TestRTTNodeFactory : public NodeFactory {
public:
using NodeFactory::NodeFactory;
virtual Node *make(const uuid_t &id = {}, const std::string &nme = "") {
auto *n = new TestRTT(id, nme);
init(n);
return n;
}
virtual int getFlags() const {
return (int)NodeFactory::Flags::SUPPORTS_READ |
(int)NodeFactory::Flags::SUPPORTS_WRITE |
(int)NodeFactory::Flags::SUPPORTS_POLL;
}
virtual std::string getName() const { return "test_rtt"; }
virtual std::string getDescription() const {
return "Test round-trip time with loopback";
}
virtual int start(SuperNode *sn);
};
} // namespace node
} // namespace villas

View file

@ -14,6 +14,7 @@
#include <villas/format.hpp>
#include <villas/nodes/test_rtt.hpp>
#include <villas/sample.hpp>
#include <villas/super_node.hpp>
#include <villas/timing.hpp>
#include <villas/utils.hpp>
@ -21,16 +22,23 @@ using namespace villas;
using namespace villas::node;
using namespace villas::utils;
static SuperNode *sn = nullptr;
int TestRTT::Case::start() {
node->logger->info(
"Starting case #{}/{}: filename={}, rate={}, values={}, limit={}", id + 1,
node->cases.size(), filename_formatted, rate, values, limit);
node->logger->info("Starting case #{}/{}: filename={}, rate={}/s, values={}, "
"limit={}smps, warmup={}s, cooldown={}s",
id + 1, node->cases.size(), filename_formatted, rate,
values, limit, warmup, cooldown);
// Open file
node->stream = fopen(filename_formatted.c_str(), "a+");
if (!node->stream)
return -1;
started = time_now();
node->formatter->reset();
// Start timer
node->task.setRate(rate);
@ -40,18 +48,37 @@ int TestRTT::Case::start() {
int TestRTT::Case::stop() {
int ret;
stopped = time_now();
// Stop timer
node->task.stop();
node->formatter->printMetadata(node->stream, getMetadata());
ret = fclose(node->stream);
if (ret)
throw SystemError("Failed to close file");
node->logger->info("Stopping case #{}/{}", id + 1, node->cases.size());
node->logger->info("Stopping case #{}/{}: sent={}, received={}, duration={:.2}", id + 1, node->cases.size(), sent, received, time_delta(&started, &stopped));
return 0;
}
json_t *TestRTT::Case::getMetadata() {
json_t *json_warmup = nullptr;
if (limit_warmup > 0) {
json_warmup = json_pack("{ s: i, s: i, s: i }", "limit", limit_warmup,
"sent", sent_warmup, "received", received_warmup);
}
return json_pack(
"{ s: i, s: f, s: i, s: f, s: f, s: i, s: i, s: i, s: i, s: o* }", "id",
id, "rate", rate, "values", values, "started", time_to_double(&started),
"stopped", time_to_double(&stopped), "missed", missed, "limit", limit,
"sent", sent, "received", received, "warmup", json_warmup);
}
int TestRTT::prepare() {
unsigned max_values = 0;
@ -83,24 +110,31 @@ int TestRTT::parse(json_t *json) {
const char *output_str = ".";
const char *prefix_str = nullptr;
double warmup_default = 0;
double cooldown_default = 0;
int shutdown_ = -1;
size_t i;
json_t *json_cases, *json_case, *json_val, *json_format = nullptr;
json_t *json_rates = nullptr, *json_values = nullptr;
json_t *json_rates_default = nullptr, *json_values_default = nullptr;
json_error_t err;
cooldown = 0;
ret =
json_unpack_ex(json, &err, 0, "{ s?: s, s?: s, s?: o, s?: F, s: o }",
"prefix", &prefix_str, "output", &output_str, "format",
&json_format, "cooldown", &cooldown, "cases", &json_cases);
ret = json_unpack_ex(
json, &err, 0,
"{ s?: s, s?: s, s?: o, s?: F, s?: F, s?: o, s?: o, s: o, s?: b }",
"prefix", &prefix_str, "output", &output_str, "format", &json_format,
"cooldown", &cooldown_default, "warmup", &warmup_default, "values",
&json_values_default, "rates", &json_rates_default, "cases", &json_cases,
"shutdown", &shutdown_);
if (ret)
throw ConfigError(json, err, "node-config-node-test-rtt");
output = output_str;
prefix = prefix_str ? prefix_str : getNameShort();
if (shutdown_ > 0)
shutdown = shutdown_ > 0;
// Formatter
auto *fmt = json_format ? FormatFactory::make(json_format)
: FormatFactory::make("villas.human");
@ -110,6 +144,18 @@ int TestRTT::parse(json_t *json) {
throw ConfigError(json_format, "node-config-node-test-rtt-format",
"Invalid format configuration");
if (json_rates_default && !json_is_array(json_rates_default) &&
!json_is_number(json_rates_default))
throw ConfigError(
json_rates_default, "node-config-node-test-rtt-rates",
"The 'rates' setting must be a real or an array of real numbers");
if (json_values_default && !json_is_array(json_values_default) &&
!json_is_integer(json_values_default))
throw ConfigError(
json_values_default, "node-config-node-test-rtt-values",
"The 'values' setting must be an integer or an array of integers");
// Construct List of test cases
if (!json_is_array(json_cases))
throw ConfigError(json_cases, "node-config-node-test-rtt-format",
@ -118,13 +164,19 @@ int TestRTT::parse(json_t *json) {
int id = 0;
json_array_foreach(json_cases, i, json_case) {
int limit = -1;
double duration = -1; // in secs
double duration = -1; // in secs
double cooldown = cooldown_default; // in secs
double warmup = warmup_default; // in secs
std::vector<int> rates;
std::vector<int> values;
ret = json_unpack_ex(json_case, &err, 0, "{ s: o, s: o, s?: i, s?: F }",
json_t *json_rates = json_rates_default;
json_t *json_values = json_values_default;
ret = json_unpack_ex(json_case, &err, 0, "{ s?: o, s?: o, s?: i, s?: F }",
"rates", &json_rates, "values", &json_values, "limit",
&limit, "duration", &duration);
&limit, "duration", &duration, "warmup", &warmup,
"cooldown", &cooldown);
if (limit > 0 && duration > 0)
throw ConfigError(
@ -169,7 +221,7 @@ int TestRTT::parse(json_t *json) {
for (int rate : rates) {
for (int value : values) {
int lim;
int lim, lim_warmup;
if (limit > 0)
lim = limit;
else if (duration > 0)
@ -177,10 +229,13 @@ int TestRTT::parse(json_t *json) {
else
lim = 1000; // Default value
lim_warmup = warmup * rate;
auto filename = fmt::format("{}/{}_values{}_rate{}.log", output, prefix,
value, rate);
cases.emplace_back(this, id++, rate, value, lim, filename);
cases.emplace_back(this, id++, rate, warmup, cooldown, value, lim,
lim_warmup, filename);
}
}
}
@ -190,8 +245,8 @@ int TestRTT::parse(json_t *json) {
const std::string &TestRTT::getDetails() {
if (details.empty()) {
details = fmt::format("output={}, prefix={}, cooldown={}, #cases={}",
output, prefix, cooldown, cases.size());
details = fmt::format("output={}, prefix={}, #cases={}, shutdown={}", output, prefix,
cases.size(), shutdown);
}
return details;
@ -211,10 +266,9 @@ int TestRTT::start() {
formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_DATA);
current_case = cases.begin();
counter = 0;
current = cases.begin();
task.setRate(current_case->rate);
task.setRate(current->rate);
ret = Node::start();
if (!ret)
@ -226,8 +280,8 @@ int TestRTT::start() {
int TestRTT::stop() {
int ret;
if (counter > 0 && current_case != cases.end()) {
ret = current_case->stop();
if (current != cases.end() && current->received > 0) {
ret = current->stop();
if (ret)
return ret;
}
@ -242,31 +296,40 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) {
auto steps = task.wait();
if (steps > 1) {
logger->warn("Skipped {} steps", steps - 1);
current->missed += steps - 1;
}
// Cooldown of case completed..
if (counter >= current_case->limit) {
ret = current_case->stop();
if (current->sent + current->sent_warmup >=
current->limit + current->limit_warmup) {
ret = current->stop();
if (ret)
return ret;
if (++current_case == cases.end()) {
if (++current == cases.end()) {
logger->info("This was the last case.");
setState(State::STOPPING);
if (sn && shutdown)
sn->setState(State::STOPPING);
return -1;
}
counter = 0;
}
// Handle start/stop of new cases
if (counter == 0) {
ret = current_case->start();
if (current->sent + current->sent_warmup == 0) {
ret = current->start();
if (ret)
return ret;
}
if (current->limit_warmup > 0)
logger->info("Starting warmup phase. Sending {} samples...",
current->limit_warmup);
} else if (current->sent == current->limit_warmup)
logger->info("Completed warmup phase. Sending {} samples...",
current->limit);
auto now = time_now();
@ -275,20 +338,25 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) {
for (i = 0; i < cnt; i++) {
auto smp = smps[i];
smp->length = current_case->values;
smp->sequence = counter;
smp->length = current->values;
smp->sequence = current->sent + current->sent_warmup;
smp->ts.origin = now;
smp->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_SEQUENCE |
(int)SampleFlags::HAS_TS_ORIGIN;
smp->signals = getInputSignals(false);
counter++;
if (current->sent_warmup < current->limit_warmup)
current->sent_warmup++;
else
current->sent++;
}
if (counter >= current_case->limit) {
if (cooldown) {
logger->info("Entering cooldown phase. Waiting {} seconds...", cooldown);
task.setTimeout(cooldown);
if (current->sent + current->sent_warmup >=
current->limit + current->limit_warmup) {
if (current->cooldown > 0) {
logger->info("Entering cooldown phase. Waiting {} seconds...",
current->cooldown);
task.setTimeout(current->cooldown);
} else {
task.setTimeout(0); // Start next case immediately
}
@ -298,19 +366,28 @@ int TestRTT::_read(struct Sample *smps[], unsigned cnt) {
}
int TestRTT::_write(struct Sample *smps[], unsigned cnt) {
if (current_case == cases.end())
if (current == cases.end())
return 0;
unsigned i;
for (i = 0; i < cnt; i++) {
if (smps[i]->length != current_case->values) {
auto *smp = smps[i];
if (smp->length != current->values) {
logger->warn("Discarding invalid sample due to mismatching length: "
"expecting={}, has={}",
current_case->values, smps[i]->length);
current->values, smp->length);
continue;
}
if (smp->sequence < current->limit_warmup) {
// Skip samples from warmup phase
current->received_warmup++;
continue;
}
formatter->print(stream, smps[i]);
current->received++;
}
return i;
@ -318,11 +395,10 @@ int TestRTT::_write(struct Sample *smps[], unsigned cnt) {
std::vector<int> TestRTT::getPollFDs() { return {task.getFD()}; }
// Register node
static char n[] = "test_rtt";
static char d[] = "Test round-trip time with loopback";
static NodePlugin<TestRTT, n, d,
(int)NodeFactory::Flags::SUPPORTS_READ |
(int)NodeFactory::Flags::SUPPORTS_WRITE |
(int)NodeFactory::Flags::SUPPORTS_POLL>
p;
int TestRTTNodeFactory::start(SuperNode *sn_) {
sn = sn_;
return 0;
}
static TestRTTNodeFactory p;