From 04aad098bbfa12b27046ea1ac4238625409fc8ad Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 13 Jun 2016 21:41:55 +0200 Subject: [PATCH 01/10] fixed some whitespace --- include/msg_format.h | 2 +- lib/websocket.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/msg_format.h b/include/msg_format.h index 3762825f2..497fc4f1a 100644 --- a/include/msg_format.h +++ b/include/msg_format.h @@ -13,7 +13,7 @@ #include #ifdef __linux__ - #define _BSD_SOURCE 1 + #define _BSD_SOURCE 1 #include #elif defined(__PPC__) /* Xilinx toolchain */ #include diff --git a/lib/websocket.c b/lib/websocket.c index c00c7e8bd..effc164b1 100644 --- a/lib/websocket.c +++ b/lib/websocket.c @@ -310,7 +310,7 @@ found: * (void **) user = n; break; } - pthread_mutex_unlock(&w->read.mutex); + pthread_mutex_unlock(&w->read.mutex); pthread_cond_broadcast(&w->read.cond); /* new data available, wake-up websocket_read() */ return 0; From 35bfd02b13bab78cfda01173ac014e08bc1722dd Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 13 Jun 2016 21:43:08 +0200 Subject: [PATCH 02/10] added debug facilities to debug output --- lib/gtfpga.c | 6 +++--- lib/opal.c | 14 +++++++------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/gtfpga.c b/lib/gtfpga.c index 72dbc5cc4..0ea4c721e 100644 --- a/lib/gtfpga.c +++ b/lib/gtfpga.c @@ -129,7 +129,7 @@ static int gtfpga_load_driver(struct pci_dev *d) if (!f) serror("Failed to add PCI id to uio_pci_generic driver"); - debug(5, "Adding ID to uio_pci_generic module: %04x %04x", d->vendor_id, d->device_id); + debug(DBG_GTFPGA | 5, "Adding ID to uio_pci_generic module: %04x %04x", d->vendor_id, d->device_id); fprintf(f, "%04x %04x", d->vendor_id, d->device_id); fclose(f); @@ -138,7 +138,7 @@ static int gtfpga_load_driver(struct pci_dev *d) if (!f) serror("Failed to add PCI id to uio_pci_generic driver"); - debug(5, "Bind slot to uio_pci_generic module: %s", slot); + debug(DBG_GTFPGA | 5, "Bind slot to uio_pci_generic module: %s", slot); fprintf(f, "%s\n", slot); fclose(f); @@ -168,7 +168,7 @@ static int gtfpga_mmap(struct gtfpga *g) int size = g->dev->size[GTFPGA_BAR]; /* mmap() first BAR */ - debug(5, "Setup mapping: mmap(NULL, %#x, PROT_READ | PROT_WRITE, MAP_SHARED, %u, %#lx)", size, fd, addr); + debug(DBG_GTFPGA | 5, "Setup mapping: mmap(NULL, %#x, PROT_READ | PROT_WRITE, MAP_SHARED, %u, %#lx)", size, fd, addr); void *map = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, addr); if (map == MAP_FAILED) serror("Failed mmap()"); diff --git a/lib/opal.c b/lib/opal.c index d6a268384..54087534c 100644 --- a/lib/opal.c +++ b/lib/opal.c @@ -88,7 +88,7 @@ int opal_deinit() if (err != EOK) error("Failed to close shared memory area (%d)", err); - debug(4, "Closing OPAL shared memory mapping"); + debug(DBG_OPAL | 4, "Closing OPAL shared memory mapping"); err = OpalSystemCtrl_UnRegister(print_shmem_name); if (err != EOK) @@ -104,7 +104,7 @@ int opal_deinit() int opal_print_global() { - debug(2, "Controller ID: %u", params.controllerID); + debug(DBG_OPAL | 2, "Controller ID: %u", params.controllerID); char *sbuf = alloc(send_icons * 5); char *rbuf = alloc(recv_icons * 5); @@ -114,17 +114,17 @@ int opal_print_global() for (int i = 0; i < recv_icons; i++) strcatf(&rbuf, "%u ", recv_ids[i]); - debug(2, "Send Blocks: %s", sbuf); - debug(2, "Receive Blocks: %s", rbuf); + debug(DBG_OPAL | 2, "Send Blocks: %s", sbuf); + debug(DBG_OPAL | 2, "Receive Blocks: %s", rbuf); free(sbuf); free(rbuf); - debug(2, "Control Block Parameters:"); + debug(DBG_OPAL | 2, "Control Block Parameters:"); for (int i = 0; i < GENASYNC_NB_FLOAT_PARAM; i++) - debug(2, "FloatParam[]%u] = %f", i, params.FloatParam[i]); + debug(DBG_OPAL | 2, "FloatParam[]%u] = %f", i, params.FloatParam[i]); for (int i = 0; i < GENASYNC_NB_STRING_PARAM; i++) - debug(2, "StringParam[%u] = %s", i, params.StringParam[i]); + debug(DBG_OPAL | 2, "StringParam[%u] = %s", i, params.StringParam[i]); return 0; } From d589c71ab6d3c2f7309a4570376972ef4edd0dea Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Mon, 13 Jun 2016 21:44:39 +0200 Subject: [PATCH 03/10] missed something to rename --- lib/hooks-internal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/hooks-internal.c b/lib/hooks-internal.c index 0d3d2bb7a..26958703c 100644 --- a/lib/hooks-internal.c +++ b/lib/hooks-internal.c @@ -2,7 +2,7 @@ * * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC - * This file is part of S2SS. All Rights Reserved. Proprietary and confidential. + * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. * Unauthorized copying of this file, via any medium is strictly prohibited. *********************************************************************************/ From 2f47be0ac5ef758dcdffffc34bd4d0e22437c36f Mon Sep 17 00:00:00 2001 From: Umar Farooq Date: Sun, 17 Jul 2016 01:01:43 +0200 Subject: [PATCH 04/10] Add app_hdr tag for GT-NET-Socket GT-NET-Socket uses a tag in the config file to distinguish from default socket node type application layer header (struct msg). For now all the values are encoded in the struct sample without any header (no timestamp or sequence number). Minor improvement in pipe.c: replace 'goto' with do while --- Makefile | 1 + etc/loopback.conf | 13 ++- include/sample.h | 2 +- include/socket.h | 8 ++ lib/cfg.c | 2 +- lib/path.c | 2 +- lib/socket.c | 234 +++++++++++++++++++++++++++++++--------------- src/pipe.c | 21 +++-- 8 files changed, 190 insertions(+), 93 deletions(-) diff --git a/Makefile b/Makefile index c917c7e08..69fd1b4e9 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,7 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16 CFLAGS += -Wall -fdiagnostics-color=auto CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V) +#CFLAGS += -std=c99 -pedantic -ansi LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies diff --git a/etc/loopback.conf b/etc/loopback.conf index 9939254ff..572556516 100644 --- a/etc/loopback.conf +++ b/etc/loopback.conf @@ -32,9 +32,11 @@ nodes = { node1 = { type = "socket", layer = "udp", - local = "*:12000", # Local ip:port, use '*' for random port + local = "127.0.0.1:12000", # Local ip:port, use '*' for random port remote = "127.0.0.1:12001", combine = 5, + app_hdr = "gtskt", # app_hdr can be gtskt, deafult or none + #vectorize = 0, # number of samples to fetch per iteration from the socket. TODO correct vectorize logic netem = { enabled = false, delay = 1000000, @@ -45,9 +47,10 @@ nodes = { node2 = { type = "socket", layer = "udp", - local = "*:12002", # Local ip:port, use '*' for random port - remote = "127.0.0.1:12003" - combine = 30 + local = "127.0.0.1:12001", # Local ip:port, use '*' for random port + remote = "127.0.0.1:12002", + combine = 30, + app_hdr = "gtskt" } }; @@ -55,6 +58,6 @@ paths = ( { in = "node1", # Name of the node we listen to (see above) out = "node2", # And we loop back to the origin - hook = ["decimate", "print"] + hook = ["decimate:2", "print"] } ); diff --git a/include/sample.h b/include/sample.h index 2a6c02767..a73c016ce 100644 --- a/include/sample.h +++ b/include/sample.h @@ -31,7 +31,7 @@ enum sample_flags { SAMPLE_OFFSET = 2, SAMPLE_SEQUENCE = 4, SAMPLE_VALUES = 8, - SAMPLE_ALL = 16-1 + SAMPLE_ALL = 16-1 }; struct sample { diff --git a/include/socket.h b/include/socket.h index 4af49ed20..6c517ac97 100644 --- a/include/socket.h +++ b/include/socket.h @@ -28,6 +28,12 @@ enum socket_layer { LAYER_UDP }; +enum app_hdr_type { + APP_HDR_NONE, /** No header in the payload */ + APP_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ + APP_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ +}; + union sockaddr_union { struct sockaddr sa; struct sockaddr_in sin; @@ -48,6 +54,8 @@ struct socket { union sockaddr_union local; /** Remote address of the socket */ union sockaddr_union remote; + /** Payload header type */ + enum app_hdr_type app_hdr; /** libnl3: Network emulator queuing discipline */ struct rtnl_qdisc *tc_qdisc; diff --git a/lib/cfg.c b/lib/cfg.c index 8498f71c2..126e483e5 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -241,7 +241,7 @@ int config_parse_node(config_setting_t *cfg, struct list *nodes, struct settings config_setting_t *cfg_vectorize = config_setting_lookup(cfg, "vectorize"); if (n->vectorize <= 0) - cerror(cfg_vectorize, "Invalid value for `vectorize`. Must be natural number!"); + cerror(cfg_vectorize, "Invalid value for `vectorize` %d. Must be natural number!", n->vectorize); if (vt->vectorize && vt->vectorize < n->vectorize) cerror(cfg_vectorize, "Invalid value for `vectorize`. Node type %s requires a number smaller than %d!", node_name_type(n), vt->vectorize); diff --git a/lib/path.c b/lib/path.c index 585c6b20b..403721fb3 100644 --- a/lib/path.c +++ b/lib/path.c @@ -28,7 +28,7 @@ static void path_write(struct path *p, bool resend) /* The first message in the chunk which we want to send */ if (resend) - base = p->in->received - cnt; /* we simply resent the last vector of samples */ + base = p->in->received - cnt; /* we simply resend the last vector of samples */ else { base = n->sent; } diff --git a/lib/socket.c b/lib/socket.c index db84f136d..c0e94da28 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -36,6 +36,8 @@ static struct node_type vt; /* Private static storage */ struct list interfaces; +uint8_t vector_length = 0; //TODO intialize vectorize option in socket with constant value + int socket_init(int argc, char * argv[], config_setting_t *cfg) { if (getuid() != 0) @@ -91,6 +93,7 @@ int socket_deinit() return 0; } +//TODO: Add app_header printing char * socket_print(struct node *n) { struct socket *s = n->_vd; @@ -204,79 +207,123 @@ int socket_destroy(struct node *n) int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { + //TODO Vectorize setting...for gtskt set to 0--? + struct socket *s = n->_vd; - int samples, ret, received; + int samples, ret, received, smp_count, values_per_sample = 1; ssize_t bytes; - - struct msg msgs[cnt]; - struct msg hdr; - - struct iovec iov[2*cnt]; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == HDR_DEFAULT)*/ + smp_count = 2*cnt; + + float sample_value; + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov }; - /* Peak into message header of the first sample and to get total packet size. */ - bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid"); - return -1; - } - - ret = msg_verify(&hdr); - if (ret) { - warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - return -1; + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(float) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } + + samples = bytes / sizeof(sample_value); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } + /* We add one value per sample */ + for (int i = 0; i < samples; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); //TODO add logic for number of values per sample + mhdr.msg_iovlen += 1; + } + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct sample *smp = smps[received]; + smp->length = values_per_sample; + //TODO see if s->sequence value is needed + //TODO see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header + } } - /* Convert message to host endianess */ - if (hdr.endian != MSG_ENDIAN_HOST) - msg_swap(&hdr); - - samples = bytes / MSG_LEN(hdr.values); - - if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); - samples = cnt; - } - - /* We expect that all received samples have the same amount of values! */ - for (int i = 0; i < samples; i++) { - iov[2*i+0].iov_base = &msgs[i]; - iov[2*i+0].iov_len = MSG_LEN(0); + else { //if(s->app_hdr == APP_HDR_DEFAULT) + struct msg msgs[cnt]; + struct msg hdr; - iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); - - mhdr.msg_iovlen += 2; - } + /* Peak into message header of the first sample and to get total packet size. */ + bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); + if (bytes < sizeof(struct msg) || bytes % 4 != 0) { + warn("Packet size is invalid"); + return -1; + } - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct msg *m = &msgs[received]; - struct sample *s = smps[received]; + ret = msg_verify(&hdr); + if (ret) { + warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); + return -1; + } - ret = msg_verify(m); - if (ret) - break; - - if (m->values != hdr.values) - break; - /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + if (hdr.endian != MSG_ENDIAN_HOST) + msg_swap(&hdr); + + samples = bytes / MSG_LEN(hdr.values); + + if (samples > cnt) { + warn("Received more samples than supported. Dropping %u samples", samples - cnt); + samples = cnt; + } - s->length = m->values; - s->sequence = m->sequence; - s->ts.origin = MSG_TS(m); + /* We expect that all received samples have the same amount of values! */ + for (int i = 0; i < samples; i++) { + iov[2*i+0].iov_base = &msgs[i]; + iov[2*i+0].iov_len = MSG_LEN(0); + + iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); + + mhdr.msg_iovlen += 2; + } + + /* Receive message from socket */ + bytes = recvmsg(s->sd, &mhdr, 0); //--? samples - cnt samples dropped + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + + for (received = 0; received < samples; received++) { + struct msg *m = &msgs[received]; + struct sample *smp = smps[received]; + + ret = msg_verify(m); + if (ret) + break; + + if (m->values != hdr.values) + break; + + /* Convert message to host endianess */ + if (m->endian != MSG_ENDIAN_HOST) + msg_swap(m); + + smp->length = m->values; + smp->sequence = m->sequence; + smp->ts.origin = MSG_TS(m); + } } debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); @@ -288,28 +335,44 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; ssize_t bytes; - - struct msg msgs[cnt]; - struct iovec iov[2*cnt]; + + int smp_count, values_per_sample = 1; + + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + smp_count = cnt; + else /** Default case if(s->app_hdr == APP_HDR_DEFAULT)*/ + smp_count = 2*cnt; + + struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov, .msg_iovlen = ARRAY_LEN(iov) }; - + /* Construct iovecs */ - for (int i = 0; i < cnt; i++) { - msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; - msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - - iov[i*2+0].iov_base = &msgs[i]; - iov[i*2+0].iov_len = MSG_LEN(0); - - iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + for (int i = 0; i < cnt; i++) { + iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); + } } + else { //if(s->app_hdr == APP_HDR_DEFAULT + struct msg msgs[cnt]; + for (int i = 0; i < cnt; i++) { + + msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; + msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; + + iov[i*2+0].iov_base = &msgs[i]; + iov[i*2+0].iov_len = MSG_LEN(0); + iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + } + } + /* Specify destination address for connection-less procotols */ switch (s->layer) { case LAYER_UDP: @@ -332,7 +395,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer; + const char *local, *remote, *layer, *app_hdr, *vectorize; int ret; struct socket *s = n->_vd; @@ -355,6 +418,24 @@ int socket_parse(struct node *n, config_setting_t *cfg) if (!config_setting_lookup_string(cfg, "local", &local)) cerror(cfg, "Missing local address for node %s", node_name(n)); + if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr)) + cerror(cfg, "Missing application header config for node %s", node_name(n)); + + if(!strcmp(app_hdr, "gtskt")) + s->app_hdr = APP_HDR_GTSKT; + else if(!strcmp(app_hdr, "none")) + s->app_hdr = APP_HDR_NONE; + else if(!strcmp(app_hdr, "default")) + s->app_hdr = APP_HDR_DEFAULT; + else + cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); + + if (!config_setting_lookup_string(cfg, "vectorize", &vectorize)) { + info("Setting vectorize value to %d", vector_length); + vector_length = 0; + } + vector_length = strtol(vectorize, NULL, 0); + ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { cerror(cfg, "Failed to resolve local address '%s' of node %s: %s", @@ -515,7 +596,8 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye static struct node_type vt = { .name = "socket", .description = "BSD network sockets", - .vectorize = 0, /* unlimited */ + //.vectorize = vector_length, ////TODO intialize vectorize option in socket with constant value + .vectorize = 0, .size = sizeof(struct socket), .destroy = socket_destroy, .reverse = socket_reverse, diff --git a/src/pipe.c b/src/pipe.c index 32b426d6b..732ea8aee 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -86,17 +86,20 @@ void * send_loop(void *ctx) for (;;) { for (int i = 0; i < node->vectorize; i++) { struct sample *s = smps[i]; - int reason; + int reason, retry; -retry: reason = sample_fscan(stdin, s, NULL); - if (reason < 0) { - if (feof(stdin)) - return NULL; - else { - warn("Skipped invalid message message: reason=%d", reason); - goto retry; + do { + retry = 0; + reason = sample_fscan(stdin, s, NULL); + if (reason < 0) { + if (feof(stdin)) + return NULL; + else { + warn("Skipped invalid message message: reason=%d", reason); + retry = 1; + } } - } + } while (retry); } node_write(node, smps, node->vectorize); From 62bf0c2b0ce6aaa971b68b1e104d572039d0269f Mon Sep 17 00:00:00 2001 From: Umar Farooq Date: Tue, 2 Aug 2016 21:08:25 +0200 Subject: [PATCH 05/10] Bug fixes/improvements to GTNET-SKT code - Change enum types APP_HDR_* to SOCKET_HDR_* - Remove SOCKET_HDR_NONE type from app-hdr as its same as SOCKET_HDR_GTSKT - Fix values per samples for GTSKT to 1 - Replace TODO tag with @todo - Use SOCKET_HDR_DEFAULT if app_hdr setting is not present in config file - Fix bug when invalid message is received resulting in infinite printing loop - Fix bug in socket_write when due to declaration of struct msg msgs[cnt] in the for loop the values are not transmitted - Replace std=c98 with std=c11 but still compile without it --- Makefile | 2 +- etc/loopback.conf | 4 +-- include/socket.h | 5 ++- lib/socket.c | 85 +++++++++++++++++++++++------------------------ 4 files changed, 47 insertions(+), 49 deletions(-) diff --git a/Makefile b/Makefile index 69fd1b4e9..8001ed98b 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,7 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16 CFLAGS += -Wall -fdiagnostics-color=auto CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V) -#CFLAGS += -std=c99 -pedantic -ansi +#CFLAGS += -pedantic -std=c11 LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies diff --git a/etc/loopback.conf b/etc/loopback.conf index 572556516..ef63d7977 100644 --- a/etc/loopback.conf +++ b/etc/loopback.conf @@ -35,8 +35,8 @@ nodes = { local = "127.0.0.1:12000", # Local ip:port, use '*' for random port remote = "127.0.0.1:12001", combine = 5, - app_hdr = "gtskt", # app_hdr can be gtskt, deafult or none - #vectorize = 0, # number of samples to fetch per iteration from the socket. TODO correct vectorize logic + app_hdr = "gtskt", # app_hdr can be gtskt or default. If not provided, default header will be used + #vectorize = 1, # number of samples to fetch per iteration from the socket netem = { enabled = false, delay = 1000000, diff --git a/include/socket.h b/include/socket.h index 6c517ac97..cfda82e36 100644 --- a/include/socket.h +++ b/include/socket.h @@ -29,9 +29,8 @@ enum socket_layer { }; enum app_hdr_type { - APP_HDR_NONE, /** No header in the payload */ - APP_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ - APP_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ + SOCKET_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ + SOCKET_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ }; union sockaddr_union { diff --git a/lib/socket.c b/lib/socket.c index c0e94da28..084223564 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -36,8 +36,6 @@ static struct node_type vt; /* Private static storage */ struct list interfaces; -uint8_t vector_length = 0; //TODO intialize vectorize option in socket with constant value - int socket_init(int argc, char * argv[], config_setting_t *cfg) { if (getuid() != 0) @@ -93,22 +91,28 @@ int socket_deinit() return 0; } -//TODO: Add app_header printing char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *buf; + char *layer = NULL, *app_hdr = NULL, *buf; switch (s->layer) { case LAYER_UDP: layer = "udp"; break; case LAYER_IP: layer = "ip"; break; case LAYER_ETH: layer = "eth"; break; } + + switch (s->app_hdr) { + case SOCKET_HDR_GTSKT: app_hdr = "GTNET-Socket-v2"; break; + case SOCKET_HDR_DEFAULT: + default: + app_hdr = "Default"; break; + } char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, local=%s, remote=%s", layer, local, remote); + buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, app_hdr, local, remote); free(local); free(remote); @@ -207,25 +211,26 @@ int socket_destroy(struct node *n) int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { - //TODO Vectorize setting...for gtskt set to 0--? - struct socket *s = n->_vd; - int samples, ret, received, smp_count, values_per_sample = 1; + int samples, ret, received, smp_count; ssize_t bytes; - if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + if(s->app_hdr == SOCKET_HDR_GTSKT) smp_count = cnt; else /** Default case if(s->app_hdr == HDR_DEFAULT)*/ smp_count = 2*cnt; + struct msg msgs[cnt]; + struct msg hdr; + float sample_value; struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov }; - if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + if(s->app_hdr == SOCKET_HDR_GTSKT) { bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(float) || bytes % 4 != 0) { warn("Packet size is invalid"); @@ -241,7 +246,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /* We add one value per sample */ for (int i = 0; i < samples; i++) { iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); //TODO add logic for number of values per sample + iov[i].iov_len = SAMPLE_DATA_LEN(1); /** values per sample is 1 while reading */ mhdr.msg_iovlen += 1; } /* Receive message from socket */ @@ -253,26 +258,25 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) for (received = 0; received < samples; received++) { struct sample *smp = smps[received]; - smp->length = values_per_sample; - //TODO see if s->sequence value is needed - //TODO see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header + smp->length = 1; + /** @todo see if s->sequence value is needed */ + /** @todo see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header */ } } - else { //if(s->app_hdr == APP_HDR_DEFAULT) - struct msg msgs[cnt]; - struct msg hdr; - + else { //if(s->app_hdr == SOCKET_HDR_DEFAULT) /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { warn("Packet size is invalid"); + recv(s->sd, &hdr, sizeof(struct msg), 0); return -1; } ret = msg_verify(&hdr); if (ret) { warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); + recv(s->sd, &hdr, sizeof(struct msg), 0); return -1; } @@ -336,13 +340,14 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) struct socket *s = n->_vd; ssize_t bytes; - int smp_count, values_per_sample = 1; + unsigned smp_count; - if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) + if(s->app_hdr == SOCKET_HDR_GTSKT) smp_count = cnt; - else /** Default case if(s->app_hdr == APP_HDR_DEFAULT)*/ + else /** Default case if(s->app_hdr == SOCKET_HDR_DEFAULT)*/ smp_count = 2*cnt; + struct msg msgs[cnt]; struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov, @@ -350,14 +355,13 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) }; /* Construct iovecs */ - if(s->app_hdr == APP_HDR_NONE || s->app_hdr == APP_HDR_GTSKT) { + if(s->app_hdr == SOCKET_HDR_GTSKT) { for (int i = 0; i < cnt; i++) { iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i].iov_len = SAMPLE_DATA_LEN(values_per_sample); + iov[i].iov_len = SAMPLE_DATA_LEN(1); } } - else { //if(s->app_hdr == APP_HDR_DEFAULT - struct msg msgs[cnt]; + else { /** if(s->app_hdr == SOCKET_HDR_DEFAULT */ for (int i = 0; i < cnt; i++) { msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); @@ -390,12 +394,12 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); - return cnt; + return cnt; } int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer, *app_hdr, *vectorize; + const char *local, *remote, *layer, *app_hdr; int ret; struct socket *s = n->_vd; @@ -419,22 +423,18 @@ int socket_parse(struct node *n, config_setting_t *cfg) cerror(cfg, "Missing local address for node %s", node_name(n)); if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr)) - cerror(cfg, "Missing application header config for node %s", node_name(n)); - - if(!strcmp(app_hdr, "gtskt")) - s->app_hdr = APP_HDR_GTSKT; - else if(!strcmp(app_hdr, "none")) - s->app_hdr = APP_HDR_NONE; - else if(!strcmp(app_hdr, "default")) - s->app_hdr = APP_HDR_DEFAULT; - else - cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); - - if (!config_setting_lookup_string(cfg, "vectorize", &vectorize)) { - info("Setting vectorize value to %d", vector_length); - vector_length = 0; + s->app_hdr = SOCKET_HDR_DEFAULT; + else { + if(!strcmp(app_hdr, "gtskt")) + s->app_hdr = SOCKET_HDR_GTSKT; + else if(!strcmp(app_hdr, "default")) + s->app_hdr = SOCKET_HDR_DEFAULT; + else + cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); } - vector_length = strtol(vectorize, NULL, 0); + + /** if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize)) + n->vectorize = 1; */ ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { @@ -596,7 +596,6 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye static struct node_type vt = { .name = "socket", .description = "BSD network sockets", - //.vectorize = vector_length, ////TODO intialize vectorize option in socket with constant value .vectorize = 0, .size = sizeof(struct socket), .destroy = socket_destroy, From fe92747aa9ad2ac2013b335f396d0839d7ad338f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 20:40:37 -0400 Subject: [PATCH 06/10] renamed a couple of enum constants and some minor code style cleanups --- include/socket.h | 17 ++-- lib/socket.c | 205 +++++++++++++++++++++++------------------------ 2 files changed, 110 insertions(+), 112 deletions(-) diff --git a/include/socket.h b/include/socket.h index cfda82e36..64c1dd82e 100644 --- a/include/socket.h +++ b/include/socket.h @@ -23,14 +23,14 @@ #include "node.h" enum socket_layer { - LAYER_ETH, - LAYER_IP, - LAYER_UDP + SOCKET_LAYER_ETH, + SOCKET_LAYER_IP, + SOCKET_LAYER_UDP }; -enum app_hdr_type { - SOCKET_HDR_GTSKT, /** No header in the payload, same as HDR_NONE*/ - SOCKET_HDR_DEFAULT /** Default header in the payload, (see msg_format.h) */ +enum socket_header { + SOCKET_HEADER_DEFAULT, /**> Default header in the payload, (see msg_format.h) */ + SOCKET_HEADER_GTNET_SKT /**> No header in the payload, same as HDR_NONE*/ }; union sockaddr_union { @@ -49,12 +49,13 @@ struct socket { /** The OSI / IP layer which should be used for this socket */ enum socket_layer layer; + /** Payload header type */ + enum socket_header header; + /** Local address of the socket */ union sockaddr_union local; /** Remote address of the socket */ union sockaddr_union remote; - /** Payload header type */ - enum app_hdr_type app_hdr; /** libnl3: Network emulator queuing discipline */ struct rtnl_qdisc *tc_qdisc; diff --git a/lib/socket.c b/lib/socket.c index 084223564..71f5a511c 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -40,7 +40,7 @@ int socket_init(int argc, char * argv[], config_setting_t *cfg) { if (getuid() != 0) error("The 'socket' node-type requires superuser privileges!"); - + nl_init(); /* Fill link cache */ list_init(&interfaces); @@ -62,14 +62,14 @@ int socket_init(int argc, char * argv[], config_setting_t *cfg) if (rtnl_link_get_ifindex(i->nl_link) == rtnl_link_get_ifindex(link)) goto found; } - + /* If not found, create a new interface */ i = if_create(link); list_push(&interfaces, i); found: list_push(&i->sockets, s); } - + /** @todo Improve mapping of NIC IRQs per path */ int affinity; if (!config_setting_lookup_int(cfg, "affinity", &affinity)) @@ -94,29 +94,28 @@ int socket_deinit() char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *app_hdr = NULL, *buf; - + char *layer = NULL, *hdr = NULL, *buf; + switch (s->layer) { - case LAYER_UDP: layer = "udp"; break; - case LAYER_IP: layer = "ip"; break; - case LAYER_ETH: layer = "eth"; break; + case SOCKET_LAYER_UDP: layer = "udp"; break; + case SOCKET_LAYER_IP: layer = "ip"; break; + case SOCKET_LAYER_ETH: layer = "eth"; break; } - - switch (s->app_hdr) { - case SOCKET_HDR_GTSKT: app_hdr = "GTNET-Socket-v2"; break; - case SOCKET_HDR_DEFAULT: - default: - app_hdr = "Default"; break; + + switch (s->header) { + case SOCKET_HEADER_GTNET_SKT: hdr = "RTDS GTNETv2-SKT"; break; + case SOCKET_HEADER_DEFAULT: + default: hdr = "VILLASnode"; break; } char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, app_hdr, local, remote); - + buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, hdr, local, remote); + free(local); free(remote); - + return buf; } @@ -129,9 +128,9 @@ int socket_open(struct node *n) /* Create socket */ switch (s->layer) { - case LAYER_UDP: s->sd = socket(sin->sin_family, SOCK_DGRAM, IPPROTO_UDP); break; - case LAYER_IP: s->sd = socket(sin->sin_family, SOCK_RAW, ntohs(sin->sin_port)); break; - case LAYER_ETH: s->sd = socket(sll->sll_family, SOCK_DGRAM, sll->sll_protocol); break; + case SOCKET_LAYER_UDP: s->sd = socket(sin->sin_family, SOCK_DGRAM, IPPROTO_UDP); break; + case SOCKET_LAYER_IP: s->sd = socket(sin->sin_family, SOCK_RAW, ntohs(sin->sin_port)); break; + case SOCKET_LAYER_ETH: s->sd = socket(sll->sll_family, SOCK_DGRAM, sll->sll_protocol); break; default: error("Invalid socket type!"); } @@ -156,8 +155,8 @@ int socket_open(struct node *n) /* Set socket priority, QoS or TOS IP options */ int prio; switch (s->layer) { - case LAYER_UDP: - case LAYER_IP: + case SOCKET_LAYER_UDP: + case SOCKET_LAYER_IP: prio = IPTOS_LOWDELAY; if (setsockopt(s->sd, IPPROTO_IP, IP_TOS, &prio, sizeof(prio))) serror("Failed to set type of service (QoS)"); @@ -181,11 +180,11 @@ int socket_reverse(struct node *n) { struct socket *s = n->_vd; union sockaddr_union tmp; - + tmp = s->local; s->local = s->remote; s->remote = tmp; - + return 0; } @@ -202,43 +201,43 @@ int socket_close(struct node *n) int socket_destroy(struct node *n) { struct socket *s = n->_vd; - + rtnl_qdisc_put(s->tc_qdisc); rtnl_cls_put(s->tc_classifier); - + return 0; } int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - + int samples, ret, received, smp_count; ssize_t bytes; - - if(s->app_hdr == SOCKET_HDR_GTSKT) + + if (s->header == SOCKET_HEADER_GTNET_SKT) smp_count = cnt; - else /** Default case if(s->app_hdr == HDR_DEFAULT)*/ + else smp_count = 2*cnt; - + struct msg msgs[cnt]; struct msg hdr; - + float sample_value; struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov }; - - if(s->app_hdr == SOCKET_HDR_GTSKT) { + + if (s->header == SOCKET_HEADER_GTNET_SKT) { bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(float) || bytes % 4 != 0) { warn("Packet size is invalid"); return -1; } - + samples = bytes / sizeof(sample_value); - + if (samples > cnt) { warn("Received more samples than supported. Dropping %u samples", samples - cnt); samples = cnt; @@ -255,7 +254,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) error("Remote node %s closed the connection", node_name(n)); else if (bytes < 0) serror("Failed recv from node %s", node_name(n)); - + for (received = 0; received < samples; received++) { struct sample *smp = smps[received]; smp->length = 1; @@ -263,8 +262,8 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /** @todo see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header */ } } - - else { //if(s->app_hdr == SOCKET_HDR_DEFAULT) + + else { /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { @@ -279,15 +278,15 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) recv(s->sd, &hdr, sizeof(struct msg), 0); return -1; } - + /* Convert message to host endianess */ if (hdr.endian != MSG_ENDIAN_HOST) msg_swap(&hdr); - + samples = bytes / MSG_LEN(hdr.values); - + if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); + warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), samples - cnt); samples = cnt; } @@ -295,10 +294,10 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) for (int i = 0; i < samples; i++) { iov[2*i+0].iov_base = &msgs[i]; iov[2*i+0].iov_len = MSG_LEN(0); - + iov[2*i+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); - + mhdr.msg_iovlen += 2; } @@ -312,11 +311,11 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) for (received = 0; received < samples; received++) { struct msg *m = &msgs[received]; struct sample *smp = smps[received]; - + ret = msg_verify(m); if (ret) break; - + if (m->values != hdr.values) break; @@ -329,7 +328,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) smp->ts.origin = MSG_TS(m); } } - + debug(DBG_SOCKET | 17, "Received message of %zd bytes: %u samples", bytes, received); return received; @@ -339,36 +338,36 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; ssize_t bytes; - + unsigned smp_count; - - if(s->app_hdr == SOCKET_HDR_GTSKT) + + if (s->header == SOCKET_HEADER_GTNET_SKT) smp_count = cnt; - else /** Default case if(s->app_hdr == SOCKET_HDR_DEFAULT)*/ + else smp_count = 2*cnt; - + struct msg msgs[cnt]; struct iovec iov[smp_count]; struct msghdr mhdr = { .msg_iov = iov, .msg_iovlen = ARRAY_LEN(iov) }; - + /* Construct iovecs */ - if(s->app_hdr == SOCKET_HDR_GTSKT) { + if (s->header == SOCKET_HEADER_GTNET_SKT) { for (int i = 0; i < cnt; i++) { iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); iov[i].iov_len = SAMPLE_DATA_LEN(1); } } - else { /** if(s->app_hdr == SOCKET_HDR_DEFAULT */ + else { for (int i = 0; i < cnt; i++) { - - msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); - - msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; - msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - + + msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); + + msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; + msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; + iov[i*2+0].iov_base = &msgs[i]; iov[i*2+0].iov_len = MSG_LEN(0); @@ -376,12 +375,12 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); } } - + /* Specify destination address for connection-less procotols */ switch (s->layer) { - case LAYER_UDP: - case LAYER_IP: - case LAYER_ETH: + case SOCKET_LAYER_UDP: + case SOCKET_LAYER_IP: + case SOCKET_LAYER_ETH: mhdr.msg_name = (struct sockaddr *) &s->remote; mhdr.msg_namelen = sizeof(s->remote); break; @@ -394,48 +393,48 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); - return cnt; + return cnt; } int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer, *app_hdr; + const char *local, *remote, *layer, *hdr; int ret; struct socket *s = n->_vd; + /* IP layer */ if (!config_setting_lookup_string(cfg, "layer", &layer)) - cerror(cfg, "Missing layer for node %s", node_name(n)); + s->layer = SOCKET_LAYER_UDP; + else { + if (!strcmp(layer, "eth")) + s->layer = SOCKET_LAYER_ETH; + else if (!strcmp(layer, "ip")) + s->layer = SOCKET_LAYER_IP; + else if (!strcmp(layer, "udp")) + s->layer = SOCKET_LAYER_UDP; + else + cerror(cfg, "Invalid layer '%s' for node %s", layer, node_name(n)); + } - if (!strcmp(layer, "eth")) - s->layer = LAYER_ETH; - else if (!strcmp(layer, "ip")) - s->layer = LAYER_IP; - else if (!strcmp(layer, "udp")) - s->layer = LAYER_UDP; - else - cerror(cfg, "Invalid layer '%s' for node %s", layer, node_name(n)); + /* Application header */ + if (!config_setting_lookup_string(cfg, "header", &hdr)) + s->header = SOCKET_HEADER_DEFAULT; + else { + if (!strcmp(hdr, "gtnet-skt")) + s->header = SOCKET_HEADER_GTNET_SKT; + else if (!strcmp(hdr, "default")) + s->header = SOCKET_HEADER_DEFAULT; + else + cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n)); + } if (!config_setting_lookup_string(cfg, "remote", &remote)) cerror(cfg, "Missing remote address for node %s", node_name(n)); if (!config_setting_lookup_string(cfg, "local", &local)) cerror(cfg, "Missing local address for node %s", node_name(n)); - - if (!config_setting_lookup_string(cfg, "app_hdr", &app_hdr)) - s->app_hdr = SOCKET_HDR_DEFAULT; - else { - if(!strcmp(app_hdr, "gtskt")) - s->app_hdr = SOCKET_HDR_GTSKT; - else if(!strcmp(app_hdr, "default")) - s->app_hdr = SOCKET_HDR_DEFAULT; - else - cerror(cfg, "Invalid application header type '%s' for node %s", app_hdr, node_name(n)); - } - - /** if (!config_setting_lookup_int(cfg, "vectorize", &n->vectorize)) - n->vectorize = 1; */ - + ret = socket_parse_addr(local, (struct sockaddr *) &s->local, s->layer, AI_PASSIVE); if (ret) { cerror(cfg, "Failed to resolve local address '%s' of node %s: %s", @@ -462,7 +461,7 @@ char * socket_print_addr(struct sockaddr *saddr) { union sockaddr_union *sa = (union sockaddr_union *) saddr; char *buf = alloc(64); - + /* Address */ switch (sa->sa.sa_family) { case AF_INET6: @@ -472,7 +471,7 @@ char * socket_print_addr(struct sockaddr *saddr) case AF_INET: inet_ntop(AF_INET, &sa->sin.sin_addr, buf, 64); break; - + case AF_PACKET: strcatf(&buf, "%02x", sa->sll.sll_addr[0]); for (int i = 1; i < sa->sll.sll_halen; i++) @@ -482,7 +481,7 @@ char * socket_print_addr(struct sockaddr *saddr) default: error("Unknown address family: '%u'", sa->sa.sa_family); } - + /* Port / Interface */ switch (sa->sa.sa_family) { case AF_INET6: @@ -495,7 +494,7 @@ char * socket_print_addr(struct sockaddr *saddr) struct rtnl_link *link = rtnl_link_get(cache, sa->sll.sll_ifindex); if (!link) error("Failed to get interface for index: %u", sa->sll.sll_ifindex); - + strcatf(&buf, "%%%s", rtnl_link_get_name(link)); strcatf(&buf, ":%hu", ntohs(sa->sll.sll_protocol)); break; @@ -513,7 +512,7 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye char *copy = strdup(addr); int ret; - if (layer == LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */ + if (layer == SOCKET_LAYER_ETH) { /* Format: "ab:cd:ef:12:34:56%ifname:protocol" */ /* Split string */ char *node = strtok(copy, "%"); char *ifname = strtok(NULL, ":"); @@ -525,7 +524,7 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye error("Failed to parse MAC address: %s", node); memcpy(&sa->sll.sll_addr, &mac->ether_addr_octet, 6); - + /* Get interface index from name */ struct nl_cache *cache = nl_cache_mngt_require("route/link"); struct rtnl_link *link = rtnl_link_get_by_name(cache, ifname); @@ -556,13 +555,13 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye service = NULL; switch (layer) { - case LAYER_IP: + case SOCKET_LAYER_IP: hint.ai_socktype = SOCK_RAW; hint.ai_protocol = (service) ? strtol(service, NULL, 0) : IPPROTO_VILLAS; hint.ai_flags |= AI_NUMERICSERV; break; - case LAYER_UDP: + case SOCKET_LAYER_UDP: hint.ai_socktype = SOCK_DGRAM; hint.ai_protocol = IPPROTO_UDP; break; @@ -573,17 +572,15 @@ int socket_parse_addr(const char *addr, struct sockaddr *saddr, enum socket_laye /* Lookup address */ struct addrinfo *result; - ret = getaddrinfo(node, (layer == LAYER_IP) ? NULL : service, &hint, &result); + ret = getaddrinfo(node, (layer == SOCKET_LAYER_IP) ? NULL : service, &hint, &result); if (!ret) { - - if (layer == LAYER_IP) { + if (layer == SOCKET_LAYER_IP) { /* We mis-use the sin_port field to store the IP protocol number on RAW sockets */ struct sockaddr_in *sin = (struct sockaddr_in *) result->ai_addr; sin->sin_port = htons(result->ai_protocol); } memcpy(sa, result->ai_addr, result->ai_addrlen); - freeaddrinfo(result); } } From 545103ff1c61fb766e6623e316c96d84be10142f Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 20:58:46 -0400 Subject: [PATCH 07/10] rewrote socket_read(): there ave been some misunderstandings --- lib/socket.c | 83 +++++++++++++++++++++++----------------------------- 1 file changed, 37 insertions(+), 46 deletions(-) diff --git a/lib/socket.c b/lib/socket.c index 71f5a511c..ab08eebc2 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -212,70 +212,61 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int samples, ret, received, smp_count; + int samples, ret, received; ssize_t bytes; - if (s->header == SOCKET_HEADER_GTNET_SKT) - smp_count = cnt; - else - smp_count = 2*cnt; - - struct msg msgs[cnt]; - struct msg hdr; - - float sample_value; - struct iovec iov[smp_count]; - struct msghdr mhdr = { - .msg_iov = iov - }; - if (s->header == SOCKET_HEADER_GTNET_SKT) { - bytes = recv(s->sd, &sample_value, SAMPLE_DATA_LEN(1), MSG_PEEK | MSG_TRUNC); - if (bytes < sizeof(float) || bytes % 4 != 0) { - warn("Packet size is invalid"); + if (cnt < 1) + return 0; + + /* The GTNETv2-SKT protocol send every sample in a single packet. + * socket_read() receives a single packet. */ + struct sample *smp = smps[0]; + + /* Receive next sample */ + bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->length), MSG_PEEK | MSG_TRUNC); + if (bytes == 0) + error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ + else if (bytes < 0) + serror("Failed recv from node %s", node_name(n)); + else if (bytes % 4 != 0) { + warn("Packet size is invalid. Must be multiple of 4 bytes."); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } - samples = bytes / sizeof(sample_value); + received = bytes / sizeof(smp->values[0]); + if (received > smp->length) { + warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), received - smp->length); + received = smp->length; + } - if (samples > cnt) { - warn("Received more samples than supported. Dropping %u samples", samples - cnt); - samples = cnt; - } - /* We add one value per sample */ - for (int i = 0; i < samples; i++) { - iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i].iov_len = SAMPLE_DATA_LEN(1); /** values per sample is 1 while reading */ - mhdr.msg_iovlen += 1; - } - /* Receive message from socket */ - bytes = recvmsg(s->sd, &mhdr, 0); - if (bytes == 0) - error("Remote node %s closed the connection", node_name(n)); - else if (bytes < 0) - serror("Failed recv from node %s", node_name(n)); - - for (received = 0; received < samples; received++) { - struct sample *smp = smps[received]; - smp->length = 1; - /** @todo see if s->sequence value is needed */ - /** @todo see if s->ts.origin and smp->ts.received value is needed, essentially requiring a header */ - } + /** @todo Should we generate sequence no here manually? + * Or maybe optinally use the first data value as a sequence? + * However this would require the RTDS model to be changed. */ + smp->sequence = 0; + smp->length = received; } - else { + struct msg msgs[cnt]; + struct msg hdr; + struct iovec iov[2*cnt]; + struct msghdr mhdr = { + .msg_iov = iov + }; + /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid"); - recv(s->sd, &hdr, sizeof(struct msg), 0); + warn("Packet size is invalid. Must be multiple of 4 bytes."); + recv(s->sd, &hdr, sizeof(struct msg), 0); /* empty receive buffer */ return -1; } ret = msg_verify(&hdr); if (ret) { warn("Invalid message received: reason=%d, bytes=%zd", ret, bytes); - recv(s->sd, &hdr, sizeof(struct msg), 0); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } From 3c4c08cda6e668efafe028ae17b9724803b423fc Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 21:10:00 -0400 Subject: [PATCH 08/10] rewrote socket_write(): there ave been some misunderstandings --- lib/socket.c | 72 +++++++++++++++++++++++++--------------------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/lib/socket.c b/lib/socket.c index ab08eebc2..2bd56fc60 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -329,29 +329,33 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; ssize_t bytes; - - unsigned smp_count; - - if (s->header == SOCKET_HEADER_GTNET_SKT) - smp_count = cnt; - else - smp_count = 2*cnt; - - struct msg msgs[cnt]; - struct iovec iov[smp_count]; - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = ARRAY_LEN(iov) - }; + int sent = 0; /* Construct iovecs */ if (s->header == SOCKET_HEADER_GTNET_SKT) { + if (cnt < 1) + return 0; + for (int i = 0; i < cnt; i++) { - iov[i].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i].iov_len = SAMPLE_DATA_LEN(1); + bytes = sendto(s->sd, &smps[i]->values, SAMPLE_DATA_LEN(smps[i]->length), 0, (struct sockaddr *) &s->remote, sizeof(s->remote)); + if (bytes < 0) + serror("Failed send to node %s", node_name(n)); + + sent++; + + debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with 1 sample", bytes); } } else { + struct msg msgs[cnt]; + struct iovec iov[2*cnt]; + struct msghdr mhdr = { + .msg_iov = iov, + .msg_iovlen = ARRAY_LEN(iov), + .msg_name = (struct sockaddr *) &s->remote, + .msg_namelen = sizeof(s->remote) + }; + for (int i = 0; i < cnt; i++) { msgs[i] = MSG_INIT(smps[i]->length, smps[i]->sequence); @@ -359,32 +363,24 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) msgs[i].ts.sec = smps[i]->ts.origin.tv_sec; msgs[i].ts.nsec = smps[i]->ts.origin.tv_nsec; - iov[i*2+0].iov_base = &msgs[i]; - iov[i*2+0].iov_len = MSG_LEN(0); + iov[i*2+0].iov_base = &msgs[i]; + iov[i*2+0].iov_len = MSG_LEN(0); - iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); - iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); + iov[i*2+1].iov_base = SAMPLE_DATA_OFFSET(smps[i]); + iov[i*2+1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); } + + /* Send message */ + bytes = sendmsg(s->sd, &mhdr, 0); + if (bytes < 0) + serror("Failed send to node %s", node_name(n)); + + sent = cnt; /** @todo Find better way to determine how many values we actually sent */ + + debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); } - /* Specify destination address for connection-less procotols */ - switch (s->layer) { - case SOCKET_LAYER_UDP: - case SOCKET_LAYER_IP: - case SOCKET_LAYER_ETH: - mhdr.msg_name = (struct sockaddr *) &s->remote; - mhdr.msg_namelen = sizeof(s->remote); - break; - } - - /* Send message */ - bytes = sendmsg(s->sd, &mhdr, 0); - if (bytes < 0) - serror("Failed send to node %s", node_name(n)); - - debug(DBG_SOCKET | 17, "Sent packet of %zd bytes with %u samples", bytes, cnt); - - return cnt; + return sent; } int socket_parse(struct node *n, config_setting_t *cfg) From 65f46cef5fa22188dd24bb4b07039c2bbcceb162 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 22:16:23 -0400 Subject: [PATCH 09/10] Check for maximum amount of values per sample supported --- include/sample.h | 13 ++++++++++--- lib/sample.c | 22 ++++++++++++++++++---- lib/socket.c | 39 ++++++++++++++++++++++----------------- src/pipe.c | 4 ++-- 4 files changed, 52 insertions(+), 26 deletions(-) diff --git a/include/sample.h b/include/sample.h index a73c016ce..3a1ff945c 100644 --- a/include/sample.h +++ b/include/sample.h @@ -4,7 +4,7 @@ * @author Steffen Vogel * @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC * This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential. - * Unauthorized copying of this file, via any medium is strictly prohibited. + * Unauthorized copying of this file, via any medium is strictly prohibited. */ #ifndef _SAMPLE_H_ @@ -16,6 +16,9 @@ #include #include +/* Forward declarations */ +struct pool; + /** The length of a sample datastructure with \p values values in bytes. */ #define SAMPLE_LEN(values) (sizeof(struct sample) + SAMPLE_DATA_LEN(values)) @@ -35,8 +38,9 @@ enum sample_flags { }; struct sample { - int length; /**< The number of values in sample::values. */ int sequence; /**< The sequence number of this sample. */ + int length; /**< The number of values in sample::values which are valid. */ + int capacity; /**< The number of values in sample::values for which memory is reserved. */ /** All timestamps are seconds / nano seconds after 1.1.1970 UTC */ struct { @@ -44,7 +48,7 @@ struct sample { struct timespec received; /**< The point in time when this data was received. */ struct timespec sent; /**< The point in time this data was send for the last time. */ } ts; - + /** The values. */ union { float f; /**< Floating point values (note msg::endian) */ @@ -52,6 +56,9 @@ struct sample { } values[]; }; +/** Request \p cnt samples from memory pool \p p and initialize them. */ +int sample_get_many(struct pool *p, struct sample *smps[], int cnt); + /** Print a sample in human readable form to a file stream. * * @param buf A character buffer of len bytes. diff --git a/lib/sample.c b/lib/sample.c index 8206bb97e..cd5be7873 100644 --- a/lib/sample.c +++ b/lib/sample.c @@ -8,9 +8,23 @@ #include +#include "pool.h" #include "sample.h" #include "timing.h" +int sample_get_many(struct pool *p, struct sample *smps[], int cnt) { + int ret; + + ret = pool_get_many(p, (void **) smps, cnt); + if (ret < 0) + return ret; + + for (int i = 0; i < ret; i++) + smps[i]->capacity = (p->blocksz - sizeof(**smps)) / sizeof(smps[0]->values[0]); + + return ret; +} + int sample_print(char *buf, size_t len, struct sample *s, int flags) { size_t off = snprintf(buf, len, "%llu", (unsigned long long) s->ts.origin.tv_sec); @@ -91,11 +105,11 @@ int sample_scan(const char *line, struct sample *s, int *fl) end++; } - for (s->length = 0, ptr = end; ; - s->length++, ptr = end) { + for (ptr = end, s->length = 0; + s->length < s->capacity; + ptr = end, s->length++) { - /** @todo We only support floating point values at the moment */ - s->values[s->length].f = strtod(ptr, &end); + s->values[s->length].f = strtod(ptr, &end); /** @todo We only support floating point values at the moment */ if (end == ptr) /* there are no valid FP values anymore */ break; diff --git a/lib/socket.c b/lib/socket.c index 2bd56fc60..cbfafd1bf 100644 --- a/lib/socket.c +++ b/lib/socket.c @@ -94,24 +94,23 @@ int socket_deinit() char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *hdr = NULL, *buf; + char *layer = NULL, *header = NULL, *buf; switch (s->layer) { - case SOCKET_LAYER_UDP: layer = "udp"; break; + case SOCKET_LAYER_UDP: layer = "udp"; break; case SOCKET_LAYER_IP: layer = "ip"; break; case SOCKET_LAYER_ETH: layer = "eth"; break; } switch (s->header) { - case SOCKET_HEADER_GTNET_SKT: hdr = "RTDS GTNETv2-SKT"; break; - case SOCKET_HEADER_DEFAULT: - default: hdr = "VILLASnode"; break; + case SOCKET_HEADER_GTNET_SKT: header = "gtnet-skt"; break; + case SOCKET_HEADER_DEFAULT: header = "villas"; break; } char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, hdr, local, remote); + buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, header, local, remote); free(local); free(remote); @@ -212,7 +211,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) { struct socket *s = n->_vd; - int samples, ret, received; + int samples, ret, received, length; ssize_t bytes; if (s->header == SOCKET_HEADER_GTNET_SKT) { @@ -224,28 +223,32 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) struct sample *smp = smps[0]; /* Receive next sample */ - bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->length), MSG_PEEK | MSG_TRUNC); + bytes = recv(s->sd, &smp->values[0], SAMPLE_DATA_LEN(smp->capacity), MSG_TRUNC); if (bytes == 0) error("Remote node %s closed the connection", node_name(n)); /** @todo Should we really hard fail here? */ else if (bytes < 0) serror("Failed recv from node %s", node_name(n)); else if (bytes % 4 != 0) { - warn("Packet size is invalid. Must be multiple of 4 bytes."); + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } - received = bytes / sizeof(smp->values[0]); - if (received > smp->length) { - warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), received - smp->length); - received = smp->length; + debug(3, "Received %zd bytes", bytes); + + length = bytes / SAMPLE_DATA_LEN(1); + if (length > smp->capacity) { + warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); + length = smp->capacity; } /** @todo Should we generate sequence no here manually? * Or maybe optinally use the first data value as a sequence? * However this would require the RTDS model to be changed. */ smp->sequence = 0; - smp->length = received; + smp->length = length; + + received = 1; /* GTNET-SKT sends every sample in a single packet */ } else { struct msg msgs[cnt]; @@ -258,8 +261,8 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /* Peak into message header of the first sample and to get total packet size. */ bytes = recv(s->sd, &hdr, sizeof(struct msg), MSG_PEEK | MSG_TRUNC); if (bytes < sizeof(struct msg) || bytes % 4 != 0) { - warn("Packet size is invalid. Must be multiple of 4 bytes."); - recv(s->sd, &hdr, sizeof(struct msg), 0); /* empty receive buffer */ + warn("Packet size is invalid: %zd Must be multiple of 4 bytes.", bytes); + recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } @@ -275,7 +278,6 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) msg_swap(&hdr); samples = bytes / MSG_LEN(hdr.values); - if (samples > cnt) { warn("Node %s received more samples than supported. Dropping %u samples", node_name(n), samples - cnt); samples = cnt; @@ -290,6 +292,9 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) iov[2*i+1].iov_len = SAMPLE_DATA_LEN(hdr.values); mhdr.msg_iovlen += 2; + + if (hdr.values > smps[i]->capacity) + error("Node %s received more values than supported. Dropping %d values.", node_name(n), hdr.values - smps[i]->capacity); } /* Receive message from socket */ diff --git a/src/pipe.c b/src/pipe.c index 732ea8aee..4a651fefd 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -79,7 +79,7 @@ void * send_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&send_pool, (void **) smps, node->vectorize); + ret = sample_get_many(&send_pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); @@ -118,7 +118,7 @@ void * recv_loop(void *ctx) if (ret < 0) error("Failed to allocate memory for receive pool."); - ret = pool_get_many(&recv_pool, (void **) smps, node->vectorize); + ret = sample_get_many(&recv_pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret); From 4d67e6ef4b2614e721694ed91365c60955b51b37 Mon Sep 17 00:00:00 2001 From: Steffen Vogel Date: Sat, 10 Sep 2016 22:19:07 -0400 Subject: [PATCH 10/10] various smaller cleanups before merge --- Makefile | 1 - etc/loopback.conf | 12 ++++++------ src/pipe.c | 48 +++++++++++++++++++++++------------------------ 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/Makefile b/Makefile index 8001ed98b..c917c7e08 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,6 @@ LDLIBS = -pthread -lrt -lm -lconfig -lvillas CFLAGS += -std=c11 -Iinclude/ -I. -MMD -mcx16 CFLAGS += -Wall -fdiagnostics-color=auto CFLAGS += -D_GIT_REV='"$(GIT_REV)"' -D_POSIX_C_SOURCE=200809L -D_GNU_SOURCE=1 -DV=$(V) -#CFLAGS += -pedantic -std=c11 LDFLAGS += -L. -Wl,-rpath,'$$ORIGIN' # pkg-config dependencies diff --git a/etc/loopback.conf b/etc/loopback.conf index ef63d7977..803c0b432 100644 --- a/etc/loopback.conf +++ b/etc/loopback.conf @@ -32,11 +32,11 @@ nodes = { node1 = { type = "socket", layer = "udp", - local = "127.0.0.1:12000", # Local ip:port, use '*' for random port + local = "*:12000", # Local ip:port, use '*' for random port remote = "127.0.0.1:12001", combine = 5, - app_hdr = "gtskt", # app_hdr can be gtskt or default. If not provided, default header will be used - #vectorize = 1, # number of samples to fetch per iteration from the socket + header = "villas", # app_hdr can be gtskt or default. If not provided, default header will be used + vectorize = 1, # number of samples to fetch per iteration from the socket netem = { enabled = false, delay = 1000000, @@ -47,10 +47,10 @@ nodes = { node2 = { type = "socket", layer = "udp", - local = "127.0.0.1:12001", # Local ip:port, use '*' for random port + local = "*:12001", # Local ip:port, use '*' for random port remote = "127.0.0.1:12002", - combine = 30, - app_hdr = "gtskt" + vectorize = 30, + header = "villas" } }; diff --git a/src/pipe.c b/src/pipe.c index 4a651fefd..b2753fba2 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -38,18 +38,18 @@ static void quit(int signal, siginfo_t *sinfo, void *ctx) { pthread_cancel(recv_thread); pthread_cancel(send_thread); - + pthread_join(recv_thread, NULL); pthread_join(send_thread, NULL); - + node_stop(node); node_deinit(node->_vt); - + pool_destroy(&recv_pool); pool_destroy(&send_pool); - + list_destroy(&nodes, (dtor_cb_t) node_destroy, false); - + info(GRN("Goodbye!")); exit(EXIT_SUCCESS); } @@ -73,12 +73,12 @@ void * send_loop(void *ctx) { int ret; struct sample *smps[node->vectorize]; - + /* Initialize memory */ ret = pool_init_mmap(&send_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); if (ret < 0) error("Failed to allocate memory for receive pool."); - + ret = sample_get_many(&send_pool, smps, node->vectorize); if (ret < 0) error("Failed to get %u samples out of send pool (%d).", node->vectorize, ret); @@ -95,7 +95,7 @@ void * send_loop(void *ctx) if (feof(stdin)) return NULL; else { - warn("Skipped invalid message message: reason=%d", reason); + warn("Skipped invalid message message from stdin: reason=%d", reason); retry = 1; } } @@ -112,16 +112,16 @@ void * recv_loop(void *ctx) { int ret; struct sample *smps[node->vectorize]; - + /* Initialize memory */ ret = pool_init_mmap(&recv_pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize); if (ret < 0) error("Failed to allocate memory for receive pool."); - + ret = sample_get_many(&recv_pool, smps, node->vectorize); - if (ret < 0) + if (ret < 0) error("Failed to get %u samples out of receive pool (%d).", node->vectorize, ret); - + /* Print header */ fprintf(stdout, "# %-20s\t\t%s\n", "sec.nsec+offset", "data[]"); @@ -134,20 +134,20 @@ void * recv_loop(void *ctx) fflush(stdout); } } - + return NULL; } int main(int argc, char *argv[]) { bool send = true, recv = true, reverse = false; - + /* Parse command line arguments */ if (argc < 3) usage(argv[0]); - + log_init(); - + char c; while ((c = getopt(argc-2, argv+2, "hxrsd:")) != -1) { switch (c) { @@ -168,7 +168,7 @@ int main(int argc, char *argv[]) usage(argv[0]); } } - + /* Setup signals */ struct sigaction sa_quit = { .sa_flags = SA_SIGINFO, @@ -178,26 +178,26 @@ int main(int argc, char *argv[]) sigemptyset(&sa_quit.sa_mask); sigaction(SIGTERM, &sa_quit, NULL); sigaction(SIGINT, &sa_quit, NULL); - + /* Initialize log, configuration.. */ config_t config; - + /* Create lists */ list_init(&nodes); config_init(&config); config_parse(argv[1], &config, &settings, &nodes, NULL); - + /* Initialize node */ node = list_lookup(&nodes, argv[2]); if (!node) error("Node '%s' does not exist!", argv[2]); - + if (reverse) node_reverse(node); - node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); - + node_init(node->_vt, argc-optind, argv+optind, config_root_setting(&config)); + node_start(node); /* Start threads */ @@ -205,7 +205,7 @@ int main(int argc, char *argv[]) pthread_create(&recv_thread, NULL, recv_loop, NULL); if (send) pthread_create(&send_thread, NULL, send_loop, NULL); - + for (;;) pause();