diff --git a/config.h b/config.h index 087f7b3f9..2ca343bbe 100644 --- a/config.h +++ b/config.h @@ -24,6 +24,9 @@ #define DEFAULT_VALUES 64 #define DEFAULT_QUEUELEN 1024 +/** Whether or not to send / receive timestamp & sequence number as first values of payload */ +#define GTNET_SKT_HEADER 1 + /** Width of log output in characters */ #define LOG_WIDTH 132 diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 257da3f67..f6c3e3f12 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -217,13 +217,37 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) if (s->header == SOCKET_HEADER_GTNET_SKT) { if (cnt < 1) return 0; - + /* The GTNETv2-SKT protocol send every sample in a single packet. * socket_read() receives a single packet. */ struct sample *smp = smps[0]; + +#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER + uint32_t header[3]; + + struct iovec iov[] = { + { /* First three values are sequence, seconds and nano-seconds */ + .iov_base = header, + .iov_len = sizeof(header) + }, +#else + struct iovec iov[] = { +#endif + { /* Remaining values are payload */ + .iov_base = &smp->data, + .iov_len = SAMPLE_DATA_LEN(smp->capacity) + } + }; + + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = ARRAY_LEN(iov), + .msg_name = (struct sockaddr *) &s->remote, + .msg_namelen = sizeof(s->remote) + }; /* Receive next sample */ - bytes = recv(s->sd, &smp->data[0], SAMPLE_DATA_LEN(smp->capacity), MSG_TRUNC); + bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC); if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ else if (bytes < 0) @@ -234,9 +258,11 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) return -1; } - debug(3, "Received %zd bytes", bytes); - +#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER + length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); +#else length = bytes / SAMPLE_DATA_LEN(1); +#endif if (length > smp->capacity) { warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); length = smp->capacity; @@ -245,7 +271,18 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /** @todo Should we generate sequence no here manually? * Or maybe optinally use the first data value as a sequence? * However this would require the RTDS model to be changed. */ - smp->sequence = 0; +#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER + smp->sequence = header[0]; + smp->ts.origin.tv_sec = header[1]; + smp->ts.origin.tv_nsec = header[2]; +#else + smp->sequence = -1; + smp->ts.origin.tv_sec = -1; + smp->ts.origin.tv_nsec = -1; +#endif + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; + smp->length = length; received = 1; /* GTNET-SKT sends every sample in a single packet */ @@ -322,6 +359,8 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) smp->length = m->length; smp->sequence = m->sequence; smp->ts.origin = MSG_TS(m); + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; } } @@ -342,7 +381,35 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) return 0; for (int i = 0; i < cnt; i++) { - bytes = sendto(s->sd, &smps[i]->data, SAMPLE_DATA_LEN(smps[i]->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); +#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER + uint32_t header[] = { + smps[i]->sequence, + smps[i]->ts.origin.tv_sec, + smps[i]->ts.origin.tv_nsec + }; + + struct iovec iov[] = { + { /* First three values are sequence, seconds and nano-seconds */ + .iov_base = header, + .iov_len = sizeof(header) + }, +#else + struct iovec iov[] = { +#endif + { /* Remaining values are payload */ + .iov_base = &smps[i]->data, + .iov_len = SAMPLE_DATA_LEN(smps[i]->length) + } + }; + + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = ARRAY_LEN(iov), + .msg_name = (struct sockaddr *) &s->remote, + .msg_namelen = sizeof(s->remote) + }; + + bytes = sendmsg(s->sd, &mhdr, 0); if (bytes < 0) serror("Failed send to node %s", node_name(n)); @@ -413,9 +480,9 @@ int socket_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_string(cfg, "header", &hdr)) s->header = SOCKET_HEADER_DEFAULT; else { - if (!strcmp(hdr, "gtnet-skt")) + if (!strcmp(hdr, "gtnet-skt") || (!strcmp(hdr, "none"))) s->header = SOCKET_HEADER_GTNET_SKT; - else if (!strcmp(hdr, "default")) + else if (!strcmp(hdr, "default") || !strcmp(hdr, "villas")) s->header = SOCKET_HEADER_DEFAULT; else cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n));