diff --git a/common b/common index af39c2b30..117dda747 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit af39c2b305999f962d2bdb1b743b9de063acddc4 +Subproject commit 117dda74728679b890e20fb8bb1fc4b5792fc46d diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 458df4102..71acd90b3 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -35,6 +35,7 @@ set(API_SRC actions/paths.cpp actions/restart.cpp actions/node.cpp + actions/stats.cpp ) if(WITH_WEB) diff --git a/lib/api/actions/stats.cpp b/lib/api/actions/stats.cpp new file mode 100644 index 000000000..fa8316495 --- /dev/null +++ b/lib/api/actions/stats.cpp @@ -0,0 +1,79 @@ +/** The API ressource for getting and resetting statistics. + * + * @author Steffen Vogel + * @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC + * @license GNU General Public License (version 3) + * + * VILLASnode + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + *********************************************************************************/ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace villas { +namespace node { +namespace api { + +class StatsAction : public Action { + +public: + using Action::Action; + + virtual int execute(json_t *args, json_t **resp) + { + int ret, reset = 0; + json_error_t err; + + ret = json_unpack_ex(args, &err, 0, "{ s?: b }", + "reset", &reset + ); + if (ret < 0) + return ret; + + struct vlist *nodes = session->getSuperNode()->getNodes(); + + for (size_t i = 0; i < vlist_length(nodes); i++) { + struct node *n = (struct node *) vlist_at(nodes, i); + + if (n->stats) { + if (reset) { + stats_reset(n->stats); + info("Stats resetted for node %s", node_name(n)); + } + } + } + + *resp = json_object(); + + return 0; + } + +}; + +/* Register actions */ +static ActionPlugin p1("stats", "get or reset internal statistics counters"); + +} /* namespace api */ +} /* namespace node */ +} /* namespace villas */ diff --git a/lib/hooks/stats.cpp b/lib/hooks/stats.cpp index 879dc52f8..258ddef5a 100644 --- a/lib/hooks/stats.cpp +++ b/lib/hooks/stats.cpp @@ -153,7 +153,12 @@ public: output(nullptr), uri(nullptr) { + int ret; + stats.state = STATE_DESTROYED; + ret = stats_init(&stats, buckets, warmup); + if (ret) + throw RuntimeError("Failed to initialize stats"); /* Register statistic object to path. * @@ -177,18 +182,6 @@ public: stats_destroy(&stats); } - virtual void prepare() - { - int ret; - assert(state == STATE_CHECKED); - - ret = stats_init(&stats, buckets, warmup); - if (ret) - throw RuntimeError("Failed to initialze stats"); - - state = STATE_PREPARED; - } - virtual void start() { assert(state == STATE_PREPARED); diff --git a/lib/nodes/rtp.cpp b/lib/nodes/rtp.cpp index 0ebceed33..730a03400 100644 --- a/lib/nodes/rtp.cpp +++ b/lib/nodes/rtp.cpp @@ -82,7 +82,7 @@ static int rtp_set_rate(struct node *n, double rate) return -1; } - r->logger->debug("Set rate limiting for node {} to {}", node_name(n), rate); + r->logger->info("Set rate limiting for node {} to {}", node_name(n), rate); return 0; } @@ -111,8 +111,6 @@ static int rtp_aimd(struct node *n, double loss_frac) if (r->aimd.log) *(r->aimd.log) << r->rtcp.num_rrs << "\t" << loss_frac << "\t" << rate << std::endl; - r->logger->debug("AIMD: {}\t{}\t{}", r->rtcp.num_rrs, loss_frac, rate); - return 0; } @@ -339,17 +337,18 @@ static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) if (msg->hdr.pt == RTCP_SR) { if (msg->hdr.count > 0) { const struct rtcp_rr *rr = &msg->r.sr.rrv[0]; - debug(5, "RTP: fraction lost = %d", rr->fraction); - double loss = (double) rr->fraction / 256; + double loss_frac = (double) rr->fraction / 256; - rtp_aimd(n, loss); + rtp_aimd(n, loss_frac); if (n->stats) { stats_update(n->stats, STATS_METRIC_RTP_PKTS_LOST, rr->lost); - stats_update(n->stats, STATS_METRIC_RTP_LOSS_FRACTION, loss); + stats_update(n->stats, STATS_METRIC_RTP_LOSS_FRACTION, loss_frac); stats_update(n->stats, STATS_METRIC_RTP_JITTER, rr->jitter); } + + r->logger->info("RTCP: rr: num_rrs={}, loss_frac={}, pkts_lost={}, jitter={}", r->rtcp.num_rrs, loss_frac, rr->lost, rr->jitter); } else debug(5, "RTCP: Received sender report with zero reception reports"); @@ -392,10 +391,18 @@ int rtp_start(struct node *n) switch (r->rtcp.throttle_mode) { case RTCP_THROTTLE_HOOK_DECIMATE: r->rtcp.throttle_hook.decimate = new DecimateHook(nullptr, n, 0, 0); + r->rtcp.throttle_hook.decimate->parse(); + r->rtcp.throttle_hook.decimate->check(); + r->rtcp.throttle_hook.decimate->prepare(); + r->rtcp.throttle_hook.decimate->start(); break; case RTCP_THROTTLE_HOOK_LIMIT_RATE: r->rtcp.throttle_hook.limit_rate = new LimitRateHook(nullptr, n, 0, 0); + r->rtcp.throttle_hook.limit_rate->parse(); + r->rtcp.throttle_hook.limit_rate->check(); + r->rtcp.throttle_hook.limit_rate->prepare(); + r->rtcp.throttle_hook.limit_rate->start(); break; default: @@ -423,7 +430,7 @@ int rtp_start(struct node *n) rtcp_start(r->rs, node_name(n), &r->out.saddr_rtcp); - if (r->rtcp.mode == RTCP_MODE_AIMD) { + if (r->aimd.log) { char fn[128]; time_t ts = time(nullptr); diff --git a/lib/nodes/test_rtt.cpp b/lib/nodes/test_rtt.cpp index 4d19ea396..34a84259b 100644 --- a/lib/nodes/test_rtt.cpp +++ b/lib/nodes/test_rtt.cpp @@ -38,6 +38,8 @@ static int test_rtt_case_start(struct test_rtt *t, int id) int ret; struct test_rtt_case *c = (struct test_rtt_case *) vlist_at(&t->cases, id); + info("Starting case #%d: filename=%s, rate=%f, values=%d, limit=%d", t->current, c->filename_formatted, c->rate, c->values, c->limit); + /* Open file */ ret = io_open(&t->io, c->filename_formatted); if (ret) @@ -68,6 +70,8 @@ static int test_rtt_case_stop(struct test_rtt *t, int id) if (ret) return ret; + info("Stopping case #%d", id); + return 0; } @@ -197,7 +201,7 @@ int test_rtt_parse(struct node *n, json_t *cfg) if (!json_is_number(json_val)) error("The 'rates' setting of node %s must be an array of real numbers", node_name(n)); - rates[i] = json_integer_value(json_val); + rates[j] = json_integer_value(json_val); } } else @@ -210,15 +214,10 @@ int test_rtt_parse(struct node *n, json_t *cfg) error("The 'values' setting of node %s must be an array of integers", node_name(n)); values[j] = json_integer_value(json_val); - if (values[j] < 2) - error("Each 'values' entry must be at least 2 or larger"); } } - else { + else values[0] = json_integer_value(json_values); - if (values[0] <= 2) - error("Each 'values' entry must be at least 2 or larger"); - } for (int i = 0; i < numrates; i++) { for (int j = 0; j < numvalues; j++) { @@ -288,7 +287,7 @@ int test_rtt_start(struct node *n) if (ret || !S_ISDIR(st.st_mode)) { ret = mkdir(t->output, 0777); if (ret) { - warning("Failed to create output director: %s", t->output); + warning("Failed to create output directory: %s", t->output); return ret; } } @@ -301,10 +300,6 @@ int test_rtt_start(struct node *n) if (ret) return ret; - ret = io_check(&t->io); - if (ret) - return ret; - ret = task_init(&t->task, c->rate, CLOCK_MONOTONIC); if (ret) return ret; @@ -375,21 +370,6 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned if (steps > 1) warning("Skipped %ld steps", (long) (steps - 1)); - if (t->counter == 0) - info("Starting case #%d: filename=%s, rate=%f, values=%d, limit=%d", t->current, c->filename_formatted, c->rate, c->values, c->limit); - - struct timespec now = time_now(); - - /* Prepare samples */ - for (i = 0; i < cnt; i++) { - smps[i]->length = c->values; - smps[i]->sequence = t->counter; - smps[i]->ts.origin = now; - smps[i]->flags = SAMPLE_HAS_DATA | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN; - - t->counter++; - } - if ((unsigned) t->counter >= c->limit) { info("Stopping case #%d", t->current); @@ -401,9 +381,25 @@ int test_rtt_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned if (ret < 0) return ret; } - } - return i; + return 0; + } + else { + struct timespec now = time_now(); + + /* Prepare samples */ + for (i = 0; i < cnt; i++) { + smps[i]->length = c->values; + smps[i]->sequence = t->counter; + smps[i]->ts.origin = now; + smps[i]->flags = SAMPLE_HAS_DATA | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_TS_ORIGIN; + + t->counter++; + } + + + return i; + } } int test_rtt_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release)