1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'node-rtp-2' into develop

This commit is contained in:
Steffen Vogel 2019-02-17 21:19:24 +01:00
commit b3e729e4ef
7 changed files with 243 additions and 35 deletions

View file

@ -84,12 +84,14 @@ struct node {
struct node_direction in, out;
#ifdef WITH_NETEM
int mark; /**< Socket mark for netem, routing and filtering */
#ifdef __linux__
int fwmark; /**< Socket mark for netem, routing and filtering */
#ifdef WITH_NETEM
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
#endif /* WITH_NETEM */
#endif /* __linux__ */
struct node_type *_vt; /**< Virtual functions (C++ OOP style) */
void *_vd; /**< Virtual data (used by struct node::_vt functions) */

View file

@ -73,23 +73,23 @@ int if_start(struct interface *i)
//if_set_affinity(i, i->affinity);
/* Assign fwmark's to nodes which have netem options */
int ret, mark = 0;
int ret, fwmark = 0;
for (size_t j = 0; j < vlist_length(&i->nodes); j++) {
struct node *n = (struct node *) vlist_at(&i->nodes, j);
if (n->tc_qdisc)
n->mark = 1 + mark++;
if (n->tc_qdisc && n->fwmark < 0)
n->fwmark = 1 + fwmark++;
}
/* Abort if no node is using netem */
if (mark == 0)
if (fwmark == 0)
return 0;
if (getuid() != 0)
error("Network emulation requires super-user privileges!");
/* Replace root qdisc */
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, mark);
ret = tc_prio(i, &i->tc_qdisc, TC_HANDLE(1, 0), TC_H_ROOT, fwmark);
if (ret)
error("Failed to setup priority queuing discipline: %s", nl_geterror(ret));
@ -98,16 +98,16 @@ int if_start(struct interface *i)
struct node *n = (struct node *) vlist_at(&i->nodes, j);
if (n->tc_qdisc) {
ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->mark), n->mark);
ret = tc_mark(i, &n->tc_classifier, TC_HANDLE(1, n->fwmark), n->fwmark);
if (ret)
error("Failed to setup FW mark classifier: %s", nl_geterror(ret));
char *buf = tc_netem_print(n->tc_qdisc);
debug(LOG_IF | 5, "Starting network emulation on interface '%s' for FW mark %u: %s",
if_name(i), n->mark, buf);
if_name(i), n->fwmark, buf);
free(buf);
ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->mark, 0), TC_HANDLE(1, n->mark));
ret = tc_netem(i, &n->tc_qdisc, TC_HANDLE(0x1000+n->fwmark, 0), TC_HANDLE(1, n->fwmark));
if (ret)
error("Failed to setup netem qdisc: %s", nl_geterror(ret));
}

View file

@ -225,6 +225,10 @@ int node_init(struct node *n, struct node_type *vt)
n->_name = NULL;
n->_name_long = NULL;
#ifdef __linux__
n->fwmark = -1;
#endif /* __linux__ */
#ifdef WITH_NETEM
n->tc_qdisc = NULL;
n->tc_classifier = NULL;
@ -280,16 +284,24 @@ int node_parse(struct node *n, json_t *json, const char *name)
n->name = strdup(name);
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o }, s?: { s?: o } }",
ret = json_unpack_ex(json, &err, 0, "{ s: s, s?: { s?: o } }",
"type", &type,
"in",
"signals", &json_signals,
"out",
"netem", &json_netem
"signals", &json_signals
);
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
#ifdef __linux__
ret = json_unpack_ex(json, &err, 0, "{ s?: { s?: o, s?: i } }",
"out",
"netem", &json_netem,
"fwmark", &n->fwmark
);
if (ret)
jerror(&err, "Failed to parse node %s", node_name(n));
#endif /* __linux__ */
nt = node_type_lookup(type);
assert(nt == node_type(n));
@ -396,18 +408,18 @@ int node_start(struct node *n)
#ifdef __linux__
/* Set fwmark for outgoing packets if netem is enabled for this node */
if (n->mark) {
if (n->fwmark) {
int fds[16];
int num_sds = node_netem_fds(n, fds);
for (int i = 0; i < num_sds; i++) {
int fd = fds[i];
ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->mark, sizeof(n->mark));
ret = setsockopt(fd, SOL_SOCKET, SO_MARK, &n->fwmark, sizeof(n->fwmark));
if (ret)
serror("Failed to set FW mark for outgoing packets");
else
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->mark);
debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", fd, n->fwmark);
}
}
#endif /* __linux__ */
@ -657,7 +669,7 @@ char * node_name_long(struct node *n)
strcatf(&n->_name_long, ", out.netem=%s", n->tc_qdisc ? "yes" : "no");
if (n->tc_qdisc)
strcatf(&n->_name_long, ", mark=%d", n->mark);
strcatf(&n->_name_long, ", fwmark=%d", n->fwmark);
#endif /* WITH_NETEM */
/* Append node-type specific details */

View file

@ -58,6 +58,7 @@ static struct plugin p;
static int rtp_set_rate(struct node *n, double rate)
{
struct rtp *r = (struct rtp *) n->_vd;
int ratio;
switch (r->rtcp.throttle_mode) {
case RTCP_THROTTLE_HOOK_LIMIT_RATE:
@ -65,7 +66,10 @@ static int rtp_set_rate(struct node *n, double rate)
break;
case RTCP_THROTTLE_HOOK_DECIMATE:
decimate_set_ratio(r->rtcp.throttle_hook, r->rate / rate);
ratio = r->rate / rate;
if (ratio == 0)
ratio = 1;
decimate_set_ratio(r->rtcp.throttle_hook, ratio);
break;
case RTCP_THROTTLE_DISABLED:
@ -75,7 +79,7 @@ static int rtp_set_rate(struct node *n, double rate)
return -1;
}
debug(5, "Set rate limiting for node %s to %f", node_name(n), rate);
info("Set rate limiting for node %s to %f", node_name(n), rate);
return 0;
}
@ -87,7 +91,10 @@ static int rtp_aimd(struct node *n, double loss_frac)
int ret;
double rate;
if (loss_frac < 1e-3)
if (!r->rtcp.enabled)
return -1;
if (loss_frac < 0.01)
rate = r->aimd.last_rate + r->aimd.a;
else
rate = r->aimd.last_rate * r->aimd.b;
@ -98,7 +105,10 @@ static int rtp_aimd(struct node *n, double loss_frac)
if (ret)
return ret;
fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate);
if (r->aimd.log)
fprintf(r->aimd.log, "%d\t%f\t%f\n", r->rtcp.num_rrs, loss_frac, rate);
info("AIMD: %d\t%f\t%f", r->rtcp.num_rrs, loss_frac, rate);
return 0;
}
@ -112,7 +122,8 @@ int rtp_init(struct node *n)
r->aimd.a = 10;
r->aimd.b = 0.5;
r->aimd.last_rate = 100;
r->aimd.last_rate = 2000;
r->aimd.log = NULL;
r->rtcp.enabled = false;
r->rtcp.throttle_mode = RTCP_THROTTLE_DISABLED;
@ -163,7 +174,7 @@ int rtp_parse(struct node *n, json_t *cfg)
/* AIMD */
if (json_aimd) {
ret = json_unpack_ex(json_rtcp, &err, 0, "{ s?: F, s?: F, s?: F }",
ret = json_unpack_ex(json_aimd, &err, 0, "{ s?: F, s?: F, s?: F }",
"a", &r->aimd.a,
"b", &r->aimd.b,
"start_rate", &r->aimd.last_rate
@ -207,7 +218,7 @@ int rtp_parse(struct node *n, json_t *cfg)
/* Format */
r->format = format_type_lookup(format);
if(!r->format)
if (!r->format)
error("Invalid format '%s' for node %s", format, node_name(n));
/* Remote address */
@ -280,6 +291,9 @@ char * rtp_print(struct node *n)
}
strcatf(&buf, ", rtcp.mode=%s, rtcp.throttle_mode=%s", mode, throttle_mode);
if (r->rtcp.mode == RTCP_MODE_AIMD)
strcatf(&buf, ", aimd.a=%f, aimd.b=%f, aimd.start_rate=%f", r->aimd.a, r->aimd.b, r->aimd.last_rate);
}
free(local);
@ -315,16 +329,16 @@ static void rtcp_handler(const struct sa *src, struct rtcp_msg *msg, void *arg)
/* source not used */
(void) src;
debug(5, "rtcp: recv %s", rtcp_type_name(msg->hdr.pt));
debug(5, "RTCP: recv %s", rtcp_type_name(msg->hdr.pt));
if (msg->hdr.pt == RTCP_SR) {
if(msg->hdr.count > 0) {
if (msg->hdr.count > 0) {
const struct rtcp_rr *rr = &msg->r.sr.rrv[0];
debug(5, "rtp: fraction lost = %d", rr->fraction);
rtp_aimd(n, rr->fraction);
debug(5, "RTP: fraction lost = %d", rr->fraction);
rtp_aimd(n, (double) rr->fraction / 256);
}
else
warning("Received RTCP sender report with zero reception reports");
debug(5, "RTCP: Received sender report with zero reception reports");
}
r->rtcp.num_rrs++;
@ -469,14 +483,14 @@ int rtp_type_start(struct super_node *sn)
/* Initialize library */
ret = libre_init();
if (ret) {
error("Error initializing libre");
warning("Error initializing libre");
return ret;
}
/* Add worker thread */
ret = pthread_create(&re_pthread, NULL, th_func, NULL);
if (ret) {
error("Error creating rtp node type pthread");
warning("Error creating rtp node type pthread");
return ret;
}

View file

@ -45,6 +45,9 @@ NUM_SAMPLES=100
cat > ${CONFIG_FILE_SRC} << EOF
{
"logging" : {
"level" : "debug"
},
"nodes" : {
"rtp_node" : {
"type" : "rtp",
@ -77,6 +80,9 @@ EOF
cat > ${CONFIG_FILE_DEST} << EOF
{
"logging" : {
"level" : "debug"
},
"nodes" : {
"rtp_node" : {
"type" : "rtp",
@ -107,11 +113,16 @@ cat > ${CONFIG_FILE_DEST} << EOF
}
EOF
villas-signal mixed -v 5 -l ${NUM_SAMPLES} -n > ${INPUT_FILE}
VILLAS_LOG_PREFIX="[DEST] " \
villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} &
PID=$!
villas-pipe ${CONFIG_FILE_SRC} rtp_node < ${INPUT_FILE}
sleep 1
VILLAS_LOG_PREFIX="[SIGN] " \
villas-signal mixed -v 5 -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \
VILLAS_LOG_PREFIX="[SRC] " \
villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE}
# Compare data
villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE}
@ -119,4 +130,6 @@ RC=$?
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE}
kill $PID
exit $RC

View file

@ -0,0 +1,140 @@
#!/bin/bash
#
# Integration loopback test for villas-pipe.
#
# @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# @author Marvin Klimke <marvin.klimke@rwth-aachen.de>
# @copyright 2014-2019, Institute for Automation of Complex Power Systems, EONERC
# @license GNU General Public License (version 3)
#
# VILLASnode
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
##################################################################################
if [ -n "${CI}" ]; then
echo "RTP tests are not ready yet"
exit 99
fi
SCRIPT=$(realpath $0)
SCRIPTPATH=$(dirname ${SCRIPT})
source ${SCRIPTPATH}/../../tools/integration-tests-helper.sh
CONFIG_FILE_SRC=$(mktemp)
CONFIG_FILE_DEST=$(mktemp)
INPUT_FILE=$(mktemp)
OUTPUT_FILE=$(mktemp)
FORMAT="villas.binary"
VECTORIZE="1"
RATE=500
NUM_SAMPLES=10000000
NUM_VALUES=5
cat > ${CONFIG_FILE_SRC} << EOF
{
"logging" : {
"level" : "info"
},
"nodes" : {
"rtp_node" : {
"type" : "rtp",
"format" : "${FORMAT}",
"vectorize" : ${VECTORIZE},
"rate" : ${RATE},
"rtcp" : {
"enabled" : true,
"mode" : "aimd",
"throttle_mode" : "decimate"
},
"aimd" : {
"a" : 10,
"b" : 0.75,
"start_rate" : ${RATE}
},
"in" : {
"address" : "0.0.0.0:12002",
"signals" : {
"count" : ${NUM_VALUES},
"type" : "float"
}
},
"out" : {
"address" : "127.0.0.1:12000",
"fwmark" : 123
}
}
}
}
EOF
cat > ${CONFIG_FILE_DEST} << EOF
{
"logging" : {
"level" : "info"
},
"nodes" : {
"rtp_node" : {
"type" : "rtp",
"format" : "${FORMAT}",
"vectorize" : ${VECTORIZE},
"rate" : ${RATE},
"rtcp": {
"enabled" : true,
"mode" : "aimd",
"throttle_mode" : "decimate"
},
"in" : {
"address" : "0.0.0.0:12000",
"signals" : {
"count" : ${NUM_VALUES},
"type" : "float"
}
},
"out" : {
"address" : "127.0.0.1:12002"
}
}
}
}
EOF
tc qdisc del dev lo root
tc qdisc add dev lo root handle 4000 prio bands 4 priomap 1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
tc qdisc add dev lo parent 4000:3 tbf rate 40kbps burst 32kbit latency 200ms #peakrate 40kbps mtu 1000 minburst 1520
tc filter add dev lo protocol ip handle 123 fw flowid 4000:3
#exit
VILLAS_LOG_PREFIX="[DEST] " \
villas-pipe -l ${NUM_SAMPLES} ${CONFIG_FILE_DEST} rtp_node > ${OUTPUT_FILE} &
PID=$!
sleep 1
VILLAS_LOG_PREFIX="[SIGN] " \
villas-signal mixed -v ${NUM_VALUES} -r ${RATE} -l ${NUM_SAMPLES} | tee ${INPUT_FILE} | \
VILLAS_LOG_PREFIX="[SRC] " \
villas-pipe ${CONFIG_FILE_SRC} rtp_node > ${OUTPUT_FILE}
# Compare data
villas-test-cmp ${CMPFLAGS} ${INPUT_FILE} ${OUTPUT_FILE}
RC=$?
rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE}
kill $PID
exit $RC

27
tools/plots/plot_aimd.py Normal file
View file

@ -0,0 +1,27 @@
#!/usr/bin/env python3
import sys
import matplotlib as mpl
import numpy as np
import matplotlib.pyplot as plt
def read_datafile(file_name):
# the skiprows keyword is for heading, but I don't know if trailing lines
# can be specified
data = np.loadtxt(file_name, delimiter='\t', skiprows=1)
return data
filename = sys.argv[1]
data = read_datafile(filename)
print(data[:,0])
fig, ax1 = plt.subplots()
ax1.plot(data[:,0], data[:,1])
ax2 = ax1.twinx()
ax2.plot(data[:,0], data[:,2], c='red')
fig.tight_layout()
plt.show()