1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

adapted socket node-type to support new packet format

This commit is contained in:
Steffen Vogel 2017-04-23 21:54:24 +02:00
parent d6f2697e1b
commit a3222da312
2 changed files with 244 additions and 245 deletions

View file

@ -42,7 +42,11 @@ union sockaddr_union {
struct socket {
int sd; /**> The socket descriptor */
int mark; /**> Socket mark for netem, routing and filtering */
int endian; /** Endianness of the data sent/received by the node */
enum {
SOCKET_ENDIAN_LITTLE,
SOCKET_ENDIAN_BIG
} endian; /** Endianness of the data sent/received by the node */
enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */
enum socket_header header; /**> Payload header type */

View file

@ -9,14 +9,7 @@
#include <netinet/ip.h>
#include <netinet/ether.h>
#include <arpa/inet.h>
#ifdef __linux__
#include <byteswap.h>
#elif defined(__PPC__) /* Xilinx toolchain */
#include <xil_io.h>
#define bswap_16(x) Xil_EndianSwap16(x)
#define bswap_32(x) Xil_EndianSwap32(x)
#endif
#include <endian.h>
#include "nodes/socket.h"
#include "config.h"
@ -26,6 +19,7 @@
#include "kernel/nl.h"
#include "kernel/tc.h"
#include "msg.h"
#include "msg_format.h"
#include "sample.h"
#include "queue.h"
#include "plugin.h"
@ -126,8 +120,8 @@ char * socket_print(struct node *n)
endian = "auto";
else {
switch (s->endian) {
case MSG_ENDIAN_LITTLE: endian = "little"; break;
case MSG_ENDIAN_BIG: endian = "big"; break;
case SOCKET_ENDIAN_LITTLE: endian = "little"; break;
case SOCKET_ENDIAN_BIG: endian = "big"; break;
}
}
@ -231,254 +225,255 @@ int socket_destroy(struct node *n)
return 0;
}
static int socket_read_none(struct node *n, struct sample *smps[], unsigned cnt)
{
ssize_t bytes;
int length;
struct socket *s = n->_vd;
/* The GTNETv2-SKT protocol send every sample in a single packet.
* socket_read() receives a single packet. */
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
struct sample *smp = smps[0];
if (cnt < 1)
return 0;
uint32_t header[3];
if (s->header == SOCKET_HEADER_FAKE) {
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smp->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
/* Receive next sample */
bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
else if (bytes % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Convert message to host endianess */
for (int i = 0; i < ARRAY_LEN(header); i++)
header[i] = s->endian == SOCKET_ENDIAN_BIG
? be32toh(header[i])
: le32toh(header[i]);
for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++)
smp->data[i].i = s->endian == SOCKET_ENDIAN_BIG
? be32toh(smp->data[i].i)
: le32toh(smp->data[i].i);
if (s->header == SOCKET_HEADER_FAKE)
length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1);
else
length = bytes / SAMPLE_DATA_LEN(1);
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
}
if (s->header == SOCKET_HEADER_FAKE) {
smp->sequence = header[0];
smp->ts.origin.tv_sec = header[1];
smp->ts.origin.tv_nsec = header[2];
}
else {
smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */
smp->ts.origin.tv_sec = -1;
smp->ts.origin.tv_nsec = -1;
}
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
smp->length = length;
return 1; /* GTNET-SKT sends every sample in a single packet */
}
static int socket_read_villas(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int ret;
ssize_t bytes;
/* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, NULL, 0, MSG_PEEK | MSG_TRUNC);
if (bytes < MSG_LEN(1) || bytes % 4 != 0) {
warn("Received invalid packet for node %s", node_name(n));
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
char data[bytes];
/* Receive message from socket */
bytes = recv(s->sd, data, bytes, 0);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed receive packet from node %s", node_name(n));
int received = 0;
char *ptr = data;
struct msg *msg = (struct msg *) ptr;
struct sample *smp = smps[received];
while (ptr < data + bytes - sizeof(struct msg) && received < cnt) {
msg_ntoh(msg);
ret = msg_verify(msg);
if (ret) {
warn("Received invalid packet for node %s", node_name(n));
return -1;
}
smp->length = msg->length;
smp->sequence = msg->sequence;
smp->ts.origin = MSG_TS(msg);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
memcpy(smp->data, msg->data, SAMPLE_DATA_LEN(msg->length));
ptr += MSG_LEN(msg->length);
msg = (struct msg *) ptr;
smp = smps[++received];
}
return received;
}
static int socket_write_none(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int sent = 0;
ssize_t bytes;
if (cnt < 1)
return 0;
for (int i = 0; i < cnt; i++) {
int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0;
int len = smps[i]->length + off;
uint32_t data[len];
/* First three values are sequence, seconds and nano-seconds timestamps */
if (s->header == SOCKET_HEADER_FAKE) {
data[0] = smps[i]->sequence;
data[1] = smps[i]->ts.origin.tv_sec;
data[2] = smps[i]->ts.origin.tv_nsec;
}
for (int j = 0; j < smps[i]->length; j++)
data[off + j] = s->endian == SOCKET_ENDIAN_BIG
? htobe32(smps[i]->data[j].i)
: htole32(smps[i]->data[j].i);
bytes = sendto(s->sd, data, len * sizeof(data[0]), 0,
(struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
sent++;
}
return sent;
}
static int socket_write_villas(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
ssize_t bytes = 0;
for (int i = 0; i < cnt; i++)
bytes += MSG_LEN(smps[i]->length);
char data[bytes], *ptr = data;
struct msg *msg = (struct msg *) ptr;
for (int i = 0; i < cnt; i++) {
*msg = MSG_INIT(smps[i]->length, smps[i]->sequence);
msg->ts.sec = smps[i]->ts.origin.tv_sec;
msg->ts.nsec = smps[i]->ts.origin.tv_nsec;
memcpy(msg->data, smps[i]->data, MSG_DATA_LEN(smps[i]->length));
msg_hton(msg);
ptr += MSG_LEN(msg->length);
msg = (struct msg *) ptr;
}
/* Send message */
bytes = sendto(s->sd, data, bytes, 0, (struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
return cnt;
}
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
int samples, ret, received, length;
ssize_t bytes;
if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) {
if (cnt < 1)
return 0;
/* The GTNETv2-SKT protocol send every sample in a single packet.
* socket_read() receives a single packet. */
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
struct sample *smp = smps[0];
uint32_t header[3];
if (s->header == SOCKET_HEADER_FAKE) {
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smp->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity);
switch (s->header) {
case SOCKET_HEADER_NONE:
case SOCKET_HEADER_FAKE:
return socket_read_none(n, smps, cnt);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
/* Receive next sample */
bytes = recvmsg(s->sd, &mhdr, MSG_TRUNC);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
else if (bytes % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Convert message to host endianess */
if (s->endian != MSG_ENDIAN_HOST) {
for (int i = 0; i < ARRAY_LEN(header); i++)
header[i] = bswap_32(header[i]);
for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++)
smp->data[i].i = bswap_32(smp->data[i].i);
}
if (s->header == SOCKET_HEADER_FAKE)
length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1);
else
length = bytes / SAMPLE_DATA_LEN(1);
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
}
if (s->header == SOCKET_HEADER_FAKE) {
smp->sequence = header[0];
smp->ts.origin.tv_sec = header[1];
smp->ts.origin.tv_nsec = header[2];
}
else {
smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */
smp->ts.origin.tv_sec = -1;
smp->ts.origin.tv_nsec = -1;
}
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
smp->length = length;
received = 1; /* GTNET-SKT sends every sample in a single packet */
}
else {
struct msg msgs[cnt];
struct msg hdr;
struct iovec iov[2*cnt];
struct msghdr mhdr = {
.msg_iov = iov
};
/* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(struct msg) || bytes % 4 != 0) {
warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
ret = msg_verify(&hdr);
if (ret) {
warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes);
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Convert message to host endianess */
if (hdr.endian != MSG_ENDIAN_HOST)
msg_hdr_swap(&hdr);
samples = bytes / MSG_LEN(hdr.length);
if (samples > cnt) {
warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), samples - cnt);
samples = cnt;
}
/* We expect that all received samples have the same amount of values! */
for (int i = 0; i < samples; i++) {
iov[2*i+0].iov_base = &msgs[i];
iov[2*i+0].iov_len = MSG_LEN(0);
iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.length);
mhdr.msg_iovlen += 2;
if (hdr.length > smps[i]->capacity)
error("Node %s received more values than supported. Dropping %d values.", node_name(n), hdr.length - smps[i]->capacity);
}
/* Receive message from socket */
bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
for (received = 0; received < samples; received++) {
struct msg *m = &msgs[received];
struct sample *smp = smps[received];
ret = msg_verify(m);
if (ret)
break;
if (m->length != hdr.length)
break;
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST) {
msg_hdr_swap(m);
for (int i = 0; i < m->length; i++)
smp->data[i].i = bswap_32(smp->data[i].i);
}
smp->length = m->length;
smp->sequence = m->sequence;
smp->ts.origin = MSG_TS(m);
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
}
case SOCKET_HEADER_DEFAULT:
return socket_read_villas(n, smps, cnt);
}
debug(LOG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received);
return received;
return -1;
}
int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
ssize_t bytes;
int sent = 0;
/* Construct iovecs */
if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) {
if (cnt < 1)
return 0;
for (int i = 0; i < cnt; i++) {
int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0;
int len = smps[i]->length + off;
uint32_t data[len];
/* First three values are sequence, seconds and nano-seconds timestamps */
if (s->header == SOCKET_HEADER_FAKE) {
data[0] = smps[i]->sequence;
data[1] = smps[i]->ts.origin.tv_sec;
data[2] = smps[i]->ts.origin.tv_nsec;
}
for (int j = 0; j < smps[i]->length; j++) {
if (s->endian == MSG_ENDIAN_HOST)
data[off + j] = smps[i]->data[j].i;
else
data[off + j] = bswap_32(smps[i]->data[j].i);
}
bytes = sendto(s->sd, data, len * sizeof(data[0]), 0,
(struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
sent++;
debug(LOG_SOCKET | 17, "Sent packet of %zd bytes with 1 sample", bytes);
}
switch (s->header) {
case SOCKET_HEADER_NONE:
case SOCKET_HEADER_FAKE:
return socket_write_none(n, smps, cnt);
case SOCKET_HEADER_DEFAULT:
return socket_write_villas(n, smps, cnt);
}
else {
struct msg msgs[cnt];
struct iovec iov[2*cnt];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov),
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
for (int i = 0; i < cnt; i++) {
msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence);
msgs[i].ts.sec = smps[i]->ts.origin.tv_sec;
msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec;
iov[i*2+0].iov_base = &msgs[i];
iov[i*2+0].iov_len = MSG_LEN(0);
iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
}
/* Send message */
bytes = sendmsg(s->sd, &mhdr, 0);
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
sent = cnt; /** @todo Find better way to determine how many values we actually sent */
debug(LOG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt);
}
return sent;
return -1;
}
int socket_parse(struct node *n, config_setting_t *cfg)
{
config_setting_t *cfg_netem;
const char *local, *remote, *layer, *hdr, *endian;
int ret;
@ -513,12 +508,12 @@ int socket_parse(struct node *n, config_setting_t *cfg)
}
if (!config_setting_lookup_string(cfg, "endian", &endian))
s->endian = MSG_ENDIAN_BIG;
s->endian = SOCKET_ENDIAN_BIG;
else {
if (!strcmp(endian, "big") || !strcmp(endian, "network"))
s->endian = MSG_ENDIAN_BIG;
s->endian = SOCKET_ENDIAN_BIG;
else if (!strcmp(endian, "little"))
s->endian = MSG_ENDIAN_LITTLE;
s->endian = SOCKET_ENDIAN_LITTLE;
else
cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n));
}
@ -541,7 +536,7 @@ int socket_parse(struct node *n, config_setting_t *cfg)
remote, node_name(n), gai_strerror(ret));
}
config_setting_t *cfg_netem = config_setting_get_member(cfg, "netem");
cfg_netem = config_setting_get_member(cfg, "netem");
if (cfg_netem) {
int enabled = 1;
if (!config_setting_lookup_bool(cfg_netem, "enabled", &enabled) || enabled)