diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index f877c9170..6dd26b9b3 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -104,6 +104,11 @@ struct socket { struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */ struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */ #endif /* WITH_NETEM */ + + struct { + char *buf; /**< Buffer for receiving messages */ + size_t buflen; + } in, out; }; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 457eb77ad..e6c541735 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -338,6 +338,17 @@ int socket_start(struct node *n) #endif /* __linux__ */ } + s->out.buflen = SOCKET_INITIAL_BUFFER_LEN; + s->out.buf = alloc(s->out.buflen); + if (!s->out.buf) + return -1; + + s->in.buflen = SOCKET_INITIAL_BUFFER_LEN; + s->in.buf = alloc(s->in.buflen); + if (!s->in.buf) + return -1; + + return 0; } @@ -374,6 +385,9 @@ int socket_stop(struct node *n) if (ret) return ret; + free(s->in.buf); + free(s->out.buf); + return 0; } @@ -394,29 +408,21 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r int ret; struct socket *s = (struct socket *) n->_vd; - char *buf, *ptr; - ssize_t buflen; + char *ptr; ssize_t bytes; size_t rbytes; union sockaddr_union src; socklen_t srclen = sizeof(src); - /* Get size of next packet */ - buflen = recvfrom(s->sd, NULL, 0, MSG_TRUNC | MSG_PEEK, &src.sa, &srclen); - if (buflen < 0) - return -1; - - buf = alloc(buflen); - if (!buf) - return -1; - /* Receive next sample */ - bytes = recvfrom(s->sd, buf, buflen, 0, &src.sa, &srclen); + bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen); if (bytes < 0) serror("Failed recv from node %s", node_name(n)); + else if (bytes == 0) + return 0; - ptr = buf; + ptr = s->in.buf; /* Strip IP header from packet */ if (s->layer == SOCKET_LAYER_IP) { @@ -445,8 +451,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r warn("Received packet from unauthorized source: %s", buf); free(buf); - ret = 0; - goto out; + return 0; } ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt); @@ -454,8 +459,6 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r if (ret < 0 || bytes != rbytes) warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes); -out: free(buf); - return ret; } @@ -464,26 +467,23 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned * struct socket *s = (struct socket *) n->_vd; int ret; - char *buf; - size_t buflen; ssize_t bytes; size_t wbytes; - buflen = SOCKET_INITIAL_BUFFER_LEN; - buf = alloc(buflen); - if (!buf) +retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt); + if (ret < 0) { + warning("Failed to format payload: reason=%d", ret); + return ret; + } + + if (wbytes == 0) { + warning("Failed to format payload: wbytes=%zu", wbytes); return -1; + } -retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); - if (ret < 0) - goto out; - - if (wbytes <= 0) - goto out; - - if (wbytes > buflen) { - buflen = wbytes; - buf = realloc(buf, buflen); + if (wbytes > s->out.buflen) { + s->out.buflen = wbytes; + s->out.buf = realloc(s->out.buf, s->out.buflen); goto retry; } @@ -511,7 +511,7 @@ retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt); addrlen = sizeof(s->local); } -retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); +retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen); if (bytes < 0) { if ((errno == EPERM) || (errno == ENOENT && s->layer == SOCKET_LAYER_UNIX)) @@ -521,15 +521,13 @@ retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, ad goto retry2; } else - serror("Failed send to node %s", node_name(n)); + warning("Failed sendto() to node %s", node_name(n)); } if (bytes != wbytes) - warn("Partial send to node %s", node_name(n)); + warning("Partial sendto() to node %s", node_name(n)); -out: free(buf); - - return ret; + return cnt; } int socket_parse(struct node *n, json_t *cfg)