diff --git a/doc/nodes/Socket.md b/doc/nodes/Socket.md index 486453be8..5f6c43bf9 100644 --- a/doc/nodes/Socket.md +++ b/doc/nodes/Socket.md @@ -38,6 +38,24 @@ See below for a more detailed description of this feature. #### `layer` *("udp" | "ip" | "eth")* +Select the network layer which should be used for the socket. Please note that `eth` can only be used locally in a LAN as it contains no routing information for the internet. + +#### `header` *("default" | "none" | "fake")* + +The socket node-type supports multiple protocols: + +- The `default` VILLASnode header includes a couple of fields like the origin timestamp, number of values and the endianess of the transported data. The packet format is described in the following section called "Packet Format". +- It is also possible to just send raw data by omitting the header completely (`none`). Each value is expected to take 4 bytes. It can be either a single precission floating point number (`float`) or a 32 bit unsigned integer (`uint32_t`). This protocol is used by RTDS' GTNET-SKT card. +- The `fake` setting is very similar to the `none` setting. Only the first three values will have a special interpretation: + - Sequence no. (`uint32_t`) + - Timestamp seconds (Unix epoch, `uint32_t`) + - Timestamp nano-seconds (Unix epoch, `uint32_t`) + +#### `endian` *("big" | "network" | "little")* + +This setting is only valid for the `none` and `fake` protocols. +It select the endianes which is used for outgoing and incoming data. + ### Example nodes = { @@ -52,7 +70,17 @@ See below for a more detailed description of this feature. # ip Send / recv IP packets # eth Send / recv raw Ethernet frames (IEEE802.3) - + header = "gtnet-skt:fake", # Header can be one of: + # default | villas Use VILLASnode protocol (see struct msg) (default) + # none | gtnet-skt Use no header, send raw data as used by RTDS GTNETv2-SKT + # fake | gtnet-skt:fake Same as 'none', but use first three data values as + # sequence, seconds & nanoseconds timestamp + # In this mode values are uint32_t not floats! + + endian = "network", # Endianess of header and data: + # big | network Use big endianess. Also know as network byte order (default) + # little Use little endianess. + local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair remote = "127.0.0.1:12000" # This node sents outgoing messages to this IP:Port pair diff --git a/etc/example.conf b/etc/example.conf index 52e24179f..177537cbe 100644 --- a/etc/example.conf +++ b/etc/example.conf @@ -38,6 +38,9 @@ nodes = { # sequence, seconds & nanoseconds timestamp # In this mode values are uint32_t not floats! + endian = "network", # Endianess of header and data: + # big | network Use big endianess. Also know as network byte order (default) + # little Use little endianess. local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair remote = "127.0.0.1:12000" # This node sents outgoing messages to this IP:Port pair diff --git a/etc/gtnet-skt/test5.conf b/etc/gtnet-skt/test5.conf new file mode 100644 index 000000000..a8d9ce9d9 --- /dev/null +++ b/etc/gtnet-skt/test5.conf @@ -0,0 +1,63 @@ +# This is an example for a minimal loopback configuration. +# +# All messages will be sent back to the origin using UDP packets. +# +# You can use this configuration in conjunction with the 'send', 'receive' and 'random' +# utilities as shown below (run all three steps in parallel). +# +# 0. Overview: +# +# ./signal --PIPE--> ./pipe --UDP--> ./node --UDP--> ./pipe +# +# 1. Start server: +# +# $ ./node etc/loopback.conf +# +# 2. Send random data to server: +# +# $ ./signal random -r 10 -v 4 | ./pipe etc/loopback.conf node1 +# +# 3. Receive data from server: +# +# $ ./pipe etc/loopback.conf node2 +# +# Author: Steffen Vogel +# Copyright: 2016, Institute for Automation of Complex Power Systems, EONERC +## + +stats = 1; +debug = 10; + +nodes = { + node1 = { + type = "socket", + layer = "udp", + local = "134.130.169.31:12002", # Local ip:port, use '*' for random port + remote = "134.130.169.98:12001", + header = "gtnet-skt:fake", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used + endian = "big", # Can be 'little' or 'small'. If not provided (default), little endianness logic will be applied + vectorize = 1, # Number of samples to fetch per iteration from the socket + netem = { + enabled = false, + delay = 1000000, # In micro seconds! + jitter = 300000, + distribution = "normal" + } + }, + node2 = { + type = "socket", + layer = "udp", + local = "192.168.88.128:12004", # Local ip:port, use '*' for random port + remote = "192.168.88.129:12001", + header = "gtnet-skt", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used + vectorize = 1 # Number of samples to fetch per iteration from the socket + } +}; + +paths = ( + { + in = "node1", # Name of the node we listen to (see above) + out = "node1", # And we loop back to the origin + hook = ["print"] + } +); diff --git a/include/villas/memory.h b/include/villas/memory.h index cf1d7acd5..f6e3064f5 100644 --- a/include/villas/memory.h +++ b/include/villas/memory.h @@ -44,6 +44,11 @@ struct memzone { size_t len; }; +/** Allocate \p len bytes memory of type \p m. + * + * @retval NULL If allocation failed. + * @retval <>0 If allocation was successful. + */ void * memory_alloc(const struct memtype *m, size_t len); void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment); diff --git a/include/villas/msg.h b/include/villas/msg.h index 8379969f5..2fb032f8c 100644 --- a/include/villas/msg.h +++ b/include/villas/msg.h @@ -16,7 +16,7 @@ struct node; -/** Swaps message contents byte-order. +/** Swaps the byte order of the header part of struct msg. * * Message can either be transmitted in little or big endian * format. The actual endianess for a message is defined by the @@ -27,7 +27,7 @@ struct node; * * @param m A pointer to the message */ -void msg_swap(struct msg *m); +void msg_hdr_swap(struct msg *m); /** Check the consistency of a message. * diff --git a/include/villas/node.h b/include/villas/node.h index c24841832..d57553402 100644 --- a/include/villas/node.h +++ b/include/villas/node.h @@ -47,7 +47,7 @@ struct node int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */ int affinity; /**< CPU Affinity of this node */ - + unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */ enum node_state { diff --git a/include/villas/nodes/socket.h b/include/villas/nodes/socket.h index 4a0a6c337..9f5cc7750 100644 --- a/include/villas/nodes/socket.h +++ b/include/villas/nodes/socket.h @@ -44,6 +44,7 @@ union sockaddr_union { struct socket { int sd; /**> The socket descriptor */ int mark; /**> Socket mark for netem, routing and filtering */ + int endian; /** Endianness of the data sent/received by the node */ enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */ enum socket_header header; /**> Payload header type */ diff --git a/include/villas/sample.h b/include/villas/sample.h index 5d2db263d..ec58f8bd8 100644 --- a/include/villas/sample.h +++ b/include/villas/sample.h @@ -45,6 +45,8 @@ struct sample { atomic_int refcnt; /**< Reference counter. */ struct pool *pool; /**< This sample is belong to this memory pool. */ + + int endian; /**< Endianess of data in the sample. */ /** All timestamps are seconds / nano seconds after 1.1.1970 UTC */ struct { diff --git a/include/villas/utils.h b/include/villas/utils.h index 8a50b6754..9a144278d 100644 --- a/include/villas/utils.h +++ b/include/villas/utils.h @@ -55,7 +55,14 @@ #define ALIGN_MASK(x, m) (((uintptr_t) (x) + (m)) & ~(m)) #define IS_ALIGNED(x, a) (ALIGN(x, a) == (uintptr_t) x) -#define CEIL(x, y) ((x + y - 1) / y) +/** Round-up integer division */ +#define CEIL(x, y) (((x) + (y) - 1) / (y)) + +/** Get nearest up-rounded power of 2 */ +#define LOG2_CEIL(x) (1 << (log2i((x) - 1) + 1)) + +/** Check if the number is a power of 2 */ +#define IS_POW2(x) (((x) != 0) && !((x) & ((x) - 1))) /** Get nearest up-rounded power of 2 */ #define LOG2_CEIL(x) (1 << (log2i((x) - 1) + 1)) @@ -224,6 +231,13 @@ __attribute__((always_inline)) static inline uint64_t rdtsc() return tsc; } +/** Get log2 of long long integers */ +static inline int log2i(long long x) { + assert(x > 0); + + return sizeof(x) * 8 - __builtin_clzll(x) - 1; +} + /** Sleep with rdtsc */ void rdtsc_sleep(uint64_t nanosecs, uint64_t start); diff --git a/lib/cfg.c b/lib/cfg.c index 33fe11ed6..d2781631e 100644 --- a/lib/cfg.c +++ b/lib/cfg.c @@ -204,6 +204,11 @@ int cfg_parse_path(config_setting_t *cfg, if (!config_setting_lookup_float(cfg, "rate", &p->rate)) p->rate = 0; /* disabled */ + if (!IS_POW2(p->queuelen)) { + p->queuelen = LOG2_CEIL(p->queuelen); + warn("Queue length should always be a power of 2. Adjusting to %d", p->queuelen); + } + p->cfg = cfg; list_push(paths, p); diff --git a/lib/memory.c b/lib/memory.c index 7525a1c0c..2f46161b3 100644 --- a/lib/memory.c +++ b/lib/memory.c @@ -19,20 +19,20 @@ void * memory_alloc(const struct memtype *m, size_t len) { - debug(DBG_MEM | 2, "Allocating %zu byte of %s memory", len, m->name); + debug(DBG_MEM | 2, "Allocating %#zx bytes of %s memory", len, m->name); return m->alloc(len); } void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment) { - debug(DBG_MEM | 2, "Allocating %zu byte of %zu-byte-aligned %s memory", len, alignment, m->name); + debug(DBG_MEM | 2, "Allocating %#zx bytes of %#zx-byte-aligned %s memory", len, alignment, m->name); warn("%s: not implemented yet!", __FUNCTION__); return memory_alloc(m, len); } int memory_free(const struct memtype *m, void *ptr, size_t len) { - debug(DBG_MEM | 2, "Releasing %zu bytes of %s memory", len, m->name); + debug(DBG_MEM | 2, "Releasing %#zx bytes of %s memory", len, m->name); return m->free(ptr, len); } @@ -60,7 +60,14 @@ static void * memory_hugepage_alloc(size_t len) flags |= MAP_HUGETLB | MAP_LOCKED; #endif - return mmap(NULL, len, prot, flags, -1, 0); + void *ret = mmap(NULL, len, prot, flags, -1, 0); + + if (ret == MAP_FAILED) { + info("Failed to allocate huge pages: Check https://www.kernel.org/doc/Documentation/vm/hugetlbpage.txt"); + return NULL; + } + + return ret; } static int memory_hugepage_free(void *ptr, size_t len) diff --git a/lib/msg.c b/lib/msg.c index 704e348c2..70b277d31 100644 --- a/lib/msg.c +++ b/lib/msg.c @@ -22,15 +22,12 @@ #include "node.h" #include "utils.h" -void msg_swap(struct msg *m) +void msg_hdr_swap(struct msg *m) { m->length = bswap_16(m->length); m->sequence = bswap_32(m->sequence); m->ts.sec = bswap_32(m->ts.sec); m->ts.nsec = bswap_32(m->ts.nsec); - - for (int i = 0; i < m->length; i++) - m->data[i].i = bswap_32(m->data[i].i); m->endian ^= 1; } diff --git a/lib/nodes/socket.c b/lib/nodes/socket.c index d28644836..000fc123a 100644 --- a/lib/nodes/socket.c +++ b/lib/nodes/socket.c @@ -19,6 +19,14 @@ #include #include +#ifdef __linux__ + #include +#elif defined(__PPC__) /* Xilinx toolchain */ + #include + #define bswap_16(x) Xil_EndianSwap16(x) + #define bswap_32(x) Xil_EndianSwap32(x) +#endif + #include "nodes/socket.h" #include "config.h" #include "utils.h" @@ -94,7 +102,7 @@ int socket_deinit() char * socket_print(struct node *n) { struct socket *s = n->_vd; - char *layer = NULL, *header = NULL, *buf; + char *layer = NULL, *header = NULL, *endian = NULL, *buf; switch (s->layer) { case SOCKET_LAYER_UDP: layer = "udp"; break; @@ -107,11 +115,19 @@ char * socket_print(struct node *n) case SOCKET_HEADER_FAKE: header = "fake"; break; case SOCKET_HEADER_DEFAULT: header = "default"; break; } + + if (s->header == SOCKET_HEADER_DEFAULT) + endian = "auto"; + else + switch (s->endian) { + case MSG_ENDIAN_LITTLE: endian = "little"; break; + case MSG_ENDIAN_BIG: endian = "big"; break; + } char *local = socket_print_addr((struct sockaddr *) &s->local); char *remote = socket_print_addr((struct sockaddr *) &s->remote); - buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, header, local, remote); + buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote); free(local); free(remote); @@ -253,8 +269,21 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) recv(s->sd, NULL, 0, 0); /* empty receive buffer */ return -1; } + + /* Convert message to host endianess */ + if (s->endian != MSG_ENDIAN_HOST) { + for (int i = 0; i < ARRAY_LEN(header); i++) + header[i] = bswap_32(header[i]); + + for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++) + smp->data[i].i = bswap_32(smp->data[i].i); + } - length = (s->header == SOCKET_HEADER_FAKE ? bytes - sizeof(header) : bytes) / SAMPLE_DATA_LEN(1); + if (s->header == SOCKET_HEADER_FAKE) + length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1); + else + length = bytes / SAMPLE_DATA_LEN(1); + if (length > smp->capacity) { warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity); length = smp->capacity; @@ -303,7 +332,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) /* Convert message to host endianess */ if (hdr.endian != MSG_ENDIAN_HOST) - msg_swap(&hdr); + msg_hdr_swap(&hdr); samples = bytes / MSG_LEN(hdr.length); if (samples > cnt) { @@ -344,8 +373,12 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt) break; /* Convert message to host endianess */ - if (m->endian != MSG_ENDIAN_HOST) - msg_swap(m); + if (m->endian != MSG_ENDIAN_HOST) { + msg_hdr_swap(m); + + for (int i = 0; i < m->length; i++) + smp->data[i].i = bswap_32(smp->data[i].i); + } smp->length = m->length; smp->sequence = m->sequence; @@ -371,33 +404,27 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) if (cnt < 1) return 0; - for (int i = 0; i < cnt; i++) { - int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1; - struct iovec iov[iov_len]; - + for (int i = 0; i < cnt; i++) { + int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0; + int len = smps[i]->length + off; + uint32_t data[len]; + /* First three values are sequence, seconds and nano-seconds timestamps */ - uint32_t header[3]; if (s->header == SOCKET_HEADER_FAKE) { - header[0] = smps[i]->sequence; - header[1] = smps[i]->ts.origin.tv_sec; - header[2] = smps[i]->ts.origin.tv_nsec; - - iov[0].iov_base = header; - iov[0].iov_len = sizeof(header); + data[0] = smps[i]->sequence; + data[1] = smps[i]->ts.origin.tv_sec; + data[2] = smps[i]->ts.origin.tv_nsec; } - /* Remaining values are payload */ - iov[iov_len-1].iov_base = &smps[i]->data; - iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smps[i]->length); - - struct msghdr mhdr = { - .msg_iov = iov, - .msg_iovlen = iov_len, - .msg_name = (struct sockaddr *) &s->remote, - .msg_namelen = sizeof(s->remote) - }; + for (int j = 0; j < smps[i]->length; j++) { + if (s->endian == MSG_ENDIAN_HOST) + data[off + j] = smps[i]->data[j].i; + else + data[off + j] = bswap_32(smps[i]->data[j].i); + } - bytes = sendmsg(s->sd, &mhdr, 0); + bytes = sendto(s->sd, data, len * sizeof(data[0]), 0, + (struct sockaddr *) &s->remote, sizeof(s->remote)); if (bytes < 0) serror("Failed send to node %s", node_name(n)); @@ -445,7 +472,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt) int socket_parse(struct node *n, config_setting_t *cfg) { - const char *local, *remote, *layer, *hdr; + const char *local, *remote, *layer, *hdr, *endian; int ret; struct socket *s = n->_vd; @@ -477,6 +504,17 @@ int socket_parse(struct node *n, config_setting_t *cfg) else cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n)); } + + if (!config_setting_lookup_string(cfg, "endian", &endian)) + s->endian = MSG_ENDIAN_BIG; + else { + if (!strcmp(endian, "big") || !strcmp(endian, "network")) + s->endian = MSG_ENDIAN_BIG; + else if (!strcmp(endian, "little")) + s->endian = MSG_ENDIAN_LITTLE; + else + cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n)); + } if (!config_setting_lookup_string(cfg, "remote", &remote)) cerror(cfg, "Missing remote address for node %s", node_name(n)); diff --git a/lib/queue.c b/lib/queue.c index 1d1b797bd..15d3b8996 100644 --- a/lib/queue.c +++ b/lib/queue.c @@ -32,14 +32,16 @@ */ #include "queue.h" +#include "utils.h" /** Initialize MPMC queue */ int queue_init(struct queue *q, size_t size, const struct memtype *mem) { + /* Queue size must be 2 exponent */ - if ((size < 2) || ((size & (size - 1)) != 0)) + if (!IS_POW2(size)) return -1; - + q->mem = mem; q->buffer_mask = size - 1; q->buffer = memory_alloc(q->mem, sizeof(q->buffer[0]) * size); @@ -57,7 +59,7 @@ int queue_init(struct queue *q, size_t size, const struct memtype *mem) int queue_destroy(struct queue *q) { - return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0]))); + return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0])); } /** Return estimation of current queue usage. @@ -108,7 +110,6 @@ int queue_pull(struct queue *q, void **ptr) pos = atomic_load_explicit(&q->head, memory_order_relaxed); for (;;) { cell = &q->buffer[pos & q->buffer_mask]; - seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); diff = (intptr_t) seq - (intptr_t) (pos + 1); diff --git a/src/pipe.c b/src/pipe.c index 334fa0302..75ca559e9 100644 --- a/src/pipe.c +++ b/src/pipe.c @@ -96,7 +96,7 @@ static void * send_loop(void *ctx) sendd.started = true; /* Initialize memory */ - ret = pool_init(&sendd.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize, &memtype_hugepage); + ret = pool_init(&sendd.pool, node->vectorize, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage); if (ret < 0) error("Failed to allocate memory for receive pool."); @@ -140,7 +140,7 @@ static void * recv_loop(void *ctx) recvv.started = true; /* Initialize memory */ - ret = pool_init(&recvv.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize, &memtype_hugepage); + ret = pool_init(&recvv.pool, node->vectorize, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage); if (ret < 0) error("Failed to allocate memory for receive pool.");