diff --git a/Makefile b/Makefile index c917c7e08..69fd1b4e9 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,7 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16 CFLAGS += -Wall -fdiagnostics-color=auto CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V) +#CFLAGS += -std=c99 -pedantic -ansi LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies diff --git a/etc/loopback.conf b/etc/loopback.conf index 9939254ff..572556516 100644 --- a/etc/loopback.conf +++ b/etc/loopback.conf @@ -32,9 +32,11 @@ nodes = { node1 = { type = "socket", layer = "udp", - local = "*:12000", # Local ip:port, use '*' for random port + local = "127.0.0.1:12000", # Local ip:port, use '*' for random port remote = "127.0.0.1:12001", combine = 5, + app_hdr = "gtskt", # app_hdr can be gtskt, deafult or none + #vectorize = 0, # number of samples to fetch per iteration from the socket. TODO correct vectorize logic netem = { enabled = false, delay = 1000000, @@ -45,9 +47,10 @@ nodes = { node2 = { type = "socket", layer = "udp", - local = "*:12002", # Local ip:port, use '*' for random port - remote = "127.0.0.1:12003" - combine = 30 + local = "127.0.0.1:12001", # Local ip:port, use '*' for random port + remote = "127.0.0.1:12002", + combine = 30, + app_hdr = "gtskt" } }; @@ -55,6 +58,6 @@ paths = ( { in = "node1", # Name of the node we listen to (see above) out = "node2", # And we loop back to the origin - hook = ["decimate", "print"] + hook = ["decimate:2", "print"] } ); diff --git a/include/sample.h b/include/sample.h index 2a6c02767..a73c016ce 100644 --- a/include/sample.h +++ b/include/sample.h @@ -31,7 +31,7 @@ enum sample_flags { SAMPLE_OFFSET = 2, SAMPLE_SEQUENCE = 4, SAMPLE_VALUES = 8, - SAMPLE_ALL = 16-1 + SAMPLE_ALL = 16-1 }; struct sample { diff --git a/include/socket.h b/include/socket.h index 4af49ed20..6c517ac97 100644 --- a/include/socket.h +++ b/include/socket.h @@ -28,6 +28,12 @@ enum socket_layer { LAYER_UDP }; +enum app_hdr_type { + APP_HDR_NONE, /** No header in the payload */ + APP_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ + APP_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ +}; + union sockaddr_union { struct sockaddr sa; struct sockaddr_in sin; @@ -48,6 +54,8 @@ struct socket { union sockaddr_union local; /** Remote address of the socket */ union sockaddr_union remote; + /** Payload header type */ + enum app_hdr_type app_hdr; /** libnl3: Network emulator queuing discipline */ struct rtnl_qdisc *tc_qdisc; diff --git a/lib/cfg.c b/lib/cfg.c index 8498f71c2..126e483e5 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -241,7 +241,7 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings config_setting_t *cfg_vectorize = config_setting_lookup(cfg, "vectorize"); if (n->vectorize <= 0) - cerror(cfg_vectorize, "Invalid value for `vectorize`. Must be natural number!"); + cerror(cfg_vectorize, "Invalid value for `vectorize` %d. Must be natural number!", n->vectorize); if (vt->vectorize && vt->vectorize < n->vectorize) cerror(cfg_vectorize, "Invalid value for `vectorize`. Node type %s requires a number smaller than %d!", node_name_type(n), vt->vectorize); diff --git a/lib/path.c b/lib/path.c index 585c6b20b..403721fb3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -28,7 +28,7 @@ static void path_write(struct path *p, bool resend) /* The first message in the chunk which we want to send */ if (resend) - base = p->in->received - cnt; /* we simply resent the last vector of samples */ + base = p->in->received - cnt; /* we simply resend the last vector of samples */ else { base = n->sent; } diff --git a/lib/socket.c b/lib/socket.c index db84f136d..c0e94da28 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -36,6 +36,8 @@ static struct node_type vt; /* Private static storage */ struct list interfaces; +uint8_t vector_length = 0; //TODO intialize vectorize option in socket with constant value + int socket_init(int argc, char * argv[], config_setting_t *cfg) { if (getuid() != 0) @@ -91,6 +93,7 @@ int socket_deinit() return 0; } +//TODO: Add app_header printing char * socket_print(struct node *n) { struct socket *s = n->_vd; @@ -204,79 +207,123 @@ int socket_destroy(struct node *n) int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { + //TODO Vectorize setting...for gtskt set to 0--? + struct socket *s = n->_vd; - int samples, ret, received; + int samples, ret, received, smp_count, values_per_sample = 1; ssize_t bytes; - - struct msg msgs[cnt]; - struct msg hdr; - - struct iovec iov[2*cnt]; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == HDR_DEFAULT)*/ + smp_count = 2*cnt; + + float sample_value; + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov }; - /* Peak into message header of the first sample and to get total packet size. */ - bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid"); - return -1; - } - - ret = msg_verify(&hdr); - if (ret) { - warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - return -1; + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(float) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } + + samples = bytes / sizeof(sample_value); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } + /* We add one value per sample */ + for (int i = 0; i < samples; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); //TODO add logic for number of values per sample + mhdr.msg_iovlen += 1; + } + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct sample *smp = smps[received]; + smp->length = values_per_sample; + //TODO see if s->sequence value is needed + //TODO see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header + } } - /* Convert message to host endianess */ - if (hdr.endian != MSG_ENDIAN_HOST) - msg_swap(&hdr); - - samples = bytes / MSG_LEN(hdr.values); - - if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); - samples = cnt; - } - - /* We expect that all received samples have the same amount of values! */ - for (int i = 0; i < samples; i++) { - iov[2*i+0].iov_base = &msgs[i]; - iov[2*i+0].iov_len = MSG_LEN(0); + else { //if(s->app_hdr == APP_HDR_DEFAULT) + struct msg msgs[cnt]; + struct msg hdr; - iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); - - mhdr.msg_iovlen += 2; - } + /* Peak into message header of the first sample and to get total packet size. */ + bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(struct msg) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct msg *m = &msgs[received]; - struct sample *s = smps[received]; + ret = msg_verify(&hdr); + if (ret) { + warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); + return -1; + } - ret = msg_verify(m); - if (ret) - break; - - if (m->values != hdr.values) - break; - /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + if (hdr.endian != MSG_ENDIAN_HOST) + msg_swap(&hdr); + + samples = bytes / MSG_LEN(hdr.values); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } - s->length = m->values; - s->sequence = m->sequence; - s->ts.origin = MSG_TS(m); + /* We expect that all received samples have the same amount of values! */ + for (int i = 0; i < samples; i++) { + iov[2*i+0].iov_base = &msgs[i]; + iov[2*i+0].iov_len = MSG_LEN(0); + + iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); + + mhdr.msg_iovlen += 2; + } + + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct msg *m = &msgs[received]; + struct sample *smp = smps[received]; + + ret = msg_verify(m); + if (ret) + break; + + if (m->values != hdr.values) + break; + + /* Convert message to host endianess */ + if (m->endian != MSG_ENDIAN_HOST) + msg_swap(m); + + smp->length = m->values; + smp->sequence = m->sequence; + smp->ts.origin = MSG_TS(m); + } } debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); @@ -288,28 +335,44 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; ssize_t bytes; - - struct msg msgs[cnt]; - struct iovec iov[2*cnt]; + + int smp_count, values_per_sample = 1; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == APP_HDR_DEFAULT)*/ + smp_count = 2*cnt; + + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov, .msg_iovlen = ARRAY_LEN(iov) }; - + /* Construct iovecs */ - for (int i = 0; i < cnt; i++) { - msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; - msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - - iov[i*2+0].iov_base = &msgs[i]; - iov[i*2+0].iov_len = MSG_LEN(0); - - iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + for (int i = 0; i < cnt; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); + } } + else { //if(s->app_hdr == APP_HDR_DEFAULT + struct msg msgs[cnt]; + for (int i = 0; i < cnt; i++) { + + msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; + msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; + + iov[i*2+0].iov_base = &msgs[i]; + iov[i*2+0].iov_len = MSG_LEN(0); + iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + } + } + /* Specify destination address for connection-less procotols */ switch (s->layer) { case LAYER_UDP: @@ -332,7 +395,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer; + const char *local, *remote, *layer, *app_hdr, *vectorize; int ret; struct socket *s = n->_vd; @@ -355,6 +418,24 @@ int socket_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_string(cfg, "local", &local)) cerror(cfg, "Missing local address for node %s", node_name(n)); + if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr)) + cerror(cfg, "Missing application header config for node %s", node_name(n)); + + if(!strcmp(app_hdr, "gtskt")) + s->app_hdr = APP_HDR_GTSKT; + else if(!strcmp(app_hdr, "none")) + s->app_hdr = APP_HDR_NONE; + else if(!strcmp(app_hdr, "default")) + s->app_hdr = APP_HDR_DEFAULT; + else + cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); + + if (!config_setting_lookup_string(cfg, "vectorize", &vectorize)) { + info("Setting vectorize value to %d", vector_length); + vector_length = 0; + } + vector_length = strtol(vectorize, NULL, 0); + ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { cerror(cfg, "Failed to resolve local address '%s' of node %s: %s", @@ -515,7 +596,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye static struct node_type vt = { .name = "socket", .description = "BSD network sockets", - .vectorize = 0, /* unlimited */ + //.vectorize = vector_length, ////TODO intialize vectorize option in socket with constant value + .vectorize = 0, .size = sizeof(struct socket), .destroy = socket_destroy, .reverse = socket_reverse, diff --git a/src/pipe.c b/src/pipe.c index 32b426d6b..732ea8aee 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -86,17 +86,20 @@ void * send_loop(void *ctx) for (;;) { for (int i = 0; i < node->vectorize; i++) { struct sample *s = smps[i]; - int reason; + int reason, retry; -retry: reason = sample_fscan(stdin, s, NULL); - if (reason < 0) { - if (feof(stdin)) - return NULL; - else { - warn("Skipped invalid message message: reason=%d", reason); - goto retry; + do { + retry = 0; + reason = sample_fscan(stdin, s, NULL); + if (reason < 0) { + if (feof(stdin)) + return NULL; + else { + warn("Skipped invalid message message: reason=%d", reason); + retry = 1; + } } - } + } while (retry); } node_write(node, smps, node->vectorize);