diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 796e90e84..52bb2f3c3 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -58,7 +58,8 @@ struct io_format; enum socket_layer { SOCKET_LAYER_ETH, SOCKET_LAYER_IP, - SOCKET_LAYER_UDP + SOCKET_LAYER_UDP, + SOCKET_LAYER_UNIX }; union sockaddr_union { diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 0afe867a1..18959e6b1 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -66,6 +66,11 @@ int socket_init(struct super_node *sn) struct socket *s = (struct socket *) n->_vd; struct rtnl_link *link; + if (s->layer != SOCKET_LAYER_ETH || + s->layer != SOCKET_LAYER_IP || + s->layer != SOCKET_LAYER_UDP) + continue; + /* Determine outgoing interface */ ret = if_get_egress((struct sockaddr *) &s->remote, &link); if (ret) { @@ -130,6 +135,7 @@ char * socket_print(struct node *n) case SOCKET_LAYER_UDP: layer = "udp"; break; case SOCKET_LAYER_IP: layer = "ip"; break; case SOCKET_LAYER_ETH: layer = "eth"; break; + case SOCKET_LAYER_UNIX: layer = "unix"; break; } char *local = socket_print_addr((struct sockaddr *) &s->local); @@ -163,16 +169,9 @@ int socket_start(struct node *n) int ret; /* Some checks on the addresses */ - if (s->local.sa.sa_family != s->remote.sa.sa_family) - error("Address families of local and remote must match!"); - - if (s->multicast.enabled) { - if (s->local.sa.sa_family != AF_INET) - error("Multicast is only supported by IPv4 for node %s", node_name(n)); - - uint32_t addr = ntohl(s->multicast.mreq.imr_multiaddr.s_addr); - if ((addr >> 28) != 14) - error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n)); + if (s->layer != SOCKET_LAYER_UNIX) { + if (s->local.sa.sa_family != s->remote.sa.sa_family) + error("Address families of local and remote must match!"); } if (s->layer == SOCKET_LAYER_IP) { @@ -189,6 +188,15 @@ int socket_start(struct node *n) } #endif /* __linux__ */ + if (s->multicast.enabled) { + if (s->local.sa.sa_family != AF_INET) + error("Multicast is only supported by IPv4 for node %s", node_name(n)); + + uint32_t addr = ntohl(s->multicast.mreq.imr_multiaddr.s_addr); + if ((addr >> 28) != 14) + error("Multicast group address of node %s must be within 224.0.0.0/4", node_name(n)); + } + /* Create socket */ switch (s->layer) { case SOCKET_LAYER_UDP: s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, IPPROTO_UDP); break; @@ -196,6 +204,7 @@ int socket_start(struct node *n) #ifdef __linux__ case SOCKET_LAYER_ETH: s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, s->local.sll.sll_protocol); break; #endif /* __linux__ */ + case SOCKET_LAYER_UNIX: s->sd = socket(s->local.sa.sa_family, SOCK_DGRAM, 0); break; default: error("Invalid socket type!"); } @@ -204,7 +213,18 @@ int socket_start(struct node *n) serror("Failed to create socket"); /* Bind socket for receiving */ - ret = bind(s->sd, (struct sockaddr *) &s->local, sizeof(s->local)); + socklen_t addrlen = 0; + if (s->layer == SOCKET_LAYER_UNIX) { + ret = unlink(s->local.sun.sun_path); + if (ret) + return ret; + + addrlen = SUN_LEN(&s->local.sun); + } + else + addrlen = sizeof(s->local); + + ret = bind(s->sd, (struct sockaddr *) &s->local, addrlen); if (ret < 0) serror("Failed to bind socket"); @@ -284,8 +304,11 @@ int socket_stop(struct node *n) serror("Failed to leave multicast group"); } - if (s->sd >= 0) - close(s->sd); + if (s->sd >= 0) { + ret = close(s->sd); + if (ret) + return ret; + } return 0; } @@ -370,9 +393,18 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) return 0; /* Send message */ - bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + socklen_t addrlen = 0; + switch (s->layer) { + case SOCKET_LAYER_UDP: + case SOCKET_LAYER_IP: + case SOCKET_LAYER_ETH: addrlen = sizeof(s->remote); break; + case SOCKET_LAYER_UNIX: addrlen = SUN_LEN(&s->remote.sun); break; + } + + bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); if (bytes < 0) { - if (errno == EPERM) + if ((errno == EPERM) || + (errno == ENOENT && s->layer == SOCKET_LAYER_UNIX)) warn("Failed send to node %s: %s", node_name(n), strerror(errno)); else serror("Failed send to node %s", node_name(n)); @@ -395,11 +427,15 @@ int socket_parse(struct node *n, json_t *cfg) int ret; json_t *json_multicast = NULL; + json_t *json_netem = NULL; json_error_t err; /* Default values */ s->layer = SOCKET_LAYER_UDP; s->verify_source = 0; +#ifdef WITH_NETEM + s->tc_qdisc = NULL; +#endif /* WITH_NETEM */ ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s: s, s: s, s?: b, s?: o, s?: s }", "layer", &layer, @@ -427,22 +463,24 @@ int socket_parse(struct node *n, json_t *cfg) #endif /*__linux__ */ else if (!strcmp(layer, "udp")) s->layer = SOCKET_LAYER_UDP; + else if (!strcmp(layer, "unix") || !strcmp(layer, "local")) + s->layer = SOCKET_LAYER_UNIX; else error("Invalid layer '%s' for node %s", layer, node_name(n)); } - ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); - if (ret) { - error("Failed to resolve local address '%s' of node %s: %s", - local, node_name(n), gai_strerror(ret)); - } - ret = socket_parse_addr(remote, (struct sockaddr *) &s->remote, s->layer, 0); if (ret) { error("Failed to resolve remote address '%s' of node %s: %s", remote, node_name(n), gai_strerror(ret)); } + ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); + if (ret) { + error("Failed to resolve local address '%s' of node %s: %s", + local, node_name(n), gai_strerror(ret)); + } + if (json_multicast) { const char *group, *interface = NULL; @@ -477,11 +515,8 @@ int socket_parse(struct node *n, json_t *cfg) } } -#ifdef WITH_NETEM - json_t *json_netem; - - json_netem = json_object_get(cfg, "netem"); if (json_netem) { +#ifdef WITH_NETEM int enabled = 1; ret = json_unpack_ex(json_netem, &err, 0, "{ s?: b }", "enabled", &enabled); @@ -492,10 +527,8 @@ int socket_parse(struct node *n, json_t *cfg) tc_parse(&s->tc_qdisc, json_netem); else s->tc_qdisc = NULL; - } - else - s->tc_qdisc = NULL; #endif /* WITH_NETEM */ + } return 0; } @@ -522,6 +555,9 @@ char * socket_print_addr(struct sockaddr *saddr) strcatf(&buf, ":%02x", sa->sll.sll_addr[i]); break; #endif /* __linux__ */ + case AF_UNIX: + strcatf(&buf, "%s", sa->sun.sun_path); + break; default: error("Unknown address family: '%u'", sa->sa.sa_family); @@ -559,8 +595,15 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye char *copy = strdup(addr); int ret; + if (layer == SOCKET_LAYER_UNIX) { /* Format: "/path/to/socket" */ + sa->sun.sun_family = AF_UNIX; + + strncpy(sa->sun.sun_path, addr, sizeof(sa->sun.sun_path)); + + ret = 0; + } #ifdef __linux__ - if (layer == SOCKET_LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */ + else if (layer == SOCKET_LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */ /* Split string */ char *node = strtok(copy, "%"); char *ifname = strtok(NULL, ":"); @@ -587,8 +630,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye ret = 0; } - else { /* Format: "192.168.0.10:12001" */ #endif /* __linux__ */ + else { /* Format: "192.168.0.10:12001" */ struct addrinfo hint = { .ai_flags = flags, .ai_family = AF_UNSPEC @@ -633,9 +676,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye memcpy(sa, result->ai_addr, result->ai_addrlen); freeaddrinfo(result); } -#ifdef __linux__ } -#endif /* __linux__ */ + free(copy); return ret; @@ -676,6 +718,7 @@ int socket_compare_addr(struct sockaddr *x, struct sockaddr *y) CMP(xu->sll.sll_halen, yu->sll.sll_halen); return memcmp(xu->sll.sll_addr, yu->sll.sll_addr, xu->sll.sll_halen); #endif /* __linux__ */ + default: return -1; } @@ -713,3 +756,4 @@ static struct plugin p = { REGISTER_PLUGIN(&p) LIST_INIT_STATIC(&p.node.instances) +