diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 52bb2f3c3..b06c02fb4 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -53,7 +53,7 @@ struct io_format; /** The maximum length of a packet which contains stuct msg. */ -#define SOCKET_MAX_PACKET_LEN 1500 +#define SOCKET_INITIAL_BUFFER_LEN 1500 enum socket_layer { SOCKET_LAYER_ETH, diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 18959e6b1..8fdd79719 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -330,32 +330,43 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) int ret; struct socket *s = (struct socket *) n->_vd; - char buf[SOCKET_MAX_PACKET_LEN]; - char *bufptr = buf; + char *buf, *ptr; + ssize_t buflen; ssize_t bytes; size_t rbytes; union sockaddr_union src; socklen_t srclen = sizeof(src); + /* Get size of next packet */ + buflen = recvfrom(s->sd, NULL, 0, MSG_TRUNC | MSG_PEEK, &src.sa, &srclen); + if (buflen < 0) + return -1; + + buf = alloc(buflen); + if (!buf) + return -1; + /* Receive next sample */ - bytes = recvfrom(s->sd, bufptr, sizeof(buf), 0, &src.sa, &srclen); + bytes = recvfrom(s->sd, buf, buflen, 0, &src.sa, &srclen); if (bytes < 0) serror("Failed recv from node %s", node_name(n)); + ptr = buf; + /* Strip IP header from packet */ if (s->layer == SOCKET_LAYER_IP) { - struct ip *iphdr = (struct ip *) bufptr; + struct ip *iphdr = (struct ip *) ptr; - bytes -= iphdr->ip_hl * 4; - bufptr += iphdr->ip_hl * 4; + bytes -= iphdr->ip_hl * 4; + ptr += iphdr->ip_hl * 4; } /* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg() * So we simply set it ourself. */ if (s->layer == SOCKET_LAYER_IP) { switch (src.sa.sa_family) { - case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break; + case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break; case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break; } } @@ -365,13 +376,16 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) warn("Received packet from unauthorized source: %s", buf); free(buf); - return 0; + ret = 0; + goto out; } - ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, 0); + ret = io_format_sscan(s->format, ptr, bytes, &rbytes, smps, cnt, 0); - if (bytes != rbytes) - warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes); + if (ret < 0 || bytes != rbytes) + warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); + +out: free(buf); return ret; } @@ -380,17 +394,29 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = (struct socket *) n->_vd; - char data[SOCKET_MAX_PACKET_LEN]; int ret; + char *buf; + size_t buflen; ssize_t bytes; size_t wbytes; - ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL); - if (ret < 0) + buflen = SOCKET_INITIAL_BUFFER_LEN; + buf = alloc(buflen); + if (!buf) return -1; +retry: ret = io_format_sprint(s->format, buf, buflen, &wbytes, smps, cnt, SAMPLE_HAS_ALL); + if (ret < 0) + goto out; + if (wbytes <= 0) - return 0; + goto out; + + if (wbytes > buflen) { + buflen = wbytes; + buf = realloc(buf, buflen); + goto retry; + } /* Send message */ socklen_t addrlen = 0; @@ -401,7 +427,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) case SOCKET_LAYER_UNIX: addrlen = SUN_LEN(&s->remote.sun); break; } - bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); + bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); if (bytes < 0) { if ((errno == EPERM) || (errno == ENOENT && s->layer == SOCKET_LAYER_UNIX)) @@ -413,7 +439,9 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (bytes != wbytes) warn("Partial send to node %s", node_name(n)); - return cnt; +out: free(buf); + + return ret; } int socket_parse(struct node *n, json_t *cfg)