1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Add app_hdr tag for GT-NET-Socket

GT-NET-Socket uses a tag in the config file to distinguish from default
socket node type application layer header (struct msg). For now all the
values are encoded in the struct sample without any header (no timestamp
or sequence number).
Minor improvement in pipe.c: replace 'goto' with do while
This commit is contained in:
Umar Farooq 2016-07-17 01:01:43 +02:00
parent d589c71ab6
commit 2f47be0ac5
8 changed files with 190 additions and 93 deletions

View file

@ -35,6 +35,7 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas
CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16
CFLAGS += -Wall -fdiagnostics-color=auto
CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V)
#CFLAGS += -std=c99 -pedantic -ansi
LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN'
# pkg-config dependencies

View file

@ -32,9 +32,11 @@ nodes = {
node1 = {
type = "socket",
layer = "udp",
local = "*:12000", # Local ip:port, use '*' for random port
local = "127.0.0.1:12000", # Local ip:port, use '*' for random port
remote = "127.0.0.1:12001",
combine = 5,
app_hdr = "gtskt", # app_hdr can be gtskt, deafult or none
#vectorize = 0, # number of samples to fetch per iteration from the socket. TODO correct vectorize logic
netem = {
enabled = false,
delay = 1000000,
@ -45,9 +47,10 @@ nodes = {
node2 = {
type = "socket",
layer = "udp",
local = "*:12002", # Local ip:port, use '*' for random port
remote = "127.0.0.1:12003"
combine = 30
local = "127.0.0.1:12001", # Local ip:port, use '*' for random port
remote = "127.0.0.1:12002",
combine = 30,
app_hdr = "gtskt"
}
};
@ -55,6 +58,6 @@ paths = (
{
in = "node1", # Name of the node we listen to (see above)
out = "node2", # And we loop back to the origin
hook = ["decimate", "print"]
hook = ["decimate:2", "print"]
}
);

View file

@ -31,7 +31,7 @@ enum sample_flags {
SAMPLE_OFFSET = 2,
SAMPLE_SEQUENCE = 4,
SAMPLE_VALUES = 8,
SAMPLE_ALL = 16-1
SAMPLE_ALL = 16-1
};
struct sample {

View file

@ -28,6 +28,12 @@ enum socket_layer {
LAYER_UDP
};
enum app_hdr_type {
APP_HDR_NONE, /** No header in the payload */
APP_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/
APP_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */
};
union sockaddr_union {
struct sockaddr sa;
struct sockaddr_in sin;
@ -48,6 +54,8 @@ struct socket {
union sockaddr_union local;
/** Remote address of the socket */
union sockaddr_union remote;
/** Payload header type */
enum app_hdr_type app_hdr;
/** libnl3: Network emulator queuing discipline */
struct rtnl_qdisc *tc_qdisc;

View file

@ -241,7 +241,7 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings
config_setting_t *cfg_vectorize = config_setting_lookup(cfg, "vectorize");
if (n->vectorize <= 0)
cerror(cfg_vectorize, "Invalid value for `vectorize`. Must be natural number!");
cerror(cfg_vectorize, "Invalid value for `vectorize` %d. Must be natural number!", n->vectorize);
if (vt->vectorize && vt->vectorize < n->vectorize)
cerror(cfg_vectorize, "Invalid value for `vectorize`. Node type %s requires a number smaller than %d!",
node_name_type(n), vt->vectorize);

View file

@ -28,7 +28,7 @@ static void path_write(struct path *p, bool resend)
/* The first message in the chunk which we want to send */
if (resend)
base = p->in->received - cnt; /* we simply resent the last vector of samples */
base = p->in->received - cnt; /* we simply resend the last vector of samples */
else {
base = n->sent;
}

View file

@ -36,6 +36,8 @@ static struct node_type vt;
/* Private static storage */
struct list interfaces;
uint8_t vector_length = 0; //TODO intialize vectorize option in socket with constant value
int socket_init(int argc, char * argv[], config_setting_t *cfg)
{
if (getuid() != 0)
@ -91,6 +93,7 @@ int socket_deinit()
return 0;
}
//TODO: Add app_header printing
char * socket_print(struct node *n)
{
struct socket *s = n->_vd;
@ -204,79 +207,123 @@ int socket_destroy(struct node *n)
int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
{
//TODO Vectorize setting...for gtskt set to 0--?
struct socket *s = n->_vd;
int samples, ret, received;
int samples, ret, received, smp_count, values_per_sample = 1;
ssize_t bytes;
struct msg msgs[cnt];
struct msg hdr;
struct iovec iov[2*cnt];
if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT)
smp_count = cnt;
else /** Default case if(s->app_hdr == HDR_DEFAULT)*/
smp_count = 2*cnt;
float sample_value;
struct iovec iov[smp_count];
struct msghdr mhdr = {
.msg_iov = iov
};
/* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(struct msg) || bytes % 4 != 0) {
warn("Packet size is invalid");
return -1;
}
ret = msg_verify(&hdr);
if (ret) {
warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes);
return -1;
if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) {
bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(float) || bytes % 4 != 0) {
warn("Packet size is invalid");
return -1;
}
samples = bytes / sizeof(sample_value);
if (samples > cnt) {
warn("Received more samples than supported. Dropping %u samples", samples - cnt);
samples = cnt;
}
/* We add one value per sample */
for (int i = 0; i < samples; i++) {
iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); //TODO add logic for number of values per sample
mhdr.msg_iovlen += 1;
}
/* Receive message from socket */
bytes = recvmsg(s->sd, &mhdr, 0);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
for (received = 0; received < samples; received++) {
struct sample *smp = smps[received];
smp->length = values_per_sample;
//TODO see if s->sequence value is needed
//TODO see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header
}
}
/* Convert message to host endianess */
if (hdr.endian != MSG_ENDIAN_HOST)
msg_swap(&hdr);
samples = bytes / MSG_LEN(hdr.values);
if (samples > cnt) {
warn("Received more samples than supported. Dropping %u samples", samples - cnt);
samples = cnt;
}
/* We expect that all received samples have the same amount of values! */
for (int i = 0; i < samples; i++) {
iov[2*i+0].iov_base = &msgs[i];
iov[2*i+0].iov_len = MSG_LEN(0);
else { //if(s->app_hdr == APP_HDR_DEFAULT)
struct msg msgs[cnt];
struct msg hdr;
iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values);
mhdr.msg_iovlen += 2;
}
/* Peak into message header of the first sample and to get total packet size. */
bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC);
if (bytes < sizeof(struct msg) || bytes % 4 != 0) {
warn("Packet size is invalid");
return -1;
}
/* Receive message from socket */
bytes = recvmsg(s->sd, &mhdr, 0);
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
for (received = 0; received < samples; received++) {
struct msg *m = &msgs[received];
struct sample *s = smps[received];
ret = msg_verify(&hdr);
if (ret) {
warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes);
return -1;
}
ret = msg_verify(m);
if (ret)
break;
if (m->values != hdr.values)
break;
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
if (hdr.endian != MSG_ENDIAN_HOST)
msg_swap(&hdr);
samples = bytes / MSG_LEN(hdr.values);
if (samples > cnt) {
warn("Received more samples than supported. Dropping %u samples", samples - cnt);
samples = cnt;
}
s->length = m->values;
s->sequence = m->sequence;
s->ts.origin = MSG_TS(m);
/* We expect that all received samples have the same amount of values! */
for (int i = 0; i < samples; i++) {
iov[2*i+0].iov_base = &msgs[i];
iov[2*i+0].iov_len = MSG_LEN(0);
iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values);
mhdr.msg_iovlen += 2;
}
/* Receive message from socket */
bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped
if (bytes == 0)
error("Remote node %s closed the connection", node_name(n));
else if (bytes < 0)
serror("Failed recv from node %s", node_name(n));
for (received = 0; received < samples; received++) {
struct msg *m = &msgs[received];
struct sample *smp = smps[received];
ret = msg_verify(m);
if (ret)
break;
if (m->values != hdr.values)
break;
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
smp->length = m->values;
smp->sequence = m->sequence;
smp->ts.origin = MSG_TS(m);
}
}
debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received);
@ -288,28 +335,44 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
{
struct socket *s = n->_vd;
ssize_t bytes;
struct msg msgs[cnt];
struct iovec iov[2*cnt];
int smp_count, values_per_sample = 1;
if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT)
smp_count = cnt;
else /** Default case if(s->app_hdr == APP_HDR_DEFAULT)*/
smp_count = 2*cnt;
struct iovec iov[smp_count];
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov)
};
/* Construct iovecs */
for (int i = 0; i < cnt; i++) {
msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence);
msgs[i].ts.sec = smps[i]->ts.origin.tv_sec;
msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec;
iov[i*2+0].iov_base = &msgs[i];
iov[i*2+0].iov_len = MSG_LEN(0);
iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) {
for (int i = 0; i < cnt; i++) {
iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample);
}
}
else { //if(s->app_hdr == APP_HDR_DEFAULT
struct msg msgs[cnt];
for (int i = 0; i < cnt; i++) {
msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence);
msgs[i].ts.sec = smps[i]->ts.origin.tv_sec;
msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec;
iov[i*2+0].iov_base = &msgs[i];
iov[i*2+0].iov_len = MSG_LEN(0);
iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]);
iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
}
}
/* Specify destination address for connection-less procotols */
switch (s->layer) {
case LAYER_UDP:
@ -332,7 +395,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
int socket_parse(struct node *n, config_setting_t *cfg)
{
const char *local, *remote, *layer;
const char *local, *remote, *layer, *app_hdr, *vectorize;
int ret;
struct socket *s = n->_vd;
@ -355,6 +418,24 @@ int socket_parse(struct node *n, config_setting_t *cfg)
if (!config_setting_lookup_string(cfg, "local", &local))
cerror(cfg, "Missing local address for node %s", node_name(n));
if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr))
cerror(cfg, "Missing application header config for node %s", node_name(n));
if(!strcmp(app_hdr, "gtskt"))
s->app_hdr = APP_HDR_GTSKT;
else if(!strcmp(app_hdr, "none"))
s->app_hdr = APP_HDR_NONE;
else if(!strcmp(app_hdr, "default"))
s->app_hdr = APP_HDR_DEFAULT;
else
cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n));
if (!config_setting_lookup_string(cfg, "vectorize", &vectorize)) {
info("Setting vectorize value to %d", vector_length);
vector_length = 0;
}
vector_length = strtol(vectorize, NULL, 0);
ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE);
if (ret) {
cerror(cfg, "Failed to resolve local address '%s' of node %s: %s",
@ -515,7 +596,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye
static struct node_type vt = {
.name = "socket",
.description = "BSD network sockets",
.vectorize = 0, /* unlimited */
//.vectorize = vector_length, ////TODO intialize vectorize option in socket with constant value
.vectorize = 0,
.size = sizeof(struct socket),
.destroy = socket_destroy,
.reverse = socket_reverse,

View file

@ -86,17 +86,20 @@ void * send_loop(void *ctx)
for (;;) {
for (int i = 0; i < node->vectorize; i++) {
struct sample *s = smps[i];
int reason;
int reason, retry;
retry: reason = sample_fscan(stdin, s, NULL);
if (reason < 0) {
if (feof(stdin))
return NULL;
else {
warn("Skipped invalid message message: reason=%d", reason);
goto retry;
do {
retry = 0;
reason = sample_fscan(stdin, s, NULL);
if (reason < 0) {
if (feof(stdin))
return NULL;
else {
warn("Skipped invalid message message: reason=%d", reason);
retry = 1;
}
}
}
} while (retry);
}
node_write(node, smps, node->vectorize);