diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 46f64958c..13041023f 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -254,38 +254,23 @@ int socket_destroy(struct node *n) static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) { - ssize_t bytes; int length; struct socket *s = n->_vd; + + char buf[MSG_MAX_PACKET_LEN]; + uint32_t *values = (uint32_t *) buf; + ssize_t bytes; - /* The GTNETv2-SKT protocol send every sample in a single packet. - * socket_read() receives a single packet. */ - int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; - struct iovec iov[iov_len]; struct sample *smp = smps[0]; if (cnt < 1) return 0; - uint32_t header[3]; - if (s->header == SOCKET_HEADER_FAKE) { - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); - } - - /* Remaining values are payload */ - iov[iov_len-1].iov_base = &smp->data; - iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity); - - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = iov_len, - .msg_name = (struct sockaddr *) &s->remote, - .msg_namelen = sizeof(s->remote) - }; - + union sockaddr_union src; + socklen_t srclen = sizeof(src); + /* Receive next sample */ - bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC); + bytes = recvfrom(s->sd, buf, sizeof(buf), 0, &src.sa, &srclen); if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ else if (bytes < 0) @@ -295,32 +280,52 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } - - /* Convert message to host endianess */ - for (int i = 0; i < ARRAY_LEN(header); i++) - header[i] = s->endian == SOCKET_ENDIAN_BIG - ? be32toh(header[i]) - : le32toh(header[i]); - - for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++) - smp->data[i].i = s->endian == SOCKET_ENDIAN_BIG - ? be32toh(smp->data[i].i) - : le32toh(smp->data[i].i); - - if (s->header == SOCKET_HEADER_FAKE) - length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); - else - length = bytes / SAMPLE_DATA_LEN(1); - - if (length > smp->capacity) { - warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); - length = smp->capacity; + + length = bytes / 4; + + /* Strip IP header from packet */ + if (s->layer == SOCKET_LAYER_IP) { + struct ip *iphdr = (struct ip *) buf; + + length -= iphdr->ip_hl; + values += iphdr->ip_hl; + } + + /* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg() + * So we simply set it ourself. */ + if (s->layer == SOCKET_LAYER_IP) { + switch (src.sa.sa_family) { + case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break; + case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break; + } } + if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) { + char *buf = socket_print_addr((struct sockaddr *) &src); + warn("Received packet from unauthorized source: %s", buf); + free(buf); + + return 0; + } + + /* Convert packet contents to host endianess */ + for (int i = 0; i < length; i++) + values[i] = s->endian == SOCKET_ENDIAN_BIG + ? be32toh(values[i]) + : le32toh(values[i]); + if (s->header == SOCKET_HEADER_FAKE) { - smp->sequence = header[0]; - smp->ts.origin.tv_sec = header[1]; - smp->ts.origin.tv_nsec = header[2]; + if (length < 3) { + warn("Node %s received a packet with no fake header. Skipping...", node_name(n)); + return 0; + } + + smp->sequence = values[0]; + smp->ts.origin.tv_sec = values[1]; + smp->ts.origin.tv_nsec = values[2]; + + values += 3; + length -= 3; } else { smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */ @@ -328,6 +333,13 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) smp->ts.origin.tv_nsec = 0; } + if (length > smp->capacity) { + warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); + length = smp->capacity; + } + + memcpy(smp->data, values, SAMPLE_DATA_LEN(length)); + smp->ts.received.tv_sec = 0; smp->ts.received.tv_nsec = 0; @@ -341,32 +353,51 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn int ret; struct socket *s = n->_vd; - char data[MSG_MAX_PACKET_LEN]; + char buf[MSG_MAX_PACKET_LEN]; + char *bufptr = buf; ssize_t bytes; - struct sockaddr_storage src; + union sockaddr_union src; socklen_t srclen = sizeof(src); - /* Receive message from socket */ - bytes = recvfrom(s->sd, data, sizeof(data), 0, (struct sockaddr *) &src, &srclen); + /* Receive next sample */ + bytes = recvfrom(s->sd, bufptr, sizeof(buf), 0, &src.sa, &srclen); if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); + error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ else if (bytes < 0) - serror("Failed receive packet from node %s", node_name(n)); - else if (bytes < MSG_LEN(1) || bytes % 4 != 0) { - warn("Received invalid packet for node %s", node_name(n)); + serror("Failed recv from node %s", node_name(n)); + else if (bytes % 4 != 0) { + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ + return -1; + } + + /* Strip IP header from packet */ + if (s->layer == SOCKET_LAYER_IP) { + struct ip *iphdr = (struct ip *) bufptr; + + bytes -= iphdr->ip_hl * 4; + bufptr += iphdr->ip_hl * 4; + } + + /* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg() + * So we simply set it ourself. */ + if (s->layer == SOCKET_LAYER_IP) { + switch (src.sa.sa_family) { + case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break; + case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break; + } + } + + if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) { + char *buf = socket_print_addr((struct sockaddr *) &src); + warn("Received packet from unauthorized source: %s", buf); + free(buf); + return 0; } - if (s->verify_source && socket_compare_addr((struct sockaddr *) &src, (struct sockaddr *) &s->remote) != 0) { - char *buf = socket_print_addr((struct sockaddr *) &src); - - warn("Received packet from unauthorized source: %s", buf); - - free(buf); - } - - ret = msg_buffer_to_samples(smps, cnt, data, bytes); + ret = msg_buffer_to_samples(smps, cnt, bufptr, bytes); if (ret < 0) warn("Received invalid packet from node: %s", node_name(n)); @@ -610,16 +641,17 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye if (!mac) error("Failed to parse MAC address: %s", node); - memcpy(&sa->sll.sll_addr, &mac->ether_addr_octet, 6); + memcpy(&sa->sll.sll_addr, &mac->ether_addr_octet, ETHER_ADDR_LEN); /* Get interface index from name */ + nl_init(); struct nl_cache *cache = nl_cache_mngt_require("route/link"); struct rtnl_link *link = rtnl_link_get_by_name(cache, ifname); if (!link) error("Failed to get network interface: '%s'", ifname); - sa->sll.sll_protocol = htons((proto) ? strtol(proto, NULL, 0) : ETH_P_VILLAS); - sa->sll.sll_halen = 6; + sa->sll.sll_protocol = htons(proto ? strtol(proto, NULL, 0) : ETH_P_VILLAS); + sa->sll.sll_halen = ETHER_ADDR_LEN; sa->sll.sll_family = AF_PACKET; sa->sll.sll_ifindex = rtnl_link_get_ifindex(link);