diff --git a/include/villas/msg.h b/include/villas/msg.h index 27c8e8bb6..133a457d9 100644 --- a/include/villas/msg.h +++ b/include/villas/msg.h @@ -25,6 +25,7 @@ /* Forward declarations. */ struct msg; +struct sample; /** Swaps the byte-order of the message. * @@ -48,4 +49,17 @@ void msg_hton(struct msg *m); * @retval 0 The message header is valid. * @retval <0 The message header is invalid. */ -int msg_verify(struct msg *m); \ No newline at end of file +int msg_verify(struct msg *m); + +/** Copy fields from \p msg into \p smp. */ +int msg_to_sample(struct msg *msg, struct sample *smp); + +/** Copy fields form \p smp into \p msg. */ +int msg_from_sample(struct msg *msg, struct sample *smp); + + +/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */ +ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len); + +/** Read struct sample's from buffer \p buf into samples \p smps. */ +int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len); diff --git a/lib/msg.c b/lib/msg.c index b857515b2..183b42669 100644 --- a/lib/msg.c +++ b/lib/msg.c @@ -21,9 +21,12 @@ *********************************************************************************/ #include +#include #include "msg.h" #include "msg_format.h" +#include "sample.h" +#include "utils.h" void msg_ntoh(struct msg *m) { @@ -67,4 +70,83 @@ int msg_verify(struct msg *m) return -3; else return 0; +} + +int msg_to_sample(struct msg *msg, struct sample *smp) +{ + int ret; + + msg_ntoh(msg); + + ret = msg_verify(msg); + if (ret) + return -1; + + smp->length = MIN(msg->length, smp->capacity); + smp->sequence = msg->sequence; + smp->ts.origin = MSG_TS(msg); + smp->ts.received.tv_sec = -1; + smp->ts.received.tv_nsec = -1; + + memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length)); + + return 0; +} + +int msg_from_sample(struct msg *msg, struct sample *smp) +{ + *msg = MSG_INIT(smp->length, smp->sequence); + + msg->ts.sec = smp->ts.origin.tv_sec; + msg->ts.nsec = smp->ts.origin.tv_nsec; + + memcpy(msg->data, smp->data, MSG_DATA_LEN(smp->length)); + + msg_hton(msg); + + return 0; +} + +ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len) +{ + int ret, i = 0; + char *ptr = buf; + + struct msg *msg = (struct msg *) ptr; + struct sample *smp = smps[i]; + + while (ptr < buf + len && i < cnt) { + ret = msg_from_sample(msg, smp); + if (ret) + return ret; + + ptr += MSG_LEN(smp->length); + + msg = (struct msg *) ptr; + smp = smps[++i]; + } + + return ptr - buf; +} + +int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len) +{ + int ret, i = 0; + char *ptr = buf; + + struct msg *msg = (struct msg *) ptr; + struct sample *smp = smps[i]; + + while (ptr < buf + len && i < cnt) { + ret = msg_to_sample(msg, smp); + if (ret) + return ret; + + ptr += MSG_LEN(smp->length); + + msg = (struct msg *) ptr; + smp = smps[++i]; + } + + return i; } \ No newline at end of file diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index 7b630218e..9eb8fcf9d 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -348,37 +348,16 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn error("Remote node %s closed the connection", node_name(n)); else if (bytes < 0) serror("Failed receive packet from node %s", node_name(n)); - - int received = 0; - char *ptr = data; - - struct msg *msg = (struct msg *) ptr; - struct sample *smp = smps[received]; - - while (ptr < data + bytes - sizeof(struct msg) && received < cnt) { - msg_ntoh(msg); - - ret = msg_verify(msg); - if (ret) { - warn("Received invalid packet for node %s", node_name(n)); - return -1; - } - - smp->length = msg->length; - smp->sequence = msg->sequence; - smp->ts.origin = MSG_TS(msg); - smp->ts.received.tv_sec = -1; - smp->ts.received.tv_nsec = -1; - - memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(msg->length)); - - ptr += MSG_LEN(msg->length); - - msg = (struct msg *) ptr; - smp = smps[++received]; + else if (bytes < MSG_LEN(1) || bytes % 4 != 0) { + warn("Received invalid packet for node %s", node_name(n)); + return 0; } - - return received; + + ret = msg_buffer_to_samples(smps, cnt, data, bytes); + if (ret < 0) + warn("Received invalid packet from node: %s", node_name(n)); + + return ret; } static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt) @@ -423,32 +402,15 @@ static int socket_write_villas(struct node *n, struct sample *smps[], unsigned c { struct socket *s = n->_vd; - ssize_t bytes = 0; + char data[MAX_PACKETLEN]; + ssize_t bytes = 0, sent; - for (int i = 0; i < cnt; i++) - bytes += MSG_LEN(smps[i]->length); - - char data[bytes], *ptr = data; - - struct msg *msg = (struct msg *) ptr; - - for (int i = 0; i < cnt; i++) { - *msg = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msg->ts.sec = smps[i]->ts.origin.tv_sec; - msg->ts.nsec = smps[i]->ts.origin.tv_nsec; - - memcpy(msg->data, smps[i]->data, MSG_DATA_LEN(smps[i]->length)); - - msg_hton(msg); - - ptr += MSG_LEN(msg->length); - - msg = (struct msg *) ptr; - } + sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data)); + if (sent < 0) + return -1; /* Send message */ - bytes = sendto(s->sd, data, bytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + bytes = sendto(s->sd, data, sent, 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); if (bytes < 0) serror("Failed send to node %s", node_name(n));