diff --git a/include/villas/node.h b/include/villas/node.h index 53d77c106..c97bf7914 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -84,12 +84,14 @@ struct node { struct node_direction in, out; -#ifdef WITH_NETEM - int mark; /**< Socket mark for netem, routing and filtering */ +#ifdef __linux__ + int fwmark; /**< Socket mark for netem, routing and filtering */ +#ifdef WITH_NETEM struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ #endif /* WITH_NETEM */ +#endif /* __linux__ */ struct node_type *_vt; /**< Virtual functions (C++ OOP style) */ void *_vd; /**< Virtual data (used by struct node::_vt functions) */ diff --git a/lib/kernel/if.c b/lib/kernel/if.c index b200c0742..d78eced93 100644 --- a/lib/kernel/if.c +++ b/lib/kernel/if.c @@ -73,23 +73,23 @@ int if_start(struct interface *i) //if_set_affinity(i, i->affinity); /* Assign fwmark's to nodes which have netem options */ - int ret, mark = 0; + int ret, fwmark = 0; for (size_t j = 0; j < vlist_length(&i->nodes); j++) { struct node *n = (struct node *) vlist_at(&i->nodes, j); - if (n->tc_qdisc) - n->mark = 1 + mark++; + if (n->tc_qdisc && n->fwmark < 0) + n->fwmark = 1 + fwmark++; } /* Abort if no node is using netem */ - if (mark == 0) + if (fwmark == 0) return 0; if (getuid() != 0) error("Network emulation requires super-user privileges!"); /* Replace root qdisc */ - ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark); + ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, fwmark); if (ret) error("Failed to setup priority queuing discipline: %s", nl_geterror(ret)); @@ -98,16 +98,16 @@ int if_start(struct interface *i) struct node *n = (struct node *) vlist_at(&i->nodes, j); if (n->tc_qdisc) { - ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark); + ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->fwmark), n->fwmark); if (ret) error("Failed to setup FW mark classifier: %s", nl_geterror(ret)); char *buf = tc_netem_print(n->tc_qdisc); debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s", - if_name(i), n->mark, buf); + if_name(i), n->fwmark, buf); free(buf); - ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark)); + ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->fwmark, 0), TC_HANDLE(1, n->fwmark)); if (ret) error("Failed to setup netem qdisc: %s", nl_geterror(ret)); } diff --git a/lib/node.c b/lib/node.c index 2fa14b92e..cec2e5b3a 100644 --- a/lib/node.c +++ b/lib/node.c @@ -225,6 +225,10 @@ int node_init(struct node *n, struct node_type *vt) n->_name = NULL; n->_name_long = NULL; +#ifdef __linux__ + n->fwmark = -1; +#endif /* __linux__ */ + #ifdef WITH_NETEM n->tc_qdisc = NULL; n->tc_classifier = NULL; @@ -280,16 +284,24 @@ int node_parse(struct node *n, json_t *json, const char *name) n->name = strdup(name); - ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }", + ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }", "type", &type, "in", - "signals", &json_signals, - "out", - "netem", &json_netem + "signals", &json_signals ); if (ret) jerror(&err, "Failed to parse node %s", node_name(n)); +#ifdef __linux__ + ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o, s?: i } }", + "out", + "netem", &json_netem, + "fwmark", &n->fwmark + ); + if (ret) + jerror(&err, "Failed to parse node %s", node_name(n)); +#endif /* __linux__ */ + nt = node_type_lookup(type); assert(nt == node_type(n)); @@ -396,18 +408,18 @@ int node_start(struct node *n) #ifdef __linux__ /* Set fwmark for outgoing packets if netem is enabled for this node */ - if (n->mark) { + if (n->fwmark) { int fds[16]; int num_sds = node_netem_fds(n, fds); for (int i = 0; i < num_sds; i++) { int fd = fds[i]; - ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark)); + ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->fwmark, sizeof(n->fwmark)); if (ret) serror("Failed to set FW mark for outgoing packets"); else - debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark); + debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->fwmark); } } #endif /* __linux__ */ @@ -657,7 +669,7 @@ char * node_name_long(struct node *n) strcatf(&n->_name_long, ", out.netem=%s", n->tc_qdisc ? "yes" : "no"); if (n->tc_qdisc) - strcatf(&n->_name_long, ", mark=%d", n->mark); + strcatf(&n->_name_long, ", fwmark=%d", n->fwmark); #endif /* WITH_NETEM */ /* Append node-type specific details */ diff --git a/lib/nodes/rtp.c b/lib/nodes/rtp.c index 55cdf9565..2bc368320 100644 --- a/lib/nodes/rtp.c +++ b/lib/nodes/rtp.c @@ -58,6 +58,7 @@ static struct plugin p; static int rtp_set_rate(struct node *n, double rate) { struct rtp *r = (struct rtp *) n->_vd; + int ratio; switch (r->rtcp.throttle_mode) { case RTCP_THROTTLE_HOOK_LIMIT_RATE: @@ -65,7 +66,10 @@ static int rtp_set_rate(struct node *n, double rate) break; case RTCP_THROTTLE_HOOK_DECIMATE: - decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate); + ratio = r->rate / rate; + if (ratio == 0) + ratio = 1; + decimate_set_ratio(r->rtcp.throttle_hook, ratio); break; case RTCP_THROTTLE_DISABLED: @@ -75,7 +79,7 @@ static int rtp_set_rate(struct node *n, double rate) return -1; } - debug(5, "Set rate limiting for node %s to %f", node_name(n), rate); + info("Set rate limiting for node %s to %f", node_name(n), rate); return 0; } @@ -87,7 +91,10 @@ static int rtp_aimd(struct node *n, double loss_frac) int ret; double rate; - if (loss_frac < 1e-3) + if (!r->rtcp.enabled) + return -1; + + if (loss_frac < 0.01) rate = r->aimd.last_rate + r->aimd.a; else rate = r->aimd.last_rate * r->aimd.b; @@ -98,7 +105,10 @@ static int rtp_aimd(struct node *n, double loss_frac) if (ret) return ret; - fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate); + if (r->aimd.log) + fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate); + + info("AIMD: %d\t%f\t%f", r->rtcp.num_rrs, loss_frac, rate); return 0; } @@ -112,7 +122,8 @@ int rtp_init(struct node *n) r->aimd.a = 10; r->aimd.b = 0.5; - r->aimd.last_rate = 100; + r->aimd.last_rate = 2000; + r->aimd.log = NULL; r->rtcp.enabled = false; r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED; @@ -163,7 +174,7 @@ int rtp_parse(struct node *n, json_t *cfg) /* AIMD */ if (json_aimd) { - ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: F, s?: F, s?: F }", + ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F }", "a", &r->aimd.a, "b", &r->aimd.b, "start_rate", &r->aimd.last_rate @@ -207,7 +218,7 @@ int rtp_parse(struct node *n, json_t *cfg) /* Format */ r->format = format_type_lookup(format); - if(!r->format) + if (!r->format) error("Invalid format '%s' for node %s", format, node_name(n)); /* Remote address */ @@ -280,6 +291,9 @@ char * rtp_print(struct node *n) } strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode); + + if (r->rtcp.mode == RTCP_MODE_AIMD) + strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate); } free(local); @@ -315,16 +329,16 @@ static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg) /* source not used */ (void) src; - debug(5, "rtcp: recv %s", rtcp_type_name(msg->hdr.pt)); + debug(5, "RTCP: recv %s", rtcp_type_name(msg->hdr.pt)); if (msg->hdr.pt == RTCP_SR) { - if(msg->hdr.count > 0) { + if (msg->hdr.count > 0) { const struct rtcp_rr *rr = &msg->r.sr.rrv[0]; - debug(5, "rtp: fraction lost = %d", rr->fraction); - rtp_aimd(n, rr->fraction); + debug(5, "RTP: fraction lost = %d", rr->fraction); + rtp_aimd(n, (double) rr->fraction / 256); } else - warning("Received RTCP sender report with zero reception reports"); + debug(5, "RTCP: Received sender report with zero reception reports"); } r->rtcp.num_rrs++; @@ -469,14 +483,14 @@ int rtp_type_start(struct super_node *sn) /* Initialize library */ ret = libre_init(); if (ret) { - error("Error initializing libre"); + warning("Error initializing libre"); return ret; } /* Add worker thread */ ret = pthread_create(&re_pthread, NULL, th_func, NULL); if (ret) { - error("Error creating rtp node type pthread"); + warning("Error creating rtp node type pthread"); return ret; } diff --git a/tests/integration/pipe-loopback-rtp-dual.sh b/tests/integration/pipe-loopback-rtp-dual.sh index aaceb3c00..6de543ea5 100755 --- a/tests/integration/pipe-loopback-rtp-dual.sh +++ b/tests/integration/pipe-loopback-rtp-dual.sh @@ -45,6 +45,9 @@ NUM_SAMPLES=100 cat > ${CONFIG_FILE_SRC} << EOF { + "logging" : { + "level" : "debug" + }, "nodes" : { "rtp_node" : { "type" : "rtp", @@ -77,6 +80,9 @@ EOF cat > ${CONFIG_FILE_DEST} << EOF { + "logging" : { + "level" : "debug" + }, "nodes" : { "rtp_node" : { "type" : "rtp", @@ -107,11 +113,16 @@ cat > ${CONFIG_FILE_DEST} << EOF } EOF -villas-signal mixed -v 5 -l ${NUM_SAMPLES} -n > ${INPUT_FILE} - +VILLAS_LOG_PREFIX="[DEST] " \ villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} & +PID=$! -villas-pipe ${CONFIG_FILE_SRC} rtp_node < ${INPUT_FILE} +sleep 1 + +VILLAS_LOG_PREFIX="[SIGN] " \ +villas-signal mixed -v 5 -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \ +VILLAS_LOG_PREFIX="[SRC] " \ +villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE} # Compare data villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} @@ -119,4 +130,6 @@ RC=$? rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} +kill $PID + exit $RC diff --git a/tests/integration/pipe-loopback-rtp-tbf.sh b/tests/integration/pipe-loopback-rtp-tbf.sh new file mode 100755 index 000000000..45bb5cb41 --- /dev/null +++ b/tests/integration/pipe-loopback-rtp-tbf.sh @@ -0,0 +1,140 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @author Marvin Klimke +# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +if [ -n "${CI}" ]; then + echo "RTP tests are not ready yet" + exit 99 +fi + +SCRIPT=$(realpath $0) +SCRIPTPATH=$(dirname ${SCRIPT}) +source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh + +CONFIG_FILE_SRC=$(mktemp) +CONFIG_FILE_DEST=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +FORMAT="villas.binary" +VECTORIZE="1" + +RATE=500 +NUM_SAMPLES=10000000 +NUM_VALUES=5 + +cat > ${CONFIG_FILE_SRC} << EOF +{ + "logging" : { + "level" : "info" + }, + "nodes" : { + "rtp_node" : { + "type" : "rtp", + "format" : "${FORMAT}", + "vectorize" : ${VECTORIZE}, + "rate" : ${RATE}, + "rtcp" : { + "enabled" : true, + "mode" : "aimd", + "throttle_mode" : "decimate" + }, + "aimd" : { + "a" : 10, + "b" : 0.75, + "start_rate" : ${RATE} + }, + "in" : { + "address" : "0.0.0.0:12002", + "signals" : { + "count" : ${NUM_VALUES}, + "type" : "float" + } + }, + "out" : { + "address" : "127.0.0.1:12000", + "fwmark" : 123 + } + } + } +} +EOF + +cat > ${CONFIG_FILE_DEST} << EOF +{ + "logging" : { + "level" : "info" + }, + "nodes" : { + "rtp_node" : { + "type" : "rtp", + "format" : "${FORMAT}", + "vectorize" : ${VECTORIZE}, + "rate" : ${RATE}, + "rtcp": { + "enabled" : true, + "mode" : "aimd", + "throttle_mode" : "decimate" + }, + "in" : { + "address" : "0.0.0.0:12000", + "signals" : { + "count" : ${NUM_VALUES}, + "type" : "float" + } + }, + "out" : { + "address" : "127.0.0.1:12002" + } + } + } +} +EOF + +tc qdisc del dev lo root +tc qdisc add dev lo root handle 4000 prio bands 4 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1 +tc qdisc add dev lo parent 4000:3 tbf rate 40kbps burst 32kbit latency 200ms #peakrate 40kbps mtu 1000 minburst 1520 +tc filter add dev lo protocol ip handle 123 fw flowid 4000:3 + +#exit + +VILLAS_LOG_PREFIX="[DEST] " \ +villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} & +PID=$! + +sleep 1 + +VILLAS_LOG_PREFIX="[SIGN] " \ +villas-signal mixed -v ${NUM_VALUES} -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \ +VILLAS_LOG_PREFIX="[SRC] " \ +villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE} + +# Compare data +villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} + +kill $PID +exit $RC diff --git a/tools/plots/plot_aimd.py b/tools/plots/plot_aimd.py new file mode 100644 index 000000000..ce8344428 --- /dev/null +++ b/tools/plots/plot_aimd.py @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 + +import sys +import matplotlib as mpl +import numpy as np +import matplotlib.pyplot as plt + +def read_datafile(file_name): + # the skiprows keyword is for heading, but I don't know if trailing lines + # can be specified + data = np.loadtxt(file_name, delimiter='\t', skiprows=1) + return data + +filename = sys.argv[1] + +data = read_datafile(filename) + +print(data[:,0]) + +fig, ax1 = plt.subplots() +ax1.plot(data[:,0], data[:,1]) + +ax2 = ax1.twinx() +ax2.plot(data[:,0], data[:,2], c='red') + +fig.tight_layout() +plt.show()