mirror of
https://git.rwth-aachen.de/acs/public/villas/node/
synced 2025-03-09 00:00:00 +01:00
socket: use pre-allocated buffers to avoid dynamic allocations and syscalls in critical path
This commit is contained in:
parent
97181dc461
commit
06ef762ad9
2 changed files with 41 additions and 38 deletions
|
@ -104,6 +104,11 @@ struct socket {
|
|||
struct rtnl_qdisc *tc_qdisc; /**< libnl3: Network emulator queuing discipline */
|
||||
struct rtnl_cls *tc_classifier; /**< libnl3: Firewall mark classifier */
|
||||
#endif /* WITH_NETEM */
|
||||
|
||||
struct {
|
||||
char *buf; /**< Buffer for receiving messages */
|
||||
size_t buflen;
|
||||
} in, out;
|
||||
};
|
||||
|
||||
|
||||
|
|
|
@ -338,6 +338,17 @@ int socket_start(struct node *n)
|
|||
#endif /* __linux__ */
|
||||
}
|
||||
|
||||
s->out.buflen = SOCKET_INITIAL_BUFFER_LEN;
|
||||
s->out.buf = alloc(s->out.buflen);
|
||||
if (!s->out.buf)
|
||||
return -1;
|
||||
|
||||
s->in.buflen = SOCKET_INITIAL_BUFFER_LEN;
|
||||
s->in.buf = alloc(s->in.buflen);
|
||||
if (!s->in.buf)
|
||||
return -1;
|
||||
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -374,6 +385,9 @@ int socket_stop(struct node *n)
|
|||
if (ret)
|
||||
return ret;
|
||||
|
||||
free(s->in.buf);
|
||||
free(s->out.buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -394,29 +408,21 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
|
|||
int ret;
|
||||
struct socket *s = (struct socket *) n->_vd;
|
||||
|
||||
char *buf, *ptr;
|
||||
ssize_t buflen;
|
||||
char *ptr;
|
||||
ssize_t bytes;
|
||||
size_t rbytes;
|
||||
|
||||
union sockaddr_union src;
|
||||
socklen_t srclen = sizeof(src);
|
||||
|
||||
/* Get size of next packet */
|
||||
buflen = recvfrom(s->sd, NULL, 0, MSG_TRUNC | MSG_PEEK, &src.sa, &srclen);
|
||||
if (buflen < 0)
|
||||
return -1;
|
||||
|
||||
buf = alloc(buflen);
|
||||
if (!buf)
|
||||
return -1;
|
||||
|
||||
/* Receive next sample */
|
||||
bytes = recvfrom(s->sd, buf, buflen, 0, &src.sa, &srclen);
|
||||
bytes = recvfrom(s->sd, s->in.buf, s->in.buflen, 0, &src.sa, &srclen);
|
||||
if (bytes < 0)
|
||||
serror("Failed recv from node %s", node_name(n));
|
||||
else if (bytes == 0)
|
||||
return 0;
|
||||
|
||||
ptr = buf;
|
||||
ptr = s->in.buf;
|
||||
|
||||
/* Strip IP header from packet */
|
||||
if (s->layer == SOCKET_LAYER_IP) {
|
||||
|
@ -445,8 +451,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
|
|||
warn("Received packet from unauthorized source: %s", buf);
|
||||
free(buf);
|
||||
|
||||
ret = 0;
|
||||
goto out;
|
||||
return 0;
|
||||
}
|
||||
|
||||
ret = io_sscan(&s->io, ptr, bytes, &rbytes, smps, cnt);
|
||||
|
@ -454,8 +459,6 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *r
|
|||
if (ret < 0 || bytes != rbytes)
|
||||
warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);
|
||||
|
||||
out: free(buf);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -464,26 +467,23 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *
|
|||
struct socket *s = (struct socket *) n->_vd;
|
||||
|
||||
int ret;
|
||||
char *buf;
|
||||
size_t buflen;
|
||||
ssize_t bytes;
|
||||
size_t wbytes;
|
||||
|
||||
buflen = SOCKET_INITIAL_BUFFER_LEN;
|
||||
buf = alloc(buflen);
|
||||
if (!buf)
|
||||
retry: ret = io_sprint(&s->io, s->out.buf, s->out.buflen, &wbytes, smps, cnt);
|
||||
if (ret < 0) {
|
||||
warning("Failed to format payload: reason=%d", ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (wbytes == 0) {
|
||||
warning("Failed to format payload: wbytes=%zu", wbytes);
|
||||
return -1;
|
||||
}
|
||||
|
||||
retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
|
||||
if (ret < 0)
|
||||
goto out;
|
||||
|
||||
if (wbytes <= 0)
|
||||
goto out;
|
||||
|
||||
if (wbytes > buflen) {
|
||||
buflen = wbytes;
|
||||
buf = realloc(buf, buflen);
|
||||
if (wbytes > s->out.buflen) {
|
||||
s->out.buflen = wbytes;
|
||||
s->out.buf = realloc(s->out.buf, s->out.buflen);
|
||||
goto retry;
|
||||
}
|
||||
|
||||
|
@ -511,7 +511,7 @@ retry: ret = io_sprint(&s->io, buf, buflen, &wbytes, smps, cnt);
|
|||
addrlen = sizeof(s->local);
|
||||
}
|
||||
|
||||
retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
|
||||
retry2: bytes = sendto(s->sd, s->out.buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
|
||||
if (bytes < 0) {
|
||||
if ((errno == EPERM) ||
|
||||
(errno == ENOENT && s->layer == SOCKET_LAYER_UNIX))
|
||||
|
@ -521,15 +521,13 @@ retry2: bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, ad
|
|||
goto retry2;
|
||||
}
|
||||
else
|
||||
serror("Failed send to node %s", node_name(n));
|
||||
warning("Failed sendto() to node %s", node_name(n));
|
||||
}
|
||||
|
||||
if (bytes != wbytes)
|
||||
warn("Partial send to node %s", node_name(n));
|
||||
warning("Partial sendto() to node %s", node_name(n));
|
||||
|
||||
out: free(buf);
|
||||
|
||||
return ret;
|
||||
return cnt;
|
||||
}
|
||||
|
||||
int socket_parse(struct node *n, json_t *cfg)
|
||||
|
|
Loading…
Add table
Reference in a new issue