From 545103ff1c61fb766e6623e316c96d84be10142f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 20:58:46 -0400 Subject: [PATCH] rewrote socket_read(): there ave been some misunderstandings --- lib/socket.c | 83 +++++++++++++++++++++++----------------------------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/lib/socket.c b/lib/socket.c index 71f5a511c..ab08eebc2 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -212,70 +212,61 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int samples, ret, received, smp_count; + int samples, ret, received; ssize_t bytes; - if (s->header == SOCKET_HEADER_GTNET_SKT) - smp_count = cnt; - else - smp_count = 2*cnt; - - struct msg msgs[cnt]; - struct msg hdr; - - float sample_value; - struct iovec iov[smp_count]; - struct msghdr mhdr = { - .msg_iov = iov - }; - if (s->header == SOCKET_HEADER_GTNET_SKT) { - bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(float) || bytes % 4 != 0) { - warn("Packet size is invalid"); + if (cnt < 1) + return 0; + + /* The GTNETv2-SKT protocol send every sample in a single packet. + * socket_read() receives a single packet. */ + struct sample *smp = smps[0]; + + /* Receive next sample */ + bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->length), MSG_PEEK | MSG_TRUNC); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + else if (bytes % 4 != 0) { + warn("Packet size is invalid. Must be multiple of 4 bytes."); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } - samples = bytes / sizeof(sample_value); + received = bytes / sizeof(smp->values[0]); + if (received > smp->length) { + warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), received - smp->length); + received = smp->length; + } - if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); - samples = cnt; - } - /* We add one value per sample */ - for (int i = 0; i < samples; i++) { - iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i].iov_len = SAMPLE_DATA_LEN(1); /** values per sample is 1 while reading */ - mhdr.msg_iovlen += 1; - } - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct sample *smp = smps[received]; - smp->length = 1; - /** @todo see if s->sequence value is needed */ - /** @todo see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header */ - } + /** @todo Should we generate sequence no here manually? + * Or maybe optinally use the first data value as a sequence? + * However this would require the RTDS model to be changed. */ + smp->sequence = 0; + smp->length = received; } - else { + struct msg msgs[cnt]; + struct msg hdr; + struct iovec iov[2*cnt]; + struct msghdr mhdr = { + .msg_iov = iov + }; + /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid"); - recv(s->sd, &hdr, sizeof(struct msg), 0); + warn("Packet size is invalid. Must be multiple of 4 bytes."); + recv(s->sd, &hdr, sizeof(struct msg), 0); /* empty receive buffer */ return -1; } ret = msg_verify(&hdr); if (ret) { warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - recv(s->sd, &hdr, sizeof(struct msg), 0); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; }