1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

socket: lift limitation of packet lengths (was 1500 bytes)

This commit is contained in:
Steffen Vogel 2017-10-27 19:15:23 +02:00
parent 08a8f29117
commit 7d6ca30f8a
2 changed files with 46 additions and 18 deletions

View file

@ -53,7 +53,7 @@
struct io_format;
/** The maximum length of a packet which contains stuct msg. */
#define SOCKET_MAX_PACKET_LEN 1500
#define SOCKET_INITIAL_BUFFER_LEN 1500
enum socket_layer {
SOCKET_LAYER_ETH,

View file

@ -330,32 +330,43 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
int ret;
struct socket *s = (struct socket *) n->_vd;
char buf[SOCKET_MAX_PACKET_LEN];
char *bufptr = buf;
char *buf, *ptr;
ssize_t buflen;
ssize_t bytes;
size_t rbytes;
union sockaddr_union src;
socklen_t srclen = sizeof(src);
/* Get size of next packet */
buflen = recvfrom(s->sd, NULL, 0, MSG_TRUNC | MSG_PEEK, &src.sa, &srclen);
if (buflen < 0)
return -1;
buf = alloc(buflen);
if (!buf)
return -1;
/* Receive next sample */
bytes = recvfrom(s->sd, bufptr, sizeof(buf), 0, &src.sa, &srclen);
bytes = recvfrom(s->sd, buf, buflen, 0, &src.sa, &srclen);
if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
ptr = buf;
/* Strip IP header from packet */
if (s->layer == SOCKET_LAYER_IP) {
struct ip *iphdr = (struct ip *) bufptr;
struct ip *iphdr = (struct ip *) ptr;
bytes -= iphdr->ip_hl * 4;
bufptr += iphdr->ip_hl * 4;
bytes -= iphdr->ip_hl * 4;
ptr += iphdr->ip_hl * 4;
}
/* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg()
* So we simply set it ourself. */
if (s->layer == SOCKET_LAYER_IP) {
switch (src.sa.sa_family) {
case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break;
case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break;
case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break;
}
}
@ -365,13 +376,16 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
warn("Received packet from unauthorized source: %s", buf);
free(buf);
return 0;
ret = 0;
goto out;
}
ret = io_format_sscan(s->format, bufptr, bytes, &rbytes, smps, cnt, 0);
ret = io_format_sscan(s->format, ptr, bytes, &rbytes, smps, cnt, 0);
if (bytes != rbytes)
warn("Received invalid packet from node: %s bytes=%zu, rbytes=%zu", node_name(n), bytes, rbytes);
if (ret < 0 || bytes != rbytes)
warn("Received invalid packet from node: %s ret=%d, bytes=%zu, rbytes=%zu", node_name(n), ret, bytes, rbytes);
out: free(buf);
return ret;
}
@ -380,17 +394,29 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = (struct socket *) n->_vd;
char data[SOCKET_MAX_PACKET_LEN];
int ret;
char *buf;
size_t buflen;
ssize_t bytes;
size_t wbytes;
ret = io_format_sprint(s->format, data, sizeof(data), &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret < 0)
buflen = SOCKET_INITIAL_BUFFER_LEN;
buf = alloc(buflen);
if (!buf)
return -1;
retry: ret = io_format_sprint(s->format, buf, buflen, &wbytes, smps, cnt, SAMPLE_HAS_ALL);
if (ret < 0)
goto out;
if (wbytes <= 0)
return 0;
goto out;
if (wbytes > buflen) {
buflen = wbytes;
buf = realloc(buf, buflen);
goto retry;
}
/* Send message */
socklen_t addrlen = 0;
@ -401,7 +427,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
case SOCKET_LAYER_UNIX: addrlen = SUN_LEN(&s->remote.sun); break;
}
bytes = sendto(s->sd, data, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
bytes = sendto(s->sd, buf, wbytes, 0, (struct sockaddr *) &s->remote, addrlen);
if (bytes < 0) {
if ((errno == EPERM) ||
(errno == ENOENT && s->layer == SOCKET_LAYER_UNIX))
@ -413,7 +439,9 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
if (bytes != wbytes)
warn("Partial send to node %s", node_name(n));
return cnt;
out: free(buf);
return ret;
}
int socket_parse(struct node *n, json_t *cfg)