diff --git a/etc/example.conf b/etc/example.conf index 2df8b33a4..9f212df34 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -66,6 +66,7 @@ nodes = { vectorize = 30, # Receive and sent 30 samples per message (combining). netem = { # Network emulation settings + enabled = true, # Those settings can be specified for each node invidually! delay = 100000, # Additional latency in microseconds jitter = 30000, # Jitter in uS @@ -73,6 +74,15 @@ nodes = { loss = 10 # Packet loss in percent duplicate = 10, # Duplication in percent corrupt = 10 # Corruption in percent + }, + + multicast = { # IGMP multicast is only support for layer = (ip|udp) + enabled = true, + + group = "224.1.2.3", # The multicast group. Must be within 224.0.0.0/4 + interface = "1.2.3.4", # The IP address of the interface which should receive multicast packets. + ttl = 128, # The time to live for outgoing multicast packets. + loop = false, # Whether or not to loopback outgoing multicast packets to the local host. } }, ethernet_node = { diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index e7b7f9196..bd6692a18 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -34,6 +34,7 @@ #include #include #include +#include #include "node.h" @@ -73,6 +74,14 @@ struct socket { union sockaddr_union local; /**< Local address of the socket */ union sockaddr_union remote; /**< Remote address of the socket */ + + /* Multicast options */ + struct multicast { + int enabled; /**< Is multicast enabled? */ + unsigned char loop; /** Loopback multicast packets to local host? */ + unsigned char ttl; /**< The time to live for multicast packets. */ + struct ip_mreq mreq; /**< A multicast group to join. */ + } multicast; struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 86bd8675d..07108247c 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -142,6 +142,20 @@ char * socket_print(struct node *n) char *remote = socket_print_addr((struct sockaddr *) &s->remote); buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote); + + if (s->multicast.enabled) { + char group[INET_ADDRSTRLEN]; + char interface[INET_ADDRSTRLEN]; + + inet_ntop(AF_INET, &s->multicast.mreq.imr_multiaddr, group, sizeof(group)); + inet_ntop(AF_INET, &s->multicast.mreq.imr_interface, interface, sizeof(interface)); + + strcatf(&buf, ", multicast.enabled=%s", s->multicast.enabled ? "yes" : "no"); + strcatf(&buf, ", multicast.loop=%s", s->multicast.loop ? "yes" : "no"); + strcatf(&buf, ", multicast.group=%s", group); + strcatf(&buf, ", multicast.interface=%s", s->multicast.mreq.imr_interface.s_addr == INADDR_ANY ? "any" : interface); + strcatf(&buf, ", multicast.ttl=%u", s->multicast.ttl); + } free(local); free(remote); @@ -158,11 +172,16 @@ int socket_start(struct node *n) if (s->local.sa.sa_family != s->remote.sa.sa_family) error("Address families of local and remote must match!"); + if (s->layer != SOCKET_LAYER_IP && s->layer != SOCKET_LAYER_UDP) { + if (s->multicast.enabled) + error("Multicast is only supported for the IP / UDP layers"); + } + if (s->layer == SOCKET_LAYER_IP) { if (ntohs(s->local.sin.sin_port) != ntohs(s->remote.sin.sin_port)) error("IP protocol numbers of local and remote must match!"); } - else if(s->layer == SOCKET_LAYER_ETH) { + else if (s->layer == SOCKET_LAYER_ETH) { if (ntohs(s->local.sll.sll_protocol) != ntohs(s->remote.sll.sll_protocol)) error("Ethertypes of local and remote must match!"); @@ -195,6 +214,20 @@ int socket_start(struct node *n) else debug(LOG_SOCKET | 4, "Set FW mark for socket (sd=%u) to %u", s->sd, s->mark); } + + if (s->multicast.enabled) { + ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_LOOP, &s->multicast.loop, sizeof(s->multicast.loop)); + if (ret) + serror("Failed to set multicast loop option"); + + ret = setsockopt(s->sd, IPPROTO_IP, IP_MULTICAST_TTL, &s->multicast.ttl, sizeof(s->multicast.ttl)); + if (ret) + serror("Failed to set multicast ttl option"); + + ret = setsockopt(s->sd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &s->multicast.mreq, sizeof(s->multicast.mreq)); + if (ret) + serror("Failed to join multicast group"); + } /* Set socket priority, QoS or TOS IP options */ int prio; @@ -234,7 +267,14 @@ int socket_reverse(struct node *n) int socket_stop(struct node *n) { + int ret; struct socket *s = n->_vd; + + if (s->multicast.enabled) { + ret = setsockopt(s->sd, IPPROTO_IP, IP_DROP_MEMBERSHIP, &s->multicast.mreq, sizeof(s->multicast.mreq)); + if (ret) + serror("Failed to leave multicast group"); + } if (s->sd >= 0) close(s->sd); @@ -495,7 +535,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - config_setting_t *cfg_netem; + config_setting_t *cfg_netem, *cfg_multicast; const char *local, *remote, *layer, *hdr, *endian; int ret; @@ -560,6 +600,46 @@ int socket_parse(struct node *n, config_setting_t *cfg) cerror(cfg, "Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } + + cfg_multicast = config_setting_get_member(cfg, "multicast"); + if (cfg_multicast) { + const char *group, *interface; + + if (!config_setting_lookup_bool(cfg_multicast, "enabled", &s->multicast.enabled)) + s->multicast.enabled = true; + + if (!config_setting_lookup_string(cfg_multicast, "group", &group)) + cerror(cfg_multicast, "The multicast group requires a 'group' setting."); + else { + ret = inet_aton(group, &s->multicast.mreq.imr_multiaddr); + if (!ret) { + cerror(cfg_multicast, "Failed to resolve multicast group address '%s' of node %s", + group, node_name(n)); + } + } + + if (!config_setting_lookup_string(cfg_multicast, "interface", &interface)) + s->multicast.mreq.imr_interface.s_addr = INADDR_ANY; + else { + ret = inet_aton(group, &s->multicast.mreq.imr_interface); + if (!ret) { + cerror(cfg_multicast, "Failed to resolve multicast interface address '%s' of node %s", + interface, node_name(n)); + } + } + + int loop; + if (!config_setting_lookup_bool(cfg_multicast, "loop", &loop)) + s->multicast.loop = 0; + else + s->multicast.loop = loop; + + int ttl; + if (!config_setting_lookup_int(cfg_multicast, "ttl", &ttl)) + s->multicast.ttl = 255; + else + s->multicast.ttl = ttl; + } cfg_netem = config_setting_get_member(cfg, "netem"); if (cfg_netem) {