1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

add new msg_buffer() functions to read / write multiple struct sample from / to a buffer using the struct msg wire protocol.

This commit is contained in:
Steffen Vogel 2017-05-23 09:33:42 +02:00
parent 1e5c143ff9
commit f6909de568
3 changed files with 112 additions and 54 deletions

View file

@ -25,6 +25,7 @@
/* Forward declarations. */
struct msg;
struct sample;
/** Swaps the byte-order of the message.
*
@ -48,4 +49,17 @@ void msg_hton(struct msg *m);
* @retval 0 The message header is valid.
* @retval <0 The message header is invalid.
*/
int msg_verify(struct msg *m);
int msg_verify(struct msg *m);
/** Copy fields from \p msg into \p smp. */
int msg_to_sample(struct msg *msg, struct sample *smp);
/** Copy fields form \p smp into \p msg. */
int msg_from_sample(struct msg *msg, struct sample *smp);
/** Copy / read struct msg's from buffer \p buf to / fram samples \p smps. */
ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len);
/** Read struct sample's from buffer \p buf into samples \p smps. */
int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len);

View file

@ -21,9 +21,12 @@
*********************************************************************************/
#include <arpa/inet.h>
#include <string.h>
#include "msg.h"
#include "msg_format.h"
#include "sample.h"
#include "utils.h"
void msg_ntoh(struct msg *m)
{
@ -67,4 +70,83 @@ int msg_verify(struct msg *m)
return -3;
else
return 0;
}
int msg_to_sample(struct msg *msg, struct sample *smp)
{
int ret;
msg_ntoh(msg);
ret = msg_verify(msg);
if (ret)
return -1;
smp->length = MIN(msg->length, smp->capacity);
smp->sequence = msg->sequence;
smp->ts.origin = MSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(smp->length));
return 0;
}
int msg_from_sample(struct msg *msg, struct sample *smp)
{
*msg = MSG_INIT(smp->length, smp->sequence);
msg->ts.sec = smp->ts.origin.tv_sec;
msg->ts.nsec = smp->ts.origin.tv_nsec;
memcpy(msg->data, smp->data, MSG_DATA_LEN(smp->length));
msg_hton(msg);
return 0;
}
ssize_t msg_buffer_from_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len)
{
int ret, i = 0;
char *ptr = buf;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
while (ptr < buf + len && i < cnt) {
ret = msg_from_sample(msg, smp);
if (ret)
return ret;
ptr += MSG_LEN(smp->length);
msg = (struct msg *) ptr;
smp = smps[++i];
}
return ptr - buf;
}
int msg_buffer_to_samples(struct sample *smps[], unsigned cnt, char *buf, size_t len)
{
int ret, i = 0;
char *ptr = buf;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[i];
while (ptr < buf + len && i < cnt) {
ret = msg_to_sample(msg, smp);
if (ret)
return ret;
ptr += MSG_LEN(smp->length);
msg = (struct msg *) ptr;
smp = smps[++i];
}
return i;
}

View file

@ -348,37 +348,16 @@ static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cn
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed receive packet from node %s", node_name(n));
int received = 0;
char *ptr = data;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[received];
while (ptr < data + bytes - sizeof(struct msg) && received < cnt) {
msg_ntoh(msg);
ret = msg_verify(msg);
if (ret) {
warn("Received invalid packet for node %s", node_name(n));
return -1;
}
smp->length = msg->length;
smp->sequence = msg->sequence;
smp->ts.origin = MSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(msg->length));
ptr += MSG_LEN(msg->length);
msg = (struct msg *) ptr;
smp = smps[++received];
else if (bytes < MSG_LEN(1) || bytes % 4 != 0) {
warn("Received invalid packet for node %s", node_name(n));
return 0;
}
return received;
ret = msg_buffer_to_samples(smps, cnt, data, bytes);
if (ret < 0)
warn("Received invalid packet from node: %s", node_name(n));
return ret;
}
static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt)
@ -423,32 +402,15 @@ static int socket_write_villas(struct node *n, struct sample *smps[], unsigned c
{
struct socket *s = n->_vd;
ssize_t bytes = 0;
char data[MAX_PACKETLEN];
ssize_t bytes = 0, sent;
for (int i = 0; i < cnt; i++)
bytes += MSG_LEN(smps[i]->length);
char data[bytes], *ptr = data;
struct msg *msg = (struct msg *) ptr;
for (int i = 0; i < cnt; i++) {
*msg = MSG_INIT(smps[i]->length, smps[i]->sequence);
msg->ts.sec = smps[i]->ts.origin.tv_sec;
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
memcpy(msg->data, smps[i]->data, MSG_DATA_LEN(smps[i]->length));
msg_hton(msg);
ptr += MSG_LEN(msg->length);
msg = (struct msg *) ptr;
}
sent = msg_buffer_from_samples(smps, cnt, data, sizeof(data));
if (sent < 0)
return -1;
/* Send message */
bytes = sendto(s->sd, data, bytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
bytes = sendto(s->sd, data, sent, 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));