1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

rtp: use new villas::node::LimitHook::setRate() interface

This commit is contained in:
Steffen Vogel 2019-04-14 19:22:33 +02:00
parent b4150d2053
commit 986fd5f81a
3 changed files with 104 additions and 137 deletions

View file

@ -2,20 +2,31 @@ nodes = {
rtp_node = {
type = "rtp"
rate = 10000,
format = "raw.32.be"
rtcp = false
rtcp = {
enabled = false,
mode = "aimd"
throttle_mode = "decimate"
hook_type = "decimate"
}
aimd = {
a = 10,
b = 0.5
Kp = 1.0
Ki = 0.0
Kd = 0
rate_min = 100
rate_init = 2000
rate_source = 10000
log = "aimd-rates-%Y_%m_%d_%s.log"
hook_type = "limit_rate"
}
in = {

View file

@ -45,6 +45,7 @@ extern "C" {
#include <villas/queue_signalled.h>
#include <villas/hooks/limit_rate.hpp>
#include <villas/hooks/decimate.hpp>
#include <villas/dsp/pid.hpp>
/* Forward declarations */
struct format_type;
@ -53,14 +54,10 @@ struct format_type;
#define RTP_INITIAL_BUFFER_LEN 1500
#define RTP_PACKET_TYPE 21
enum rtp_throttle_mode {
RTCP_THROTTLE_DISABLED,
RTCP_THROTTLE_HOOK_DECIMATE,
RTCP_THROTTLE_HOOK_LIMIT_RATE
};
enum rtp_rtcp_mode {
RTCP_MODE_AIMD
enum rtp_hook_type {
RTCP_HOOK_DISABLED,
RTCP_HOOK_DECIMATE,
RTCP_HOOK_LIMIT_RATE
};
struct rtp {
@ -76,28 +73,28 @@ struct rtp {
struct format_type *format;
struct io io;
double rate; /**< Sample rate of source */
struct {
int enabled;
int num_rrs;
enum rtp_rtcp_mode mode;
enum rtp_throttle_mode throttle_mode;
union {
villas::node::DecimateHook *decimate;
villas::node::LimitRateHook *limit_rate;
} throttle_hook;
} rtcp;
struct {
double a;
double b;
double last_rate;
enum rtp_hook_type rate_hook_type;
villas::node::LimitHook *rate_hook;
villas::dsp::PID rate_pid;
/* PID parameters for rate controller */
double Kp, Ki, Kd;
double rate_min;
double rate;
double rate_last;
double rate_source; /**< Sample rate of source */
std::ofstream *log;
char *log_filename;

View file

@ -57,61 +57,31 @@ using namespace villas::node;
static struct plugin p;
static int rtp_set_rate(struct node *n, double rate)
{
struct rtp *r = (struct rtp *) n->_vd;
int ratio;
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
r->rtcp.throttle_hook.limit_rate->setRate(rate);
break;
case RTCP_THROTTLE_HOOK_DECIMATE:
ratio = r->rate / rate;
if (ratio == 0)
ratio = 1;
r->rtcp.throttle_hook.decimate->setRatio(ratio);
break;
case RTCP_THROTTLE_DISABLED:
return 0;
default:
return -1;
}
r->logger->debug("Set rate limiting for node {} to {}", node_name(n), rate);
return 0;
}
static int rtp_aimd(struct node *n, double loss_frac)
{
struct rtp *r = (struct rtp *) n->_vd;
int ret;
double rate;
if (!r->rtcp.enabled)
return -1;
if (loss_frac < 0.01)
rate = r->aimd.last_rate + r->aimd.a;
rate = r->aimd.rate + r->aimd.a;
else
rate = r->aimd.last_rate * r->aimd.b;
rate = r->aimd.rate * r->aimd.b;
r->aimd.last_rate = rate;
r->aimd.rate = r->aimd.rate_pid.calculate(rate, r->aimd.rate);
ret = rtp_set_rate(n, rate);
if (ret)
return ret;
if (r->aimd.rate_hook) {
r->aimd.rate_hook->setRate(r->aimd.rate);
r->logger->debug("AIMD: Set rate limit to: {}", r->aimd.rate);
}
if (r->aimd.log)
*(r->aimd.log) << r->rtcp.num_rrs << "\t" << loss_frac << "\t" << rate << std::endl;
*(r->aimd.log) << r->rtcp.num_rrs << "\t" << loss_frac << "\t" << r->aimd.rate << std::endl;
r->logger->debug("AIMD: {}\t{}\t{}", r->rtcp.num_rrs, loss_frac, rate);
r->logger->debug("AIMD: {}\t{}\t{}", r->rtcp.num_rrs, loss_frac, r->aimd.rate);
return 0;
}
@ -123,16 +93,20 @@ int rtp_init(struct node *n)
r->logger = villas::logging.get("node:rtp");
/* Default values */
r->rate = 1;
r->aimd.rate = 1;
r->aimd.a = 10;
r->aimd.b = 0.5;
r->aimd.last_rate = 2000;
r->aimd.Kp = 1;
r->aimd.Ki = 0;
r->aimd.Kd = 0;
r->aimd.rate_min = 1;
r->aimd.rate_source = 2000;
r->aimd.log_filename = nullptr;
r->aimd.log = nullptr;
r->rtcp.enabled = false;
r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED;
r->aimd.rate_hook_type = RTCP_HOOK_DISABLED;
return 0;
}
@ -155,15 +129,15 @@ int rtp_parse(struct node *n, json_t *cfg)
const char *local, *remote;
const char *format = "villas.binary";
const char *log = nullptr;
const char *hook_type = nullptr;
uint16_t port;
json_error_t err;
json_t *json_rtcp = nullptr, *json_aimd = nullptr;
json_t *json_aimd = nullptr;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: F, s?: o, s?: o, s: { s: s }, s: { s: s } }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s?: o, s: { s: s }, s: { s: s } }",
"format", &format,
"rate", &r->rate,
"rtcp", &json_rtcp,
"rtcp", &r->rtcp.enabled,
"aimd", &json_aimd,
"out",
"address", &remote,
@ -175,52 +149,39 @@ int rtp_parse(struct node *n, json_t *cfg)
/* AIMD */
if (json_aimd) {
ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F, s?: s }",
ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: F, s?: s, s?: s }",
"a", &r->aimd.a,
"b", &r->aimd.b,
"start_rate", &r->aimd.last_rate,
"log", &log
"Kp", &r->aimd.Kp,
"Ki", &r->aimd.Ki,
"Kd", &r->aimd.Kd,
"rate_min", &r->aimd.rate_min,
"rate_source", &r->aimd.rate_source,
"rate_init", &r->aimd.rate,
"log", &log,
"hook_type", &hook_type
);
if (ret)
r->logger->error("Failed to parse configuration of node {}", node_name(n));
/* AIMD Hook type */
if (!r->rtcp.enabled)
r->aimd.rate_hook_type = RTCP_HOOK_DISABLED;
else if (hook_type) {
if (!strcmp(hook_type, "decimate"))
r->aimd.rate_hook_type = RTCP_HOOK_DECIMATE;
else if (!strcmp(hook_type, "limit_rate"))
r->aimd.rate_hook_type = RTCP_HOOK_LIMIT_RATE;
else if (!strcmp(hook_type, "disabled"))
r->aimd.rate_hook_type = RTCP_HOOK_DISABLED;
else
r->logger->error("Unknown RTCP hook_type: {}", hook_type);
}
}
if (log)
r->aimd.log_filename = strdup(log);
/* RTCP */
if (json_rtcp) {
const char *mode = "aimd";
const char *throttle_mode = "decimate";
/* Enable if RTCP section is available */
r->rtcp.enabled = 1;
ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: b, s?: s, s?: s }",
"enabled", &r->rtcp.enabled,
"mode", &mode,
"throttle_mode", &throttle_mode
);
if (ret)
r->logger->error("Failed to parse configuration of node {}", node_name(n));
/* RTCP Mode */
if (!strcmp(mode, "aimd"))
r->rtcp.mode = RTCP_MODE_AIMD;
else
r->logger->error("Unknown RTCP mode: {}", mode);
/* RTCP Throttle mode */
if (r->rtcp.enabled == false)
r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED;
else if (!strcmp(throttle_mode, "decimate"))
r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_DECIMATE;
else if (!strcmp(throttle_mode, "limit_rate"))
r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_LIMIT_RATE;
else
r->logger->error("Unknown RTCP throttle mode: {}", throttle_mode);
}
/* Format */
r->format = format_type_lookup(format);
if (!r->format)
@ -271,34 +232,27 @@ char * rtp_print(struct node *n)
r->rtcp.enabled ? "yes" : "no");
if (r->rtcp.enabled) {
const char *mode, *throttle_mode;
const char *hook_type;
switch (r->rtcp.mode) {
case RTCP_MODE_AIMD:
mode = "aimd";
}
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_DECIMATE:
throttle_mode = "decimate";
switch (r->aimd.rate_hook_type) {
case RTCP_HOOK_DECIMATE:
hook_type = "decimate";
break;
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
throttle_mode = "limit_rate";
case RTCP_HOOK_LIMIT_RATE:
hook_type = "limit_rate";
break;
case RTCP_THROTTLE_DISABLED:
throttle_mode = "disabled";
case RTCP_HOOK_DISABLED:
hook_type = "disabled";
break;
default:
throttle_mode = "unknown";
hook_type = "unknown";
}
strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode);
if (r->rtcp.mode == RTCP_MODE_AIMD)
strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate);
strcatf(&buf, ", aimd.hook_type=%s", hook_type);
strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.rate);
}
free(local);
@ -386,32 +340,35 @@ int rtp_start(struct node *n)
if (ret)
return ret;
/* Initialize throttle hook */
if (r->rtcp.throttle_mode != RTCP_THROTTLE_DISABLED) {
/* Initialize AIMD hook */
if (r->aimd.rate_hook_type != RTCP_HOOK_DISABLED) {
#ifdef WITH_HOOKS
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_DECIMATE:
r->rtcp.throttle_hook.decimate = new DecimateHook(nullptr, n, 0, 0);
switch (r->aimd.rate_hook_type) {
case RTCP_HOOK_DECIMATE:
r->aimd.rate_hook = new DecimateHook(nullptr, n, 0, 0);
break;
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
r->rtcp.throttle_hook.limit_rate = new LimitRateHook(nullptr, n, 0, 0);
case RTCP_HOOK_LIMIT_RATE:
r->aimd.rate_hook = new LimitRateHook(nullptr, n, 0, 0);
break;
default:
return -1;
}
vlist_push(&n->out.hooks, (void *) r->rtcp.throttle_hook.limit_rate);
vlist_push(&n->out.hooks, (void *) r->aimd.rate_hook);
r->aimd.rate_hook->setRate(r->aimd.rate_last);
#else
r->logger->error("Throttle hooks not supported");
r->logger->error("Rate limiting is not supported");
return -1;
#endif
}
ret = rtp_set_rate(n, r->aimd.last_rate);
if (ret)
return ret;
double dt = 5.0; // TODO
r->aimd.rate_pid = villas::dsp::PID(dt, r->aimd.rate_source, r->aimd.rate_min, r->aimd.Kp, r->aimd.Ki, r->aimd.Kd);
/* Initialize RTP socket */
uint16_t port = sa_port(&r->in.saddr_rtp) & ~1;
@ -423,7 +380,7 @@ int rtp_start(struct node *n)
rtcp_start(r->rs, node_name(n), &r->out.saddr_rtcp);
if (r->rtcp.mode == RTCP_MODE_AIMD) {
if (r->aimd.log_filename) {
char fn[128];
time_t ts = time(nullptr);
@ -437,6 +394,8 @@ int rtp_start(struct node *n)
*(r->aimd.log) << "# cnt\tfrac_loss\trate" << std::endl;
}
else
r->aimd.log = nullptr;
}
return ret;