1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

socket: rewrite and fixes for layer=„ip“ and layer=„eth“

This commit is contained in:
Steffen Vogel 2017-05-28 18:49:49 +02:00
parent d4fe802a9c
commit e639b67015

View file

@ -254,38 +254,23 @@ int socket_destroy(struct node *n)
static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
{
ssize_t bytes;
int length;
struct socket *s = n->_vd;
char buf[MSG_MAX_PACKET_LEN];
uint32_t *values = (uint32_t *) buf;
ssize_t bytes;
/* The GTNETv2-SKT protocol send every sample in a single packet.
* socket_read() receives a single packet. */
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
struct sample *smp = smps[0];
if (cnt < 1)
return 0;
uint32_t header[3];
if (s->header == SOCKET_HEADER_FAKE) {
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smp->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
union sockaddr_union src;
socklen_t srclen = sizeof(src);
/* Receive next sample */
bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC);
bytes = recvfrom(s->sd, buf, sizeof(buf), 0, &src.sa, &srclen);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
@ -295,32 +280,52 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Convert message to host endianess */
for (int i = 0; i < ARRAY_LEN(header); i++)
header[i] = s->endian == SOCKET_ENDIAN_BIG
? be32toh(header[i])
: le32toh(header[i]);
for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++)
smp->data[i].i = s->endian == SOCKET_ENDIAN_BIG
? be32toh(smp->data[i].i)
: le32toh(smp->data[i].i);
if (s->header == SOCKET_HEADER_FAKE)
length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1);
else
length = bytes / SAMPLE_DATA_LEN(1);
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
length = bytes / 4;
/* Strip IP header from packet */
if (s->layer == SOCKET_LAYER_IP) {
struct ip *iphdr = (struct ip *) buf;
length -= iphdr->ip_hl;
values += iphdr->ip_hl;
}
/* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg()
* So we simply set it ourself. */
if (s->layer == SOCKET_LAYER_IP) {
switch (src.sa.sa_family) {
case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break;
case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break;
}
}
if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) {
char *buf = socket_print_addr((struct sockaddr *) &src);
warn("Received packet from unauthorized source: %s", buf);
free(buf);
return 0;
}
/* Convert packet contents to host endianess */
for (int i = 0; i < length; i++)
values[i] = s->endian == SOCKET_ENDIAN_BIG
? be32toh(values[i])
: le32toh(values[i]);
if (s->header == SOCKET_HEADER_FAKE) {
smp->sequence = header[0];
smp->ts.origin.tv_sec = header[1];
smp->ts.origin.tv_nsec = header[2];
if (length < 3) {
warn("Node %s received a packet with no fake header. Skipping...", node_name(n));
return 0;
}
smp->sequence = values[0];
smp->ts.origin.tv_sec = values[1];
smp->ts.origin.tv_nsec = values[2];
values += 3;
length -= 3;
}
else {
smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */
@ -328,6 +333,13 @@ static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
smp->ts.origin.tv_nsec = 0;
}
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
}
memcpy(smp->data, values, SAMPLE_DATA_LEN(length));
smp->ts.received.tv_sec = 0;
smp->ts.received.tv_nsec = 0;
@ -341,32 +353,51 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn
int ret;
struct socket *s = n->_vd;
char data[MSG_MAX_PACKET_LEN];
char buf[MSG_MAX_PACKET_LEN];
char *bufptr = buf;
ssize_t bytes;
struct sockaddr_storage src;
union sockaddr_union src;
socklen_t srclen = sizeof(src);
/* Receive message from socket */
bytes = recvfrom(s->sd, data, sizeof(data), 0, (struct sockaddr *) &src, &srclen);
/* Receive next sample */
bytes = recvfrom(s->sd, bufptr, sizeof(buf), 0, &src.sa, &srclen);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
serror("Failed receive packet from node %s", node_name(n));
else if (bytes < MSG_LEN(1) || bytes % 4 != 0) {
warn("Received invalid packet for node %s", node_name(n));
serror("Failed recv from node %s", node_name(n));
else if (bytes % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Strip IP header from packet */
if (s->layer == SOCKET_LAYER_IP) {
struct ip *iphdr = (struct ip *) bufptr;
bytes -= iphdr->ip_hl * 4;
bufptr += iphdr->ip_hl * 4;
}
/* SOCK_RAW IP sockets to not provide the IP protocol number via recvmsg()
* So we simply set it ourself. */
if (s->layer == SOCKET_LAYER_IP) {
switch (src.sa.sa_family) {
case AF_INET: src.sin.sin_port = s->remote.sin.sin_port; break;
case AF_INET6: src.sin6.sin6_port = s->remote.sin6.sin6_port; break;
}
}
if (s->verify_source && socket_compare_addr(&src.sa, &s->remote.sa) != 0) {
char *buf = socket_print_addr((struct sockaddr *) &src);
warn("Received packet from unauthorized source: %s", buf);
free(buf);
return 0;
}
if (s->verify_source && socket_compare_addr((struct sockaddr *) &src, (struct sockaddr *) &s->remote) != 0) {
char *buf = socket_print_addr((struct sockaddr *) &src);
warn("Received packet from unauthorized source: %s", buf);
free(buf);
}
ret = msg_buffer_to_samples(smps, cnt, data, bytes);
ret = msg_buffer_to_samples(smps, cnt, bufptr, bytes);
if (ret < 0)
warn("Received invalid packet from node: %s", node_name(n));
@ -610,16 +641,17 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye
if (!mac)
error("Failed to parse MAC address: %s", node);
memcpy(&sa->sll.sll_addr, &mac->ether_addr_octet, 6);
memcpy(&sa->sll.sll_addr, &mac->ether_addr_octet, ETHER_ADDR_LEN);
/* Get interface index from name */
nl_init();
struct nl_cache *cache = nl_cache_mngt_require("route/link");
struct rtnl_link *link = rtnl_link_get_by_name(cache, ifname);
if (!link)
error("Failed to get network interface: '%s'", ifname);
sa->sll.sll_protocol = htons((proto) ? strtol(proto, NULL, 0) : ETH_P_VILLAS);
sa->sll.sll_halen = 6;
sa->sll.sll_protocol = htons(proto ? strtol(proto, NULL, 0) : ETH_P_VILLAS);
sa->sll.sll_halen = ETHER_ADDR_LEN;
sa->sll.sll_family = AF_PACKET;
sa->sll.sll_ifindex = rtnl_link_get_ifindex(link);