From b4150d2053b1da20398746f41baaa63726c7951f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Fri, 12 Apr 2019 09:50:42 +0200 Subject: [PATCH 1/4] hooks: add a common base class for rate limiting hooks --- include/villas/hook.hpp | 9 +++++++++ include/villas/hooks/decimate.hpp | 15 +++++++++++++-- include/villas/hooks/limit_rate.hpp | 14 +++++++------- 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/include/villas/hook.hpp b/include/villas/hook.hpp index dcf78702b..253b8784c 100644 --- a/include/villas/hook.hpp +++ b/include/villas/hook.hpp @@ -140,6 +140,15 @@ public: } }; +class LimitHook : public Hook { + +public: + using Hook::Hook; + + virtual void setRate(double rate, double maxRate = -1) = 0; + +}; + class HookFactory : public plugin::Plugin { protected: diff --git a/include/villas/hooks/decimate.hpp b/include/villas/hooks/decimate.hpp index ec0cdf283..72261bb44 100644 --- a/include/villas/hooks/decimate.hpp +++ b/include/villas/hooks/decimate.hpp @@ -31,14 +31,25 @@ namespace villas { namespace node { -class DecimateHook : public Hook { +class DecimateHook : public LimitHook { protected: int ratio; unsigned counter; public: - using Hook::Hook; + using LimitHook::LimitHook; + + virtual void setRate(double rate, double maxRate = -1) + { + assert(maxRate > 0); + + int ratio = maxRate / rate; + if (ratio == 0) + ratio = 1; + + setRatio(ratio); + } void setRatio(int r) { diff --git a/include/villas/hooks/limit_rate.hpp b/include/villas/hooks/limit_rate.hpp index 2bed1f91a..26114b635 100644 --- a/include/villas/hooks/limit_rate.hpp +++ b/include/villas/hooks/limit_rate.hpp @@ -31,7 +31,7 @@ namespace villas { namespace node { -class LimitRateHook : public Hook { +class LimitRateHook : public LimitHook { protected: enum { @@ -44,18 +44,18 @@ protected: timespec last; public: - void setRate(double rate) - { - deadtime = 1.0 / rate; - } - LimitRateHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : - Hook(p, n, fl, prio, en), + LimitHook(p, n, fl, prio, en), mode(LIMIT_RATE_LOCAL) { last = (timespec) { 0, 0 }; } + virtual void setRate(double rate, double maxRate = -1) + { + deadtime = 1.0 / rate; + } + virtual void parse(json_t *cfg); virtual int process(sample *smp); From 986fd5f81af0a4d80e26b31546686abacfef9118 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sun, 14 Apr 2019 19:22:33 +0200 Subject: [PATCH 2/4] rtp: use new villas::node::LimitHook::setRate() interface --- etc/examples/nodes/rtp.conf | 19 +++- include/villas/nodes/rtp.hpp | 37 ++++--- lib/nodes/rtp.cpp | 185 ++++++++++++++--------------------- 3 files changed, 104 insertions(+), 137 deletions(-) diff --git a/etc/examples/nodes/rtp.conf b/etc/examples/nodes/rtp.conf index bbb9c8856..4a1ebafe5 100644 --- a/etc/examples/nodes/rtp.conf +++ b/etc/examples/nodes/rtp.conf @@ -2,20 +2,31 @@ nodes = { rtp_node = { type = "rtp" - rate = 10000, + format = "raw.32.be" + + rtcp = false rtcp = { enabled = false, - - mode = "aimd" - throttle_mode = "decimate" + + hook_type = "decimate" } aimd = { a = 10, b = 0.5 + Kp = 1.0 + Ki = 0.0 + Kd = 0 + + rate_min = 100 + rate_init = 2000 + rate_source = 10000 + log = "aimd-rates-%Y_%m_%d_%s.log" + + hook_type = "limit_rate" } in = { diff --git a/include/villas/nodes/rtp.hpp b/include/villas/nodes/rtp.hpp index dcace9c5a..cc7ced139 100644 --- a/include/villas/nodes/rtp.hpp +++ b/include/villas/nodes/rtp.hpp @@ -45,6 +45,7 @@ extern "C" { #include #include #include +#include /* Forward declarations */ struct format_type; @@ -53,14 +54,10 @@ struct format_type; #define RTP_INITIAL_BUFFER_LEN 1500 #define RTP_PACKET_TYPE 21 -enum rtp_throttle_mode { - RTCP_THROTTLE_DISABLED, - RTCP_THROTTLE_HOOK_DECIMATE, - RTCP_THROTTLE_HOOK_LIMIT_RATE -}; - -enum rtp_rtcp_mode { - RTCP_MODE_AIMD +enum rtp_hook_type { + RTCP_HOOK_DISABLED, + RTCP_HOOK_DECIMATE, + RTCP_HOOK_LIMIT_RATE }; struct rtp { @@ -76,28 +73,28 @@ struct rtp { struct format_type *format; struct io io; - double rate; /**< Sample rate of source */ - struct { int enabled; int num_rrs; - - enum rtp_rtcp_mode mode; - - enum rtp_throttle_mode throttle_mode; - - union { - villas::node::DecimateHook *decimate; - villas::node::LimitRateHook *limit_rate; - } throttle_hook; } rtcp; struct { double a; double b; - double last_rate; + enum rtp_hook_type rate_hook_type; + + villas::node::LimitHook *rate_hook; + villas::dsp::PID rate_pid; + + /* PID parameters for rate controller */ + double Kp, Ki, Kd; + double rate_min; + + double rate; + double rate_last; + double rate_source; /**< Sample rate of source */ std::ofstream *log; char *log_filename; diff --git a/lib/nodes/rtp.cpp b/lib/nodes/rtp.cpp index 0ebceed33..1d3127b40 100644 --- a/lib/nodes/rtp.cpp +++ b/lib/nodes/rtp.cpp @@ -57,61 +57,31 @@ using namespace villas::node; static struct plugin p; -static int rtp_set_rate(struct node *n, double rate) -{ - struct rtp *r = (struct rtp *) n->_vd; - int ratio; - - switch (r->rtcp.throttle_mode) { - case RTCP_THROTTLE_HOOK_LIMIT_RATE: - r->rtcp.throttle_hook.limit_rate->setRate(rate); - break; - - case RTCP_THROTTLE_HOOK_DECIMATE: - ratio = r->rate / rate; - if (ratio == 0) - ratio = 1; - - r->rtcp.throttle_hook.decimate->setRatio(ratio); - break; - - case RTCP_THROTTLE_DISABLED: - return 0; - - default: - return -1; - } - - r->logger->debug("Set rate limiting for node {} to {}", node_name(n), rate); - - return 0; -} - static int rtp_aimd(struct node *n, double loss_frac) { struct rtp *r = (struct rtp *) n->_vd; - int ret; double rate; if (!r->rtcp.enabled) return -1; if (loss_frac < 0.01) - rate = r->aimd.last_rate + r->aimd.a; + rate = r->aimd.rate + r->aimd.a; else - rate = r->aimd.last_rate * r->aimd.b; + rate = r->aimd.rate * r->aimd.b; - r->aimd.last_rate = rate; + r->aimd.rate = r->aimd.rate_pid.calculate(rate, r->aimd.rate); - ret = rtp_set_rate(n, rate); - if (ret) - return ret; + if (r->aimd.rate_hook) { + r->aimd.rate_hook->setRate(r->aimd.rate); + r->logger->debug("AIMD: Set rate limit to: {}", r->aimd.rate); + } if (r->aimd.log) - *(r->aimd.log) << r->rtcp.num_rrs << "\t" << loss_frac << "\t" << rate << std::endl; + *(r->aimd.log) << r->rtcp.num_rrs << "\t" << loss_frac << "\t" << r->aimd.rate << std::endl; - r->logger->debug("AIMD: {}\t{}\t{}", r->rtcp.num_rrs, loss_frac, rate); + r->logger->debug("AIMD: {}\t{}\t{}", r->rtcp.num_rrs, loss_frac, r->aimd.rate); return 0; } @@ -123,16 +93,20 @@ int rtp_init(struct node *n) r->logger = villas::logging.get("node:rtp"); /* Default values */ - r->rate = 1; + r->aimd.rate = 1; r->aimd.a = 10; r->aimd.b = 0.5; - r->aimd.last_rate = 2000; + r->aimd.Kp = 1; + r->aimd.Ki = 0; + r->aimd.Kd = 0; + r->aimd.rate_min = 1; + r->aimd.rate_source = 2000; r->aimd.log_filename = nullptr; r->aimd.log = nullptr; r->rtcp.enabled = false; - r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED; + r->aimd.rate_hook_type = RTCP_HOOK_DISABLED; return 0; } @@ -155,15 +129,15 @@ int rtp_parse(struct node *n, json_t *cfg) const char *local, *remote; const char *format = "villas.binary"; const char *log = nullptr; + const char *hook_type = nullptr; uint16_t port; json_error_t err; - json_t *json_rtcp = nullptr, *json_aimd = nullptr; + json_t *json_aimd = nullptr; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: F, s?: o, s?: o, s: { s: s }, s: { s: s } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: o, s: { s: s }, s: { s: s } }", "format", &format, - "rate", &r->rate, - "rtcp", &json_rtcp, + "rtcp", &r->rtcp.enabled, "aimd", &json_aimd, "out", "address", &remote, @@ -175,52 +149,39 @@ int rtp_parse(struct node *n, json_t *cfg) /* AIMD */ if (json_aimd) { - ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F, s?: s }", + ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: s, s?: s }", "a", &r->aimd.a, "b", &r->aimd.b, - "start_rate", &r->aimd.last_rate, - "log", &log + "Kp", &r->aimd.Kp, + "Ki", &r->aimd.Ki, + "Kd", &r->aimd.Kd, + "rate_min", &r->aimd.rate_min, + "rate_source", &r->aimd.rate_source, + "rate_init", &r->aimd.rate, + "log", &log, + "hook_type", &hook_type ); if (ret) r->logger->error("Failed to parse configuration of node {}", node_name(n)); + + /* AIMD Hook type */ + if (!r->rtcp.enabled) + r->aimd.rate_hook_type = RTCP_HOOK_DISABLED; + else if (hook_type) { + if (!strcmp(hook_type, "decimate")) + r->aimd.rate_hook_type = RTCP_HOOK_DECIMATE; + else if (!strcmp(hook_type, "limit_rate")) + r->aimd.rate_hook_type = RTCP_HOOK_LIMIT_RATE; + else if (!strcmp(hook_type, "disabled")) + r->aimd.rate_hook_type = RTCP_HOOK_DISABLED; + else + r->logger->error("Unknown RTCP hook_type: {}", hook_type); + } } if (log) r->aimd.log_filename = strdup(log); - /* RTCP */ - if (json_rtcp) { - const char *mode = "aimd"; - const char *throttle_mode = "decimate"; - - /* Enable if RTCP section is available */ - r->rtcp.enabled = 1; - - ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: b, s?: s, s?: s }", - "enabled", &r->rtcp.enabled, - "mode", &mode, - "throttle_mode", &throttle_mode - ); - if (ret) - r->logger->error("Failed to parse configuration of node {}", node_name(n)); - - /* RTCP Mode */ - if (!strcmp(mode, "aimd")) - r->rtcp.mode = RTCP_MODE_AIMD; - else - r->logger->error("Unknown RTCP mode: {}", mode); - - /* RTCP Throttle mode */ - if (r->rtcp.enabled == false) - r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED; - else if (!strcmp(throttle_mode, "decimate")) - r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_DECIMATE; - else if (!strcmp(throttle_mode, "limit_rate")) - r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_LIMIT_RATE; - else - r->logger->error("Unknown RTCP throttle mode: {}", throttle_mode); - } - /* Format */ r->format = format_type_lookup(format); if (!r->format) @@ -271,34 +232,27 @@ char * rtp_print(struct node *n) r->rtcp.enabled ? "yes" : "no"); if (r->rtcp.enabled) { - const char *mode, *throttle_mode; + const char *hook_type; - switch (r->rtcp.mode) { - case RTCP_MODE_AIMD: - mode = "aimd"; - } - - switch (r->rtcp.throttle_mode) { - case RTCP_THROTTLE_HOOK_DECIMATE: - throttle_mode = "decimate"; + switch (r->aimd.rate_hook_type) { + case RTCP_HOOK_DECIMATE: + hook_type = "decimate"; break; - case RTCP_THROTTLE_HOOK_LIMIT_RATE: - throttle_mode = "limit_rate"; + case RTCP_HOOK_LIMIT_RATE: + hook_type = "limit_rate"; break; - case RTCP_THROTTLE_DISABLED: - throttle_mode = "disabled"; + case RTCP_HOOK_DISABLED: + hook_type = "disabled"; break; default: - throttle_mode = "unknown"; + hook_type = "unknown"; } - strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode); - - if (r->rtcp.mode == RTCP_MODE_AIMD) - strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate); + strcatf(&buf, ", aimd.hook_type=%s", hook_type); + strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.rate); } free(local); @@ -386,32 +340,35 @@ int rtp_start(struct node *n) if (ret) return ret; - /* Initialize throttle hook */ - if (r->rtcp.throttle_mode != RTCP_THROTTLE_DISABLED) { + /* Initialize AIMD hook */ + if (r->aimd.rate_hook_type != RTCP_HOOK_DISABLED) { #ifdef WITH_HOOKS - switch (r->rtcp.throttle_mode) { - case RTCP_THROTTLE_HOOK_DECIMATE: - r->rtcp.throttle_hook.decimate = new DecimateHook(nullptr, n, 0, 0); + switch (r->aimd.rate_hook_type) { + case RTCP_HOOK_DECIMATE: + r->aimd.rate_hook = new DecimateHook(nullptr, n, 0, 0); break; - case RTCP_THROTTLE_HOOK_LIMIT_RATE: - r->rtcp.throttle_hook.limit_rate = new LimitRateHook(nullptr, n, 0, 0); + case RTCP_HOOK_LIMIT_RATE: + r->aimd.rate_hook = new LimitRateHook(nullptr, n, 0, 0); break; default: return -1; } - vlist_push(&n->out.hooks, (void *) r->rtcp.throttle_hook.limit_rate); + vlist_push(&n->out.hooks, (void *) r->aimd.rate_hook); + + r->aimd.rate_hook->setRate(r->aimd.rate_last); #else - r->logger->error("Throttle hooks not supported"); + r->logger->error("Rate limiting is not supported"); + return -1; #endif } - ret = rtp_set_rate(n, r->aimd.last_rate); - if (ret) - return ret; + double dt = 5.0; // TODO + + r->aimd.rate_pid = villas::dsp::PID(dt, r->aimd.rate_source, r->aimd.rate_min, r->aimd.Kp, r->aimd.Ki, r->aimd.Kd); /* Initialize RTP socket */ uint16_t port = sa_port(&r->in.saddr_rtp) & ~1; @@ -423,7 +380,7 @@ int rtp_start(struct node *n) rtcp_start(r->rs, node_name(n), &r->out.saddr_rtcp); - if (r->rtcp.mode == RTCP_MODE_AIMD) { + if (r->aimd.log_filename) { char fn[128]; time_t ts = time(nullptr); @@ -437,6 +394,8 @@ int rtp_start(struct node *n) *(r->aimd.log) << "# cnt\tfrac_loss\trate" << std::endl; } + else + r->aimd.log = nullptr; } return ret; From 4cea9559c9d62944dced6cbeeb34bb0fc0e9c367 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 15 Apr 2019 10:02:32 +0200 Subject: [PATCH 3/4] update VILLAScommon submodule --- common | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common b/common index af39c2b30..401dc3706 160000 --- a/common +++ b/common @@ -1 +1 @@ -Subproject commit af39c2b305999f962d2bdb1b743b9de063acddc4 +Subproject commit 401dc3706b62c497bf11c781426aa0e6374fba18 From 2dc2d8383d17d1e83ad0fcd1c4b5ddc274f92660 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 15 Apr 2019 10:03:13 +0200 Subject: [PATCH 4/4] dp: update to use new C++ window impl --- lib/hooks/dp.cpp | 33 ++++++++------------------------- 1 file changed, 8 insertions(+), 25 deletions(-) diff --git a/lib/hooks/dp.cpp b/lib/hooks/dp.cpp index 32ae3f641..b5c5f9c6d 100644 --- a/lib/hooks/dp.cpp +++ b/lib/hooks/dp.cpp @@ -31,7 +31,7 @@ #include #include -#include +#include #include using namespace std::complex_literals; @@ -57,27 +57,27 @@ protected: int *fharmonics; int fharmonics_len; - struct window window; + dsp::Window window; void step(double *in, std::complex *out) { - int N = window.steps; + int N = window.getSteps(); __attribute__((unused)) std::complex om_k, corr; double newest = *in; - __attribute__((unused)) double oldest = window_update(&window, newest); + __attribute__((unused)) double oldest = window.update(newest); for (int k = 0; k < fharmonics_len; k++) { om_k = 2.0i * M_PI * (double) fharmonics[k] / (double) N; /* Correction for stationary phasor */ - corr = std::exp(-om_k * (steps - (window.steps + 1))); + corr = std::exp(-om_k * (steps - (N + 1))); //corr = 1; #if 0 /* Recursive update */ coeffs[k] = std::exp(om) * (coeffs[k] + (newest - oldest)); - out[k] = (2.0 / window.steps) * (coeffs[i] * corr); + out[k] = (2.0 / N) * (coeffs[i] * corr); /* DC component */ if (fharmonics[k] == 0) @@ -87,7 +87,7 @@ protected: std::complex X_k = 0; for (int n = 0; n < N; n++) { - double x_n = window.data[(window.pos - window.steps + n) & window.mask]; + double x_n = window[window.getPos() + n]; X_k += x_n * std::exp(om_k * (double) n); } @@ -135,8 +135,6 @@ public: virtual void start() { - int ret; - assert(state == STATE_PREPARED); time = 0; @@ -145,26 +143,11 @@ public: for (int i = 0; i < fharmonics_len; i++) coeffs[i] = 0; - ret = window_init(&window, (1.0 / f0) / timestep, 0.0); - if (ret) - throw RuntimeError("Failed to initialize window"); + window = dsp::Window((1.0 / f0) / timestep, 0.0); state = STATE_STARTED; } - virtual void stop() - { - int ret; - - assert(state == STATE_STARTED); - - ret = window_destroy(&window); - if (ret) - throw RuntimeError("Failed to destroy window"); - - state = STATE_STOPPED; - } - virtual void parse(json_t *cfg) { int ret;