1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-30 00:00:11 +01:00

rewrote socket_read(): there ave been some misunderstandings

This commit is contained in:
Steffen Vogel 2016-09-10 20:58:46 -04:00
parent fe92747aa9
commit 545103ff1c

View file

@ -212,70 +212,61 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{ {
struct socket *s = n->_vd; struct socket *s = n->_vd;
int samples, ret, received, smp_count; int samples, ret, received;
ssize_t bytes; ssize_t bytes;
if (s->header == SOCKET_HEADER_GTNET_SKT) if (s->header == SOCKET_HEADER_GTNET_SKT) {
smp_count = cnt; if (cnt < 1)
else return 0;
smp_count = 2*cnt;
/* The GTNETv2-SKT protocol send every sample in a single packet.
* socket_read() receives a single packet. */
struct sample *smp = smps[0];
/* Receive next sample */
bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->length), MSG_PEEK | MSG_TRUNC);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
else if (bytes % 4 != 0) {
warn("Packet size is invalid. Must be multiple of 4 bytes.");
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
received = bytes / sizeof(smp->values[0]);
if (received > smp->length) {
warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), received - smp->length);
received = smp->length;
}
/** @todo Should we generate sequence no here manually?
* Or maybe optinally use the first data value as a sequence?
* However this would require the RTDS model to be changed. */
smp->sequence = 0;
smp->length = received;
}
else {
struct msg msgs[cnt]; struct msg msgs[cnt];
struct msg hdr; struct msg hdr;
struct iovec iov[2*cnt];
float sample_value;
struct iovec iov[smp_count];
struct msghdr mhdr = { struct msghdr mhdr = {
.msg_iov = iov .msg_iov = iov
}; };
if (s->header == SOCKET_HEADER_GTNET_SKT) {
bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(float) || bytes % 4 != 0) {
warn("Packet size is invalid");
return -1;
}
samples = bytes / sizeof(sample_value);
if (samples > cnt) {
warn("Received more samples than supported. Dropping %u samples", samples - cnt);
samples = cnt;
}
/* We add one value per sample */
for (int i = 0; i < samples; i++) {
iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i].iov_len = SAMPLE_DATA_LEN(1); /** values per sample is 1 while reading */
mhdr.msg_iovlen += 1;
}
/* Receive message from socket */
bytes = recvmsg(s->sd, &mhdr, 0);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
for (received = 0; received < samples; received++) {
struct sample *smp = smps[received];
smp->length = 1;
/** @todo see if s->sequence value is needed */
/** @todo see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header */
}
}
else {
/* Peak into message header of the first sample and to get total packet size. */ /* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(struct msg) || bytes % 4 != 0) { if (bytes < sizeof(struct msg) || bytes % 4 != 0) {
warn("Packet size is invalid"); warn("Packet size is invalid. Must be multiple of 4 bytes.");
recv(s->sd, &hdr, sizeof(struct msg), 0); recv(s->sd, &hdr, sizeof(struct msg), 0); /* empty receive buffer */
return -1; return -1;
} }
ret = msg_verify(&hdr); ret = msg_verify(&hdr);
if (ret) { if (ret) {
warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes);
recv(s->sd, &hdr, sizeof(struct msg), 0); recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1; return -1;
} }