From 49fe632bbcacf809c815aa2d1c21e34ed4350a54 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 21 Jan 2019 12:12:47 +0100 Subject: [PATCH] rtp: added hooks for rate throttling via RTCP --- etc/rtp.conf | 7 ++ include/villas/nodes/rtp.h | 21 +++++- lib/nodes/rtp.c | 141 +++++++++++++++++++++++++++++++++---- 3 files changed, 156 insertions(+), 13 deletions(-) diff --git a/etc/rtp.conf b/etc/rtp.conf index e8d3ccb63..e259d7f31 100644 --- a/etc/rtp.conf +++ b/etc/rtp.conf @@ -2,6 +2,13 @@ nodes = { rtp_node = { type = "rtp" + rtcp = { + enabled = true, + + mode = "aimd" + throttle_mode = "decimate" + } + in = { address = "0.0.0.0:12000", signals = { diff --git a/include/villas/nodes/rtp.h b/include/villas/nodes/rtp.h index 54d005a4e..f7f634a5f 100644 --- a/include/villas/nodes/rtp.h +++ b/include/villas/nodes/rtp.h @@ -59,7 +59,26 @@ struct rtp { struct format_type *format; struct io io; - bool enable_rtcp; + double rate; /**< Sample rate of source */ + + struct { + int enabled; + + enum { + RTCP_MODE_AIMD, + } mode; + + enum { + RTCP_THROTTLE_HOOK_DECIMATE, + RTCP_THROTTLE_HOOK_LIMIT_RATE + } throttle_mode; + + struct hook *throttle_hook; + } rtcp; + + struct { + + } aimd; /** AIMD state */ struct queue_signalled recv_queue; }; diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index b94ed3d28..964be3b05 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -36,12 +36,51 @@ #include #include +#include +#include #include #include +#include #include pthread_t re_pthread; +static int rtp_set_rate(struct node *n, double rate) +{ + struct rtp *r = (struct rtp *) n->_vd; + + switch (r->rtcp.throttle_mode) { + case RTCP_THROTTLE_HOOK_LIMIT_RATE: + limit_rate_set_rate(r->rtcp.throttle_hook, rate); + + break; + + case RTCP_THROTTLE_HOOK_DECIMATE: + decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate); + break; + + default: + return -1; + } + + return 0; +} + +static int rtp_aimd(struct node *n, double loss_frac) +{ + int ret; + double rate; + + /** @todo: Implement AIMD */ + rate = 1; + + ret = rtp_set_rate(n, rate); + if (ret) + return ret; + + return 0; +} + int rtp_reverse(struct node *n) { struct rtp *r = (struct rtp *) n->_vd; @@ -65,14 +104,15 @@ int rtp_parse(struct node *n, json_t *cfg) const char *local, *remote; const char *format = "villas.binary"; - bool enable_rtcp = false; uint16_t port; json_error_t err; + json_t *json_rtcp = NULL; - ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: b, s: { s: s }, s: { s: s } }", + ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: o, s: { s: s }, s: { s: s } }", "format", &format, - "enable_rtcp", &enable_rtcp, + "rate", &r->rate, + "rtcp", &json_rtcp, "out", "address", &remote, "in", @@ -81,16 +121,37 @@ int rtp_parse(struct node *n, json_t *cfg) if (ret) jerror(&err, "Failed to parse configuration of node %s", node_name(n)); + /* RTCP */ + if (json_rtcp) { + const char *mode = "aimd"; + const char *throttle_mode = "decimate"; + + ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: b, s?: s }", + "enabled", &r->rtcp.enabled, + "mode", &mode, + "throttle_mode", &throttle_mode + ); + + /* RTCP Mode */ + if (!strcmp(mode, "aimd")) + r->rtcp.mode = RTCP_MODE_AIMD; + else + error("Unknown RTCP mode: %s", mode); + + /* RTCP Throttle mode */ + if (!strcmp(throttle_mode, "decimate")) + r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_DECIMATE; + else if (!strcmp(throttle_mode, "limit_rate")) + r->rtcp.throttle_mode = RTCP_THROTTLE_HOOK_LIMIT_RATE; + else + error("Unknown RTCP throttle mode: %s", throttle_mode); + } + /* Format */ r->format = format_type_lookup(format); if(!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); - /* Enable RTCP */ - r->enable_rtcp = enable_rtcp; - if(enable_rtcp) - warning("RTCP is not implemented yet"); - /* Remote address */ ret = sa_decode(&r->remote_rtp, remote, strlen(remote)); if (ret) { @@ -130,7 +191,29 @@ char * rtp_print(struct node *n) char *local = socket_print_addr((struct sockaddr *) &r->local_rtp.u); char *remote = socket_print_addr((struct sockaddr *) &r->remote_rtp.u); - buf = strf("format=%s, in.address=%s, out.address=%s", format_type_name(r->format), local, remote); + buf = strf("format=%s, in.address=%s, out.address=%s, rtcp.enabled=%s", + format_type_name(r->format), + local, remote, + r->rtcp.enabled ? "yes" : "no"); + + if (r->rtcp.enabled) { + const char *mode, *throttle_mode; + + switch (r->rtcp.mode) { + case RTCP_MODE_AIMD: + mode = "aimd"; + } + + switch (r->rtcp.throttle_mode) { + case RTCP_THROTTLE_HOOK_DECIMATE: + throttle_mode = "decimate"; + + case RTCP_THROTTLE_HOOK_LIMIT_RATE: + throttle_mode = "limit_rate"; + } + + strcatf(&buf, "rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode); + } free(local); free(remote); @@ -140,7 +223,8 @@ char * rtp_print(struct node *n) static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, struct mbuf *mb, void *arg) { - struct rtp *r = (struct rtp *) arg; + struct node *n = (struct node *) arg; + struct rtp *r = (struct rtp *) n->_vd; if (queue_signalled_push(&r->recv_queue, (void *) mbuf_alloc_ref(mb)) != 1) warning("Failed to push to queue"); @@ -152,10 +236,17 @@ static void rtp_handler(const struct sa *src, const struct rtp_header *hdr, stru static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) { + struct node *n = (struct node *) arg; + //struct rtp *r = (struct rtp *) n->_vd; + (void)src; - (void)arg; printf("rtcp: recv %s\n", rtcp_type_name(msg->hdr.pt)); + + /** @todo: parse receive report */ + double loss_frac = 0; + + rtp_aimd(n, loss_frac); } int rtp_start(struct node *n) @@ -177,9 +268,35 @@ int rtp_start(struct node *n) if (ret) return ret; + /* Initialize throttle hook */ + struct hook_type *throttle_hook_type; + + r->rtcp.throttle_hook = alloc(sizeof(struct hook)); + if (!r->rtcp.throttle_hook) + return -1; + + switch (r->rtcp.throttle_mode) { + case RTCP_THROTTLE_HOOK_DECIMATE: + throttle_hook_type = hook_type_lookup("decimate"); + break; + + case RTCP_THROTTLE_HOOK_LIMIT_RATE: + throttle_hook_type = hook_type_lookup("limit_rate"); + break; + } + + if (!throttle_hook_type) + return -1; + + ret = hook_init(r->rtcp.throttle_hook, throttle_hook_type, NULL, n); + if (ret) + return ret; + + vlist_push(&n->out.hooks, r->rtcp.throttle_hook); + /* Initialize RTP socket */ uint16_t port = sa_port(&r->local_rtp) & ~1; - ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->enable_rtcp, rtp_handler, rtcp_handler, n->_vd); + ret = rtp_listen(&r->rs, IPPROTO_UDP, &r->local_rtp, port, port+1, r->rtcp.enabled, rtp_handler, rtcp_handler, n); /* Start RTCP session */ rtcp_start(r->rs, node_name(n), &r->remote_rtcp);