1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

rtp: added hooks for rate throttling via RTCP

This commit is contained in:
Steffen Vogel 2019-01-21 12:12:47 +01:00
parent 2d94f15d3c
commit 49fe632bbc
3 changed files with 156 additions and 13 deletions

View file

@ -2,6 +2,13 @@ nodes = {
rtp_node = {
type = "rtp"
rtcp = {
enabled = true,
mode = "aimd"
throttle_mode = "decimate"
}
in = {
address = "0.0.0.0:12000",
signals = {

View file

@ -59,7 +59,26 @@ struct rtp {
struct format_type *format;
struct io io;
bool enable_rtcp;
double rate; /**< Sample rate of source */
struct {
int enabled;
enum {
RTCP_MODE_AIMD,
} mode;
enum {
RTCP_THROTTLE_HOOK_DECIMATE,
RTCP_THROTTLE_HOOK_LIMIT_RATE
} throttle_mode;
struct hook *throttle_hook;
} rtcp;
struct {
} aimd; /** AIMD state */
struct queue_signalled recv_queue;
};

View file

@ -36,12 +36,51 @@
#include <villas/plugin.h>
#include <villas/nodes/socket.h>
#include <villas/hooks/limit_rate.h>
#include <villas/hooks/decimate.h>
#include <villas/nodes/rtp.h>
#include <villas/utils.h>
#include <villas/hook.h>
#include <villas/format_type.h>
pthread_t re_pthread;
static int rtp_set_rate(struct node *n, double rate)
{
struct rtp *r = (struct rtp *) n->_vd;
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
limit_rate_set_rate(r->rtcp.throttle_hook, rate);
break;
case RTCP_THROTTLE_HOOK_DECIMATE:
decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate);
break;
default:
return -1;
}
return 0;
}
static int rtp_aimd(struct node *n, double loss_frac)
{
int ret;
double rate;
/** @todo: Implement AIMD */
rate = 1;
ret = rtp_set_rate(n, rate);
if (ret)
return ret;
return 0;
}
int rtp_reverse(struct node *n)
{
struct rtp *r = (struct rtp *) n->_vd;
@ -65,14 +104,15 @@ int rtp_parse(struct node *n, json_t *cfg)
const char *local, *remote;
const char *format = "villas.binary";
bool enable_rtcp = false;
uint16_t port;
json_error_t err;
json_t *json_rtcp = NULL;
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s: { s: s }, s: { s: s } }",
ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s: { s: s }, s: { s: s } }",
"format", &format,
"enable_rtcp", &enable_rtcp,
"rate", &r->rate,
"rtcp", &json_rtcp,
"out",
"address", &remote,
"in",
@ -81,16 +121,37 @@ int rtp_parse(struct node *n, json_t *cfg)
if (ret)
jerror(&err, "Failed to parse configuration of node %s", node_name(n));
/* RTCP */
if (json_rtcp) {
const char *mode = "aimd";
const char *throttle_mode = "decimate";
ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: b, s?: s }",
"enabled", &r->rtcp.enabled,
"mode", &mode,
"throttle_mode", &throttle_mode
);
/* RTCP Mode */
if (!strcmp(mode, "aimd"))
r->rtcp.mode = RTCP_MODE_AIMD;
else
error("Unknown RTCP mode: %s", mode);
/* RTCP Throttle mode */
if (!strcmp(throttle_mode, "decimate"))
r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_DECIMATE;
else if (!strcmp(throttle_mode, "limit_rate"))
r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_LIMIT_RATE;
else
error("Unknown RTCP throttle mode: %s", throttle_mode);
}
/* Format */
r->format = format_type_lookup(format);
if(!r->format)
error("Invalid format '%s' for node %s", format, node_name(n));
/* Enable RTCP */
r->enable_rtcp = enable_rtcp;
if(enable_rtcp)
warning("RTCP is not implemented yet");
/* Remote address */
ret = sa_decode(&r->remote_rtp, remote, strlen(remote));
if (ret) {
@ -130,7 +191,29 @@ char * rtp_print(struct node *n)
char *local = socket_print_addr((struct sockaddr *) &r->local_rtp.u);
char *remote = socket_print_addr((struct sockaddr *) &r->remote_rtp.u);
buf = strf("format=%s, in.address=%s, out.address=%s", format_type_name(r->format), local, remote);
buf = strf("format=%s, in.address=%s, out.address=%s, rtcp.enabled=%s",
format_type_name(r->format),
local, remote,
r->rtcp.enabled ? "yes" : "no");
if (r->rtcp.enabled) {
const char *mode, *throttle_mode;
switch (r->rtcp.mode) {
case RTCP_MODE_AIMD:
mode = "aimd";
}
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_DECIMATE:
throttle_mode = "decimate";
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
throttle_mode = "limit_rate";
}
strcatf(&buf, "rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode);
}
free(local);
free(remote);
@ -140,7 +223,8 @@ char * rtp_print(struct node *n)
static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg)
{
struct rtp *r = (struct rtp *) arg;
struct node *n = (struct node *) arg;
struct rtp *r = (struct rtp *) n->_vd;
if (queue_signalled_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1)
warning("Failed to push to queue");
@ -152,10 +236,17 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru
static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg)
{
struct node *n = (struct node *) arg;
//struct rtp *r = (struct rtp *) n->_vd;
(void)src;
(void)arg;
printf("rtcp: recv %s\n", rtcp_type_name(msg->hdr.pt));
/** @todo: parse receive report */
double loss_frac = 0;
rtp_aimd(n, loss_frac);
}
int rtp_start(struct node *n)
@ -177,9 +268,35 @@ int rtp_start(struct node *n)
if (ret)
return ret;
/* Initialize throttle hook */
struct hook_type *throttle_hook_type;
r->rtcp.throttle_hook = alloc(sizeof(struct hook));
if (!r->rtcp.throttle_hook)
return -1;
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_DECIMATE:
throttle_hook_type = hook_type_lookup("decimate");
break;
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
throttle_hook_type = hook_type_lookup("limit_rate");
break;
}
if (!throttle_hook_type)
return -1;
ret = hook_init(r->rtcp.throttle_hook, throttle_hook_type, NULL, n);
if (ret)
return ret;
vlist_push(&n->out.hooks, r->rtcp.throttle_hook);
/* Initialize RTP socket */
uint16_t port = sa_port(&r->local_rtp) & ~1;
ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->enable_rtcp, rtp_handler, rtcp_handler, n->_vd);
ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n);
/* Start RTCP session */
rtcp_start(r->rs, node_name(n), &r->remote_rtcp);