From 2f47be0ac5ef758dcdffffc34bd4d0e22437c36f Mon Sep 17 00:00:00 2001 From: Umar Farooq Date: Sun, 17 Jul 2016 01:01:43 +0200 Subject: [PATCH] Add app_hdr tag for GT-NET-Socket GT-NET-Socket uses a tag in the config file to distinguish from default socket node type application layer header (struct msg). For now all the values are encoded in the struct sample without any header (no timestamp or sequence number). Minor improvement in pipe.c: replace 'goto' with do while --- Makefile | 1 + etc/loopback.conf | 13 ++- include/sample.h | 2 +- include/socket.h | 8 ++ lib/cfg.c | 2 +- lib/path.c | 2 +- lib/socket.c | 234 +++++++++++++++++++++++++++++++--------------- src/pipe.c | 21 +++-- 8 files changed, 190 insertions(+), 93 deletions(-) diff --git a/Makefile b/Makefile index c917c7e08..69fd1b4e9 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,7 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16 CFLAGS += -Wall -fdiagnostics-color=auto CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V) +#CFLAGS += -std=c99 -pedantic -ansi LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies diff --git a/etc/loopback.conf b/etc/loopback.conf index 9939254ff..572556516 100644 --- a/etc/loopback.conf +++ b/etc/loopback.conf @@ -32,9 +32,11 @@ nodes = { node1 = { type = "socket", layer = "udp", - local = "*:12000", # Local ip:port, use '*' for random port + local = "127.0.0.1:12000", # Local ip:port, use '*' for random port remote = "127.0.0.1:12001", combine = 5, + app_hdr = "gtskt", # app_hdr can be gtskt, deafult or none + #vectorize = 0, # number of samples to fetch per iteration from the socket. TODO correct vectorize logic netem = { enabled = false, delay = 1000000, @@ -45,9 +47,10 @@ nodes = { node2 = { type = "socket", layer = "udp", - local = "*:12002", # Local ip:port, use '*' for random port - remote = "127.0.0.1:12003" - combine = 30 + local = "127.0.0.1:12001", # Local ip:port, use '*' for random port + remote = "127.0.0.1:12002", + combine = 30, + app_hdr = "gtskt" } }; @@ -55,6 +58,6 @@ paths = ( { in = "node1", # Name of the node we listen to (see above) out = "node2", # And we loop back to the origin - hook = ["decimate", "print"] + hook = ["decimate:2", "print"] } ); diff --git a/include/sample.h b/include/sample.h index 2a6c02767..a73c016ce 100644 --- a/include/sample.h +++ b/include/sample.h @@ -31,7 +31,7 @@ enum sample_flags { SAMPLE_OFFSET = 2, SAMPLE_SEQUENCE = 4, SAMPLE_VALUES = 8, - SAMPLE_ALL = 16-1 + SAMPLE_ALL = 16-1 }; struct sample { diff --git a/include/socket.h b/include/socket.h index 4af49ed20..6c517ac97 100644 --- a/include/socket.h +++ b/include/socket.h @@ -28,6 +28,12 @@ enum socket_layer { LAYER_UDP }; +enum app_hdr_type { + APP_HDR_NONE, /** No header in the payload */ + APP_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ + APP_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ +}; + union sockaddr_union { struct sockaddr sa; struct sockaddr_in sin; @@ -48,6 +54,8 @@ struct socket { union sockaddr_union local; /** Remote address of the socket */ union sockaddr_union remote; + /** Payload header type */ + enum app_hdr_type app_hdr; /** libnl3: Network emulator queuing discipline */ struct rtnl_qdisc *tc_qdisc; diff --git a/lib/cfg.c b/lib/cfg.c index 8498f71c2..126e483e5 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -241,7 +241,7 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings config_setting_t *cfg_vectorize = config_setting_lookup(cfg, "vectorize"); if (n->vectorize <= 0) - cerror(cfg_vectorize, "Invalid value for `vectorize`. Must be natural number!"); + cerror(cfg_vectorize, "Invalid value for `vectorize` %d. Must be natural number!", n->vectorize); if (vt->vectorize && vt->vectorize < n->vectorize) cerror(cfg_vectorize, "Invalid value for `vectorize`. Node type %s requires a number smaller than %d!", node_name_type(n), vt->vectorize); diff --git a/lib/path.c b/lib/path.c index 585c6b20b..403721fb3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -28,7 +28,7 @@ static void path_write(struct path *p, bool resend) /* The first message in the chunk which we want to send */ if (resend) - base = p->in->received - cnt; /* we simply resent the last vector of samples */ + base = p->in->received - cnt; /* we simply resend the last vector of samples */ else { base = n->sent; } diff --git a/lib/socket.c b/lib/socket.c index db84f136d..c0e94da28 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -36,6 +36,8 @@ static struct node_type vt; /* Private static storage */ struct list interfaces; +uint8_t vector_length = 0; //TODO intialize vectorize option in socket with constant value + int socket_init(int argc, char * argv[], config_setting_t *cfg) { if (getuid() != 0) @@ -91,6 +93,7 @@ int socket_deinit() return 0; } +//TODO: Add app_header printing char * socket_print(struct node *n) { struct socket *s = n->_vd; @@ -204,79 +207,123 @@ int socket_destroy(struct node *n) int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { + //TODO Vectorize setting...for gtskt set to 0--? + struct socket *s = n->_vd; - int samples, ret, received; + int samples, ret, received, smp_count, values_per_sample = 1; ssize_t bytes; - - struct msg msgs[cnt]; - struct msg hdr; - - struct iovec iov[2*cnt]; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == HDR_DEFAULT)*/ + smp_count = 2*cnt; + + float sample_value; + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov }; - /* Peak into message header of the first sample and to get total packet size. */ - bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid"); - return -1; - } - - ret = msg_verify(&hdr); - if (ret) { - warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - return -1; + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(float) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } + + samples = bytes / sizeof(sample_value); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } + /* We add one value per sample */ + for (int i = 0; i < samples; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); //TODO add logic for number of values per sample + mhdr.msg_iovlen += 1; + } + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct sample *smp = smps[received]; + smp->length = values_per_sample; + //TODO see if s->sequence value is needed + //TODO see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header + } } - /* Convert message to host endianess */ - if (hdr.endian != MSG_ENDIAN_HOST) - msg_swap(&hdr); - - samples = bytes / MSG_LEN(hdr.values); - - if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); - samples = cnt; - } - - /* We expect that all received samples have the same amount of values! */ - for (int i = 0; i < samples; i++) { - iov[2*i+0].iov_base = &msgs[i]; - iov[2*i+0].iov_len = MSG_LEN(0); + else { //if(s->app_hdr == APP_HDR_DEFAULT) + struct msg msgs[cnt]; + struct msg hdr; - iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); - - mhdr.msg_iovlen += 2; - } + /* Peak into message header of the first sample and to get total packet size. */ + bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(struct msg) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct msg *m = &msgs[received]; - struct sample *s = smps[received]; + ret = msg_verify(&hdr); + if (ret) { + warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); + return -1; + } - ret = msg_verify(m); - if (ret) - break; - - if (m->values != hdr.values) - break; - /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + if (hdr.endian != MSG_ENDIAN_HOST) + msg_swap(&hdr); + + samples = bytes / MSG_LEN(hdr.values); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } - s->length = m->values; - s->sequence = m->sequence; - s->ts.origin = MSG_TS(m); + /* We expect that all received samples have the same amount of values! */ + for (int i = 0; i < samples; i++) { + iov[2*i+0].iov_base = &msgs[i]; + iov[2*i+0].iov_len = MSG_LEN(0); + + iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); + + mhdr.msg_iovlen += 2; + } + + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct msg *m = &msgs[received]; + struct sample *smp = smps[received]; + + ret = msg_verify(m); + if (ret) + break; + + if (m->values != hdr.values) + break; + + /* Convert message to host endianess */ + if (m->endian != MSG_ENDIAN_HOST) + msg_swap(m); + + smp->length = m->values; + smp->sequence = m->sequence; + smp->ts.origin = MSG_TS(m); + } } debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); @@ -288,28 +335,44 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; ssize_t bytes; - - struct msg msgs[cnt]; - struct iovec iov[2*cnt]; + + int smp_count, values_per_sample = 1; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == APP_HDR_DEFAULT)*/ + smp_count = 2*cnt; + + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov, .msg_iovlen = ARRAY_LEN(iov) }; - + /* Construct iovecs */ - for (int i = 0; i < cnt; i++) { - msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; - msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - - iov[i*2+0].iov_base = &msgs[i]; - iov[i*2+0].iov_len = MSG_LEN(0); - - iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + for (int i = 0; i < cnt; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); + } } + else { //if(s->app_hdr == APP_HDR_DEFAULT + struct msg msgs[cnt]; + for (int i = 0; i < cnt; i++) { + + msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; + msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; + + iov[i*2+0].iov_base = &msgs[i]; + iov[i*2+0].iov_len = MSG_LEN(0); + iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + } + } + /* Specify destination address for connection-less procotols */ switch (s->layer) { case LAYER_UDP: @@ -332,7 +395,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer; + const char *local, *remote, *layer, *app_hdr, *vectorize; int ret; struct socket *s = n->_vd; @@ -355,6 +418,24 @@ int socket_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_string(cfg, "local", &local)) cerror(cfg, "Missing local address for node %s", node_name(n)); + if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr)) + cerror(cfg, "Missing application header config for node %s", node_name(n)); + + if(!strcmp(app_hdr, "gtskt")) + s->app_hdr = APP_HDR_GTSKT; + else if(!strcmp(app_hdr, "none")) + s->app_hdr = APP_HDR_NONE; + else if(!strcmp(app_hdr, "default")) + s->app_hdr = APP_HDR_DEFAULT; + else + cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); + + if (!config_setting_lookup_string(cfg, "vectorize", &vectorize)) { + info("Setting vectorize value to %d", vector_length); + vector_length = 0; + } + vector_length = strtol(vectorize, NULL, 0); + ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { cerror(cfg, "Failed to resolve local address '%s' of node %s: %s", @@ -515,7 +596,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye static struct node_type vt = { .name = "socket", .description = "BSD network sockets", - .vectorize = 0, /* unlimited */ + //.vectorize = vector_length, ////TODO intialize vectorize option in socket with constant value + .vectorize = 0, .size = sizeof(struct socket), .destroy = socket_destroy, .reverse = socket_reverse, diff --git a/src/pipe.c b/src/pipe.c index 32b426d6b..732ea8aee 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -86,17 +86,20 @@ void * send_loop(void *ctx) for (;;) { for (int i = 0; i < node->vectorize; i++) { struct sample *s = smps[i]; - int reason; + int reason, retry; -retry: reason = sample_fscan(stdin, s, NULL); - if (reason < 0) { - if (feof(stdin)) - return NULL; - else { - warn("Skipped invalid message message: reason=%d", reason); - goto retry; + do { + retry = 0; + reason = sample_fscan(stdin, s, NULL); + if (reason < 0) { + if (feof(stdin)) + return NULL; + else { + warn("Skipped invalid message message: reason=%d", reason); + retry = 1; + } } - } + } while (retry); } node_write(node, smps, node->vectorize);