diff --git a/include/villas/node.h b/include/villas/node.h index 32670f8f4..d57553402 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -47,9 +47,7 @@ struct node int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ int affinity; /**< CPU Affinity of this node */ - - int endian; /** Endianness of the data sent/received by the node */ - + unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ enum node_state { diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 4a0a6c337..9f5cc7750 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -44,6 +44,7 @@ union sockaddr_union { struct socket { int sd; /**> The socket descriptor */ int mark; /**> Socket mark for netem, routing and filtering */ + int endian; /** Endianness of the data sent/received by the node */ enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */ enum socket_header header; /**> Payload header type */ diff --git a/include/villas/sample.h b/include/villas/sample.h index 4bd48938a..ec58f8bd8 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -17,14 +17,6 @@ #include #include -#ifdef __linux__ - #include -#elif defined(__PPC__) /* Xilinx toolchain */ - #include - #define bswap_16(x) Xil_EndianSwap16(x) - #define bswap_32(x) Xil_EndianSwap32(x) -#endif - /* Forward declarations */ struct pool; diff --git a/lib/cfg.c b/lib/cfg.c index 9c93a1148..d2781631e 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -279,7 +279,7 @@ int cfg_parse_nodelist(config_setting_t *cfg, struct list *list, struct list *al int cfg_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *set) { - const char *type, *name, *endian; + const char *type, *name; int ret; struct node *n; @@ -319,17 +319,6 @@ int cfg_parse_node(config_setting_t *cfg, struct list *nodes, struct settings *s if (!config_setting_lookup_int(cfg, "affinity", &n->affinity)) n->affinity = set->affinity; - if (!config_setting_lookup_string(cfg, "endian", &endian)) - n->endian = LITTLE_ENDIAN; - else { - if(!strcmp(endian, "big")) - n->endian = BIG_ENDIAN; - else if (!strcmp(endian, "little")) - n->endian = LITTLE_ENDIAN; - else - cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n)); - } - list_push(nodes, n); return ret; diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 64dfb90e1..9ab1bf712 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -19,6 +19,14 @@ #include #include +#ifdef __linux__ + #include +#elif defined(__PPC__) /* Xilinx toolchain */ + #include + #define bswap_16(x) Xil_EndianSwap16(x) + #define bswap_32(x) Xil_EndianSwap32(x) +#endif + #include "nodes/socket.h" #include "config.h" #include "utils.h" @@ -253,8 +261,21 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } + + /* Convert message to host endianess */ + if (s->endian != MSG_ENDIAN_HOST) { + for (int i = 0; i < ARRAY_LEN(header); i++) + header[i] = bswap_32(header[i]); + + for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++) + smp->data[i].i = bswap_32(smp->data[i].i); + } - length = (s->header == SOCKET_HEADER_FAKE ? bytes - sizeof(header) : bytes) / SAMPLE_DATA_LEN(1); + if (s->header == SOCKET_HEADER_FAKE) + length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); + else + length = bytes / SAMPLE_DATA_LEN(1); + if (length > smp->capacity) { warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); length = smp->capacity; @@ -275,7 +296,6 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) smp->ts.received.tv_nsec = -1; smp->length = length; - smp->endian = n->endian; received = 1; /* GTNET-SKT sends every sample in a single packet */ } @@ -372,33 +392,27 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (cnt < 1) return 0; - for (int i = 0; i < cnt; i++) { - int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; - struct iovec iov[iov_len]; - + for (int i = 0; i < cnt; i++) { + int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0; + int len = smps[i]->length + off; + uint32_t data[len]; + /* First three values are sequence, seconds and nano-seconds timestamps */ - uint32_t header[3]; if (s->header == SOCKET_HEADER_FAKE) { - header[0] = smps[i]->sequence; - header[1] = smps[i]->ts.origin.tv_sec; - header[2] = smps[i]->ts.origin.tv_nsec; - - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); + data[0] = smps[i]->sequence; + data[1] = smps[i]->ts.origin.tv_sec; + data[2] = smps[i]->ts.origin.tv_nsec; } - /* Remaining values are payload */ - iov[iov_len-1].iov_base = &smps[i]->data; - iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); - - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = iov_len, - .msg_name = (struct sockaddr *) &s->remote, - .msg_namelen = sizeof(s->remote) - }; + for (int j = 0; j < smps[i]->length; j++) { + if (s->endian == MSG_ENDIAN_HOST) + data[off + j] = smps[i]->data[j].i; + else + data[off + j] = bswap_32(smps[i]->data[j].i); + } - bytes = sendmsg(s->sd, &mhdr, 0); + bytes = sendto(s->sd, data, len * sizeof(data[0]), 0, + (struct sockaddr *) &s->remote, sizeof(s->remote)); if (bytes < 0) serror("Failed send to node %s", node_name(n)); @@ -446,7 +460,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer, *hdr; + const char *local, *remote, *layer, *hdr, *endian; int ret; struct socket *s = n->_vd; @@ -478,6 +492,17 @@ int socket_parse(struct node *n, config_setting_t *cfg) else cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n)); } + + if (!config_setting_lookup_string(cfg, "endian", &endian)) + s->endian = MSG_ENDIAN_BIG; + else { + if (!strcmp(endian, "big") || !strcmp(endian, "network")) + s->endian = MSG_ENDIAN_BIG; + else if (!strcmp(endian, "little")) + s->endian = MSG_ENDIAN_LITTLE; + else + cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n)); + } if (!config_setting_lookup_string(cfg, "remote", &remote)) cerror(cfg, "Missing remote address for node %s", node_name(n)); diff --git a/lib/sample.c b/lib/sample.c index a3dc5eb46..f3040d3a0 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -55,14 +55,8 @@ int sample_print(char *buf, size_t len, struct sample *s, int flags) off += snprintf(buf + off, len - off, "(%u)", s->sequence); if (flags & SAMPLE_VALUES) { - for (int i = 0; i < s->length; i++) { - float display_float = s->data[i].f; - if(s->endian == BIG_ENDIAN) { - uint32_t * temp_int = (uint32_t *) &(display_float); - *temp_int = bswap_32(*temp_int); - } - off += snprintf(buf + off, len - off, "\t%.6f", display_float); - } + for (int i = 0; i < s->length; i++) + off += snprintf(buf + off, len - off, "\t%.6f", s->data[i].f); } off += snprintf(buf + off, len - off, "\n");