diff --git a/etc/example.conf b/etc/example.conf index 2df8b33a4..9f212df34 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -66,6 +66,7 @@ nodes = { vectorize = 30, # Receive and sent 30 samples per message (combining). netem = { # Network emulation settings + enabled = true, # Those settings can be specified for each node invidually! delay = 100000, # Additional latency in microseconds jitter = 30000, # Jitter in uS @@ -73,6 +74,15 @@ nodes = { loss = 10 # Packet loss in percent duplicate = 10, # Duplication in percent corrupt = 10 # Corruption in percent + }, + + multicast = { # IGMP multicast is only support for layer = (ip|udp) + enabled = true, + + group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4 + interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets. + ttl = 128, # The time to live for outgoing multicast packets. + loop = false, # Whether or not to loopback outgoing multicast packets to the local host. } }, ethernet_node = { diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index e7b7f9196..bd6692a18 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -34,6 +34,7 @@ #include #include #include +#include #include "node.h" @@ -73,6 +74,14 @@ struct socket { union sockaddr_union local; /**< Local address of the socket */ union sockaddr_union remote; /**< Remote address of the socket */ + + /* Multicast options */ + struct multicast { + int enabled; /**< Is multicast enabled? */ + unsigned char loop; /** Loopback multicast packets to local host? */ + unsigned char ttl; /**< The time to live for multicast packets. */ + struct ip_mreq mreq; /**< A multicast group to join. */ + } multicast; struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 86bd8675d..92637d91a 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -142,6 +142,20 @@ char * socket_print(struct node *n) char *remote = socket_print_addr((struct sockaddr *) &s->remote); buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote); + + if (s->multicast.enabled) { + char group[INET_ADDRSTRLEN]; + char interface[INET_ADDRSTRLEN]; + + inet_ntop(AF_INET, &s->multicast.mreq.imr_multiaddr, group, sizeof(group)); + inet_ntop(AF_INET, &s->multicast.mreq.imr_interface, interface, sizeof(interface)); + + strcatf(&buf, ", multicast.enabled=%s", s->multicast.enabled ? "yes" : "no"); + strcatf(&buf, ", multicast.loop=%s", s->multicast.loop ? "yes" : "no"); + strcatf(&buf, ", multicast.group=%s", group); + strcatf(&buf, ", multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface); + strcatf(&buf, ", multicast.ttl=%u", s->multicast.ttl); + } free(local); free(remote); @@ -158,11 +172,20 @@ int socket_start(struct node *n) if (s->local.sa.sa_family != s->remote.sa.sa_family) error("Address families of local and remote must match!"); + if (s->multicast.enabled) { + if (s->local.sa.sa_family != AF_INET) + error("Multicast is only supported by IPv4 for node %s", node_name(n)); + + uint32_t addr = ntohl(s->multicast.mreq.imr_multiaddr.s_addr); + if ((addr >> 28) != 14) + error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n)); + } + if (s->layer == SOCKET_LAYER_IP) { if (ntohs(s->local.sin.sin_port) != ntohs(s->remote.sin.sin_port)) error("IP protocol numbers of local and remote must match!"); } - else if(s->layer == SOCKET_LAYER_ETH) { + else if (s->layer == SOCKET_LAYER_ETH) { if (ntohs(s->local.sll.sll_protocol) != ntohs(s->remote.sll.sll_protocol)) error("Ethertypes of local and remote must match!"); @@ -195,6 +218,20 @@ int socket_start(struct node *n) else debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark); } + + if (s->multicast.enabled) { + ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop, sizeof(s->multicast.loop)); + if (ret) + serror("Failed to set multicast loop option"); + + ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_TTL, &s->multicast.ttl, sizeof(s->multicast.ttl)); + if (ret) + serror("Failed to set multicast ttl option"); + + ret = setsockopt(s->sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &s->multicast.mreq, sizeof(s->multicast.mreq)); + if (ret) + serror("Failed to join multicast group"); + } /* Set socket priority, QoS or TOS IP options */ int prio; @@ -234,7 +271,14 @@ int socket_reverse(struct node *n) int socket_stop(struct node *n) { + int ret; struct socket *s = n->_vd; + + if (s->multicast.enabled) { + ret = setsockopt(s->sd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &s->multicast.mreq, sizeof(s->multicast.mreq)); + if (ret) + serror("Failed to leave multicast group"); + } if (s->sd >= 0) close(s->sd); @@ -495,7 +539,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - config_setting_t *cfg_netem; + config_setting_t *cfg_netem, *cfg_multicast; const char *local, *remote, *layer, *hdr, *endian; int ret; @@ -560,6 +604,46 @@ int socket_parse(struct node *n, config_setting_t *cfg) cerror(cfg, "Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } + + cfg_multicast = config_setting_get_member(cfg, "multicast"); + if (cfg_multicast) { + const char *group, *interface; + + if (!config_setting_lookup_bool(cfg_multicast, "enabled", &s->multicast.enabled)) + s->multicast.enabled = true; + + if (!config_setting_lookup_string(cfg_multicast, "group", &group)) + cerror(cfg_multicast, "The multicast group requires a 'group' setting."); + else { + ret = inet_aton(group, &s->multicast.mreq.imr_multiaddr); + if (!ret) { + cerror(cfg_multicast, "Failed to resolve multicast group address '%s' of node %s", + group, node_name(n)); + } + } + + if (!config_setting_lookup_string(cfg_multicast, "interface", &interface)) + s->multicast.mreq.imr_interface.s_addr = INADDR_ANY; + else { + ret = inet_aton(group, &s->multicast.mreq.imr_interface); + if (!ret) { + cerror(cfg_multicast, "Failed to resolve multicast interface address '%s' of node %s", + interface, node_name(n)); + } + } + + int loop; + if (!config_setting_lookup_bool(cfg_multicast, "loop", &loop)) + s->multicast.loop = 0; + else + s->multicast.loop = loop; + + int ttl; + if (!config_setting_lookup_int(cfg_multicast, "ttl", &ttl)) + s->multicast.ttl = 255; + else + s->multicast.ttl = ttl; + } cfg_netem = config_setting_get_member(cfg, "netem"); if (cfg_netem) { diff --git a/tests/integration/pipe-loopback-multicast.sh b/tests/integration/pipe-loopback-multicast.sh new file mode 100755 index 000000000..94ad637ee --- /dev/null +++ b/tests/integration/pipe-loopback-multicast.sh @@ -0,0 +1,60 @@ +#!/bin/bash +# +# Integration loopback test for villas-pipe. +# +# @author Steffen Vogel +# @copyright 2017, Institute for Automation of Complex Power Systems, EONERC +# @license GNU General Public License (version 3) +# +# VILLASnode +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +################################################################################## + +CONFIG_FILE=$(mktemp) +INPUT_FILE=$(mktemp) +OUTPUT_FILE=$(mktemp) + +cat > ${CONFIG_FILE} << EOF +nodes = { + node1 = { + type = "socket"; + layer = "udp"; + + local = "*:12000"; + remote = "224.1.2.3:12000"; + + multicast = { # IGMP multicast is only support for layer = (ip|udp) + enabled = true, + + group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4 + loop = true # Whether or not to loopback outgoing multicast packets to the local host. + } + } +} +EOF + +# Generate test data +villas-signal random -l 10 -n > ${INPUT_FILE} + +# We delay EOF of the INPUT_FILE by 1 second in order to wait for incoming data to be received +villas-pipe ${CONFIG_FILE} node1 > ${OUTPUT_FILE} < <(sleep 0.5; cat ${INPUT_FILE}; sleep 0.5; echo -n) + +# Comapre data +villas-test-cmp ${INPUT_FILE} ${OUTPUT_FILE} +RC=$? + +rm ${OUTPUT_FILE} ${INPUT_FILE} ${CONFIG_FILE} + +exit $RC \ No newline at end of file