diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 542fcb404..1d08dcb25 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -42,7 +42,11 @@ union sockaddr_union { struct socket { int sd; /**> The socket descriptor */ int mark; /**> Socket mark for netem, routing and filtering */ - int endian; /** Endianness of the data sent/received by the node */ + + enum { + SOCKET_ENDIAN_LITTLE, + SOCKET_ENDIAN_BIG + } endian; /** Endianness of the data sent/received by the node */ enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */ enum socket_header header; /**> Payload header type */ diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 7ed72305c..33899b749 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -9,14 +9,7 @@ #include #include #include - -#ifdef __linux__ - #include -#elif defined(__PPC__) /* Xilinx toolchain */ - #include - #define bswap_16(x) Xil_EndianSwap16(x) - #define bswap_32(x) Xil_EndianSwap32(x) -#endif +#include #include "nodes/socket.h" #include "config.h" @@ -26,6 +19,7 @@ #include "kernel/nl.h" #include "kernel/tc.h" #include "msg.h" +#include "msg_format.h" #include "sample.h" #include "queue.h" #include "plugin.h" @@ -126,8 +120,8 @@ char * socket_print(struct node *n) endian = "auto"; else { switch (s->endian) { - case MSG_ENDIAN_LITTLE: endian = "little"; break; - case MSG_ENDIAN_BIG: endian = "big"; break; + case SOCKET_ENDIAN_LITTLE: endian = "little"; break; + case SOCKET_ENDIAN_BIG: endian = "big"; break; } } @@ -231,254 +225,255 @@ int socket_destroy(struct node *n) return 0; } +static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt) +{ + ssize_t bytes; + int length; + struct socket *s = n->_vd; + + /* The GTNETv2-SKT protocol send every sample in a single packet. + * socket_read() receives a single packet. */ + int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; + struct iovec iov[iov_len]; + struct sample *smp = smps[0]; + + if (cnt < 1) + return 0; + + uint32_t header[3]; + if (s->header == SOCKET_HEADER_FAKE) { + iov[0].iov_base = header; + iov[0].iov_len = sizeof(header); + } + + /* Remaining values are payload */ + iov[iov_len-1].iov_base = &smp->data; + iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity); + + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = iov_len, + .msg_name = (struct sockaddr *) &s->remote, + .msg_namelen = sizeof(s->remote) + }; + + /* Receive next sample */ + bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + else if (bytes % 4 != 0) { + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ + return -1; + } + + /* Convert message to host endianess */ + for (int i = 0; i < ARRAY_LEN(header); i++) + header[i] = s->endian == SOCKET_ENDIAN_BIG + ? be32toh(header[i]) + : le32toh(header[i]); + + for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++) + smp->data[i].i = s->endian == SOCKET_ENDIAN_BIG + ? be32toh(smp->data[i].i) + : le32toh(smp->data[i].i); + + if (s->header == SOCKET_HEADER_FAKE) + length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); + else + length = bytes / SAMPLE_DATA_LEN(1); + + if (length > smp->capacity) { + warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); + length = smp->capacity; + } + + if (s->header == SOCKET_HEADER_FAKE) { + smp->sequence = header[0]; + smp->ts.origin.tv_sec = header[1]; + smp->ts.origin.tv_nsec = header[2]; + } + else { + smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */ + smp->ts.origin.tv_sec = -1; + smp->ts.origin.tv_nsec = -1; + } + + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; + + smp->length = length; + + return 1; /* GTNET-SKT sends every sample in a single packet */ +} + +static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt) +{ + struct socket *s = n->_vd; + + int ret; + ssize_t bytes; + + /* Peak into message header of the first sample and to get total packet size. */ + bytes = recv(s->sd, NULL, 0, MSG_PEEK | MSG_TRUNC); + if (bytes < MSG_LEN(1) || bytes % 4 != 0) { + warn("Received invalid packet for node %s", node_name(n)); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ + return -1; + } + + char data[bytes]; + + /* Receive message from socket */ + bytes = recv(s->sd, data, bytes, 0); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed receive packet from node %s", node_name(n)); + + int received = 0; + char *ptr = data; + + struct msg *msg = (struct msg *) ptr; + struct sample *smp = smps[received]; + + while (ptr < data + bytes - sizeof(struct msg) && received < cnt) { + msg_ntoh(msg); + + ret = msg_verify(msg); + if (ret) { + warn("Received invalid packet for node %s", node_name(n)); + return -1; + } + + smp->length = msg->length; + smp->sequence = msg->sequence; + smp->ts.origin = MSG_TS(msg); + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; + + memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(msg->length)); + + ptr += MSG_LEN(msg->length); + + msg = (struct msg *) ptr; + smp = smps[++received]; + } + + return received; +} + +static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt) +{ + struct socket *s = n->_vd; + + int sent = 0; + ssize_t bytes; + + if (cnt < 1) + return 0; + + for (int i = 0; i < cnt; i++) { + int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0; + int len = smps[i]->length + off; + uint32_t data[len]; + + /* First three values are sequence, seconds and nano-seconds timestamps */ + if (s->header == SOCKET_HEADER_FAKE) { + data[0] = smps[i]->sequence; + data[1] = smps[i]->ts.origin.tv_sec; + data[2] = smps[i]->ts.origin.tv_nsec; + } + + for (int j = 0; j < smps[i]->length; j++) + data[off + j] = s->endian == SOCKET_ENDIAN_BIG + ? htobe32(smps[i]->data[j].i) + : htole32(smps[i]->data[j].i); + + bytes = sendto(s->sd, data, len * sizeof(data[0]), 0, + (struct sockaddr *) &s->remote, sizeof(s->remote)); + if (bytes < 0) + serror("Failed send to node %s", node_name(n)); + + sent++; + } + + return sent; +} + +static int socket_write_villas(struct node *n, struct sample *smps[], unsigned cnt) +{ + struct socket *s = n->_vd; + + ssize_t bytes = 0; + + for (int i = 0; i < cnt; i++) + bytes += MSG_LEN(smps[i]->length); + + char data[bytes], *ptr = data; + + struct msg *msg = (struct msg *) ptr; + + for (int i = 0; i < cnt; i++) { + *msg = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msg->ts.sec = smps[i]->ts.origin.tv_sec; + msg->ts.nsec = smps[i]->ts.origin.tv_nsec; + + memcpy(msg->data, smps[i]->data, MSG_DATA_LEN(smps[i]->length)); + + msg_hton(msg); + + ptr += MSG_LEN(msg->length); + + msg = (struct msg *) ptr; + } + + /* Send message */ + bytes = sendto(s->sd, data, bytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + if (bytes < 0) + serror("Failed send to node %s", node_name(n)); + + return cnt; +} + int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int samples, ret, received, length; - ssize_t bytes; - - if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) { - if (cnt < 1) - return 0; - - /* The GTNETv2-SKT protocol send every sample in a single packet. - * socket_read() receives a single packet. */ - int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; - struct iovec iov[iov_len]; - struct sample *smp = smps[0]; - - uint32_t header[3]; - if (s->header == SOCKET_HEADER_FAKE) { - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); - } - - /* Remaining values are payload */ - iov[iov_len-1].iov_base = &smp->data; - iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity); + switch (s->header) { + case SOCKET_HEADER_NONE: + case SOCKET_HEADER_FAKE: + return socket_read_none(n, smps, cnt); - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = iov_len, - .msg_name = (struct sockaddr *) &s->remote, - .msg_namelen = sizeof(s->remote) - }; - - /* Receive next sample */ - bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - else if (bytes % 4 != 0) { - warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); - recv(s->sd, NULL, 0, 0); /* empty receive buffer */ - return -1; - } - - /* Convert message to host endianess */ - if (s->endian != MSG_ENDIAN_HOST) { - for (int i = 0; i < ARRAY_LEN(header); i++) - header[i] = bswap_32(header[i]); - - for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++) - smp->data[i].i = bswap_32(smp->data[i].i); - } - - if (s->header == SOCKET_HEADER_FAKE) - length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); - else - length = bytes / SAMPLE_DATA_LEN(1); - - if (length > smp->capacity) { - warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); - length = smp->capacity; - } - - if (s->header == SOCKET_HEADER_FAKE) { - smp->sequence = header[0]; - smp->ts.origin.tv_sec = header[1]; - smp->ts.origin.tv_nsec = header[2]; - } - else { - smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */ - smp->ts.origin.tv_sec = -1; - smp->ts.origin.tv_nsec = -1; - } - - smp->ts.received.tv_sec = -1; - smp->ts.received.tv_nsec = -1; - - smp->length = length; - - received = 1; /* GTNET-SKT sends every sample in a single packet */ - } - else { - struct msg msgs[cnt]; - struct msg hdr; - struct iovec iov[2*cnt]; - struct msghdr mhdr = { - .msg_iov = iov - }; - - /* Peak into message header of the first sample and to get total packet size. */ - bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); - recv(s->sd, NULL, 0, 0); /* empty receive buffer */ - return -1; - } - - ret = msg_verify(&hdr); - if (ret) { - warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - recv(s->sd, NULL, 0, 0); /* empty receive buffer */ - return -1; - } - - /* Convert message to host endianess */ - if (hdr.endian != MSG_ENDIAN_HOST) - msg_hdr_swap(&hdr); - - samples = bytes / MSG_LEN(hdr.length); - if (samples > cnt) { - warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), samples - cnt); - samples = cnt; - } - - /* We expect that all received samples have the same amount of values! */ - for (int i = 0; i < samples; i++) { - iov[2*i+0].iov_base = &msgs[i]; - iov[2*i+0].iov_len = MSG_LEN(0); - - iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.length); - - mhdr.msg_iovlen += 2; - - if (hdr.length > smps[i]->capacity) - error("Node %s received more values than supported. Dropping %d values.", node_name(n), hdr.length - smps[i]->capacity); - } - - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct msg *m = &msgs[received]; - struct sample *smp = smps[received]; - - ret = msg_verify(m); - if (ret) - break; - - if (m->length != hdr.length) - break; - - /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) { - msg_hdr_swap(m); - - for (int i = 0; i < m->length; i++) - smp->data[i].i = bswap_32(smp->data[i].i); - } - - smp->length = m->length; - smp->sequence = m->sequence; - smp->ts.origin = MSG_TS(m); - smp->ts.received.tv_sec = -1; - smp->ts.received.tv_nsec = -1; - } + case SOCKET_HEADER_DEFAULT: + return socket_read_villas(n, smps, cnt); } - debug(LOG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); - - return received; + return -1; } int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - ssize_t bytes; - int sent = 0; - /* Construct iovecs */ - if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) { - if (cnt < 1) - return 0; - - for (int i = 0; i < cnt; i++) { - int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0; - int len = smps[i]->length + off; - uint32_t data[len]; - - /* First three values are sequence, seconds and nano-seconds timestamps */ - if (s->header == SOCKET_HEADER_FAKE) { - data[0] = smps[i]->sequence; - data[1] = smps[i]->ts.origin.tv_sec; - data[2] = smps[i]->ts.origin.tv_nsec; - } - - for (int j = 0; j < smps[i]->length; j++) { - if (s->endian == MSG_ENDIAN_HOST) - data[off + j] = smps[i]->data[j].i; - else - data[off + j] = bswap_32(smps[i]->data[j].i); - } - - bytes = sendto(s->sd, data, len * sizeof(data[0]), 0, - (struct sockaddr *) &s->remote, sizeof(s->remote)); - if (bytes < 0) - serror("Failed send to node %s", node_name(n)); - - sent++; - - debug(LOG_SOCKET | 17, "Sent packet of %zd bytes with 1 sample", bytes); - } + switch (s->header) { + case SOCKET_HEADER_NONE: + case SOCKET_HEADER_FAKE: + return socket_write_none(n, smps, cnt); + + case SOCKET_HEADER_DEFAULT: + return socket_write_villas(n, smps, cnt); } - else { - struct msg msgs[cnt]; - struct iovec iov[2*cnt]; - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = ARRAY_LEN(iov), - .msg_name = (struct sockaddr *) &s->remote, - .msg_namelen = sizeof(s->remote) - }; - - for (int i = 0; i < cnt; i++) { - - msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; - msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - - iov[i*2+0].iov_base = &msgs[i]; - iov[i*2+0].iov_len = MSG_LEN(0); - - iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); - } - - /* Send message */ - bytes = sendmsg(s->sd, &mhdr, 0); - if (bytes < 0) - serror("Failed send to node %s", node_name(n)); - - sent = cnt; /** @todo Find better way to determine how many values we actually sent */ - - debug(LOG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); - } - - return sent; + + return -1; } int socket_parse(struct node *n, config_setting_t *cfg) { + config_setting_t *cfg_netem; const char *local, *remote, *layer, *hdr, *endian; int ret; @@ -513,12 +508,12 @@ int socket_parse(struct node *n, config_setting_t *cfg) } if (!config_setting_lookup_string(cfg, "endian", &endian)) - s->endian = MSG_ENDIAN_BIG; + s->endian = SOCKET_ENDIAN_BIG; else { if (!strcmp(endian, "big") || !strcmp(endian, "network")) - s->endian = MSG_ENDIAN_BIG; + s->endian = SOCKET_ENDIAN_BIG; else if (!strcmp(endian, "little")) - s->endian = MSG_ENDIAN_LITTLE; + s->endian = SOCKET_ENDIAN_LITTLE; else cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n)); } @@ -541,7 +536,7 @@ int socket_parse(struct node *n, config_setting_t *cfg) remote, node_name(n), gai_strerror(ret)); } - config_setting_t *cfg_netem = config_setting_get_member(cfg, "netem"); + cfg_netem = config_setting_get_member(cfg, "netem"); if (cfg_netem) { int enabled = 1; if (!config_setting_lookup_bool(cfg_netem, "enabled", &enabled) || enabled)