1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'gtnet_features' into 'develop'

Endianess setting for GTNET-SKT header

Hi there,

I did a bit of rework on Umar's work.

This unveiled some more severe bugs (my faults).
So, I am quite happy that this kind of code review helps everybody :-)

As last time, I assign this MR to @umar.farooq. Have a look over it, and merge if you are happy.

See merge request !8
This commit is contained in:
Umar Farooq 2016-11-07 14:56:59 +01:00
commit 21f5ca3dff
15 changed files with 212 additions and 48 deletions

View file

@ -38,6 +38,24 @@ See below for a more detailed description of this feature.
#### `layer` *("udp" | "ip" | "eth")*
Select the network layer which should be used for the socket. Please note that `eth` can only be used locally in a LAN as it contains no routing information for the internet.
#### `header` *("default" | "none" | "fake")*
The socket node-type supports multiple protocols:
- The `default` VILLASnode header includes a couple of fields like the origin timestamp, number of values and the endianess of the transported data. The packet format is described in the following section called "Packet Format".
- It is also possible to just send raw data by omitting the header completely (`none`). Each value is expected to take 4 bytes. It can be either a single precission floating point number (`float`) or a 32 bit unsigned integer (`uint32_t`). This protocol is used by RTDS' GTNET-SKT card.
- The `fake` setting is very similar to the `none` setting. Only the first three values will have a special interpretation:
- Sequence no. (`uint32_t`)
- Timestamp seconds (Unix epoch, `uint32_t`)
- Timestamp nano-seconds (Unix epoch, `uint32_t`)
#### `endian` *("big" | "network" | "little")*
This setting is only valid for the `none` and `fake` protocols.
It select the endianes which is used for outgoing and incoming data.
### Example
nodes = {
@ -52,7 +70,17 @@ See below for a more detailed description of this feature.
# ip Send / recv IP packets
# eth Send / recv raw Ethernet frames (IEEE802.3)
header = "gtnet-skt:fake", # Header can be one of:
# default | villas Use VILLASnode protocol (see struct msg) (default)
# none | gtnet-skt Use no header, send raw data as used by RTDS GTNETv2-SKT
# fake | gtnet-skt:fake Same as 'none', but use first three data values as
# sequence, seconds & nanoseconds timestamp
# In this mode values are uint32_t not floats!
endian = "network", # Endianess of header and data:
# big | network Use big endianess. Also know as network byte order (default)
# little Use little endianess.
local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair
remote = "127.0.0.1:12000" # This node sents outgoing messages to this IP:Port pair

View file

@ -38,6 +38,9 @@ nodes = {
# sequence, seconds & nanoseconds timestamp
# In this mode values are uint32_t not floats!
endian = "network", # Endianess of header and data:
# big | network Use big endianess. Also know as network byte order (default)
# little Use little endianess.
local = "127.0.0.1:12001", # This node only received messages on this IP:Port pair
remote = "127.0.0.1:12000" # This node sents outgoing messages to this IP:Port pair

63
etc/gtnet-skt/test5.conf Normal file
View file

@ -0,0 +1,63 @@
# This is an example for a minimal loopback configuration.
#
# All messages will be sent back to the origin using UDP packets.
#
# You can use this configuration in conjunction with the 'send', 'receive' and 'random'
# utilities as shown below (run all three steps in parallel).
#
# 0. Overview:
#
# ./signal --PIPE--> ./pipe --UDP--> ./node --UDP--> ./pipe
#
# 1. Start server:
#
# $ ./node etc/loopback.conf
#
# 2. Send random data to server:
#
# $ ./signal random -r 10 -v 4 | ./pipe etc/loopback.conf node1
#
# 3. Receive data from server:
#
# $ ./pipe etc/loopback.conf node2
#
# Author: Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
# Copyright: 2016, Institute for Automation of Complex Power Systems, EONERC
##
stats = 1;
debug = 10;
nodes = {
node1 = {
type = "socket",
layer = "udp",
local = "134.130.169.31:12002", # Local ip:port, use '*' for random port
remote = "134.130.169.98:12001",
header = "gtnet-skt:fake", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used
endian = "big", # Can be 'little' or 'small'. If not provided (default), little endianness logic will be applied
vectorize = 1, # Number of samples to fetch per iteration from the socket
netem = {
enabled = false,
delay = 1000000, # In micro seconds!
jitter = 300000,
distribution = "normal"
}
},
node2 = {
type = "socket",
layer = "udp",
local = "192.168.88.128:12004", # Local ip:port, use '*' for random port
remote = "192.168.88.129:12001",
header = "gtnet-skt", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used
vectorize = 1 # Number of samples to fetch per iteration from the socket
}
};
paths = (
{
in = "node1", # Name of the node we listen to (see above)
out = "node1", # And we loop back to the origin
hook = ["print"]
}
);

View file

@ -44,6 +44,11 @@ struct memzone {
size_t len;
};
/** Allocate \p len bytes memory of type \p m.
*
* @retval NULL If allocation failed.
* @retval <>0 If allocation was successful.
*/
void * memory_alloc(const struct memtype *m, size_t len);
void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment);

View file

@ -16,7 +16,7 @@
struct node;
/** Swaps message contents byte-order.
/** Swaps the byte order of the header part of struct msg.
*
* Message can either be transmitted in little or big endian
* format. The actual endianess for a message is defined by the
@ -27,7 +27,7 @@ struct node;
*
* @param m A pointer to the message
*/
void msg_swap(struct msg *m);
void msg_hdr_swap(struct msg *m);
/** Check the consistency of a message.
*

View file

@ -47,7 +47,7 @@ struct node
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
int affinity; /**< CPU Affinity of this node */
unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
enum node_state {

View file

@ -44,6 +44,7 @@ union sockaddr_union {
struct socket {
int sd; /**> The socket descriptor */
int mark; /**> Socket mark for netem, routing and filtering */
int endian; /** Endianness of the data sent/received by the node */
enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */
enum socket_header header; /**> Payload header type */

View file

@ -45,6 +45,8 @@ struct sample {
atomic_int refcnt; /**< Reference counter. */
struct pool *pool; /**< This sample is belong to this memory pool. */
int endian; /**< Endianess of data in the sample. */
/** All timestamps are seconds / nano seconds after 1.1.1970 UTC */
struct {

View file

@ -55,7 +55,14 @@
#define ALIGN_MASK(x, m) (((uintptr_t) (x) + (m)) & ~(m))
#define IS_ALIGNED(x, a) (ALIGN(x, a) == (uintptr_t) x)
#define CEIL(x, y) ((x + y - 1) / y)
/** Round-up integer division */
#define CEIL(x, y) (((x) + (y) - 1) / (y))
/** Get nearest up-rounded power of 2 */
#define LOG2_CEIL(x) (1 << (log2i((x) - 1) + 1))
/** Check if the number is a power of 2 */
#define IS_POW2(x) (((x) != 0) && !((x) & ((x) - 1)))
/** Get nearest up-rounded power of 2 */
#define LOG2_CEIL(x) (1 << (log2i((x) - 1) + 1))
@ -224,6 +231,13 @@ __attribute__((always_inline)) static inline uint64_t rdtsc()
return tsc;
}
/** Get log2 of long long integers */
static inline int log2i(long long x) {
assert(x > 0);
return sizeof(x) * 8 - __builtin_clzll(x) - 1;
}
/** Sleep with rdtsc */
void rdtsc_sleep(uint64_t nanosecs, uint64_t start);

View file

@ -204,6 +204,11 @@ int cfg_parse_path(config_setting_t *cfg,
if (!config_setting_lookup_float(cfg, "rate", &p->rate))
p->rate = 0; /* disabled */
if (!IS_POW2(p->queuelen)) {
p->queuelen = LOG2_CEIL(p->queuelen);
warn("Queue length should always be a power of 2. Adjusting to %d", p->queuelen);
}
p->cfg = cfg;
list_push(paths, p);

View file

@ -19,20 +19,20 @@
void * memory_alloc(const struct memtype *m, size_t len)
{
debug(DBG_MEM | 2, "Allocating %zu byte of %s memory", len, m->name);
debug(DBG_MEM | 2, "Allocating %#zx bytes of %s memory", len, m->name);
return m->alloc(len);
}
void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment)
{
debug(DBG_MEM | 2, "Allocating %zu byte of %zu-byte-aligned %s memory", len, alignment, m->name);
debug(DBG_MEM | 2, "Allocating %#zx bytes of %#zx-byte-aligned %s memory", len, alignment, m->name);
warn("%s: not implemented yet!", __FUNCTION__);
return memory_alloc(m, len);
}
int memory_free(const struct memtype *m, void *ptr, size_t len)
{
debug(DBG_MEM | 2, "Releasing %zu bytes of %s memory", len, m->name);
debug(DBG_MEM | 2, "Releasing %#zx bytes of %s memory", len, m->name);
return m->free(ptr, len);
}
@ -60,7 +60,14 @@ static void * memory_hugepage_alloc(size_t len)
flags |= MAP_HUGETLB | MAP_LOCKED;
#endif
return mmap(NULL, len, prot, flags, -1, 0);
void *ret = mmap(NULL, len, prot, flags, -1, 0);
if (ret == MAP_FAILED) {
info("Failed to allocate huge pages: Check https://www.kernel.org/doc/Documentation/vm/hugetlbpage.txt");
return NULL;
}
return ret;
}
static int memory_hugepage_free(void *ptr, size_t len)

View file

@ -22,15 +22,12 @@
#include "node.h"
#include "utils.h"
void msg_swap(struct msg *m)
void msg_hdr_swap(struct msg *m)
{
m->length = bswap_16(m->length);
m->sequence = bswap_32(m->sequence);
m->ts.sec = bswap_32(m->ts.sec);
m->ts.nsec = bswap_32(m->ts.nsec);
for (int i = 0; i < m->length; i++)
m->data[i].i = bswap_32(m->data[i].i);
m->endian ^= 1;
}

View file

@ -19,6 +19,14 @@
#include <linux/if_packet.h>
#include <arpa/inet.h>
#ifdef __linux__
#include <byteswap.h>
#elif defined(__PPC__) /* Xilinx toolchain */
#include <xil_io.h>
#define bswap_16(x) Xil_EndianSwap16(x)
#define bswap_32(x) Xil_EndianSwap32(x)
#endif
#include "nodes/socket.h"
#include "config.h"
#include "utils.h"
@ -94,7 +102,7 @@ int socket_deinit()
char * socket_print(struct node *n)
{
struct socket *s = n->_vd;
char *layer = NULL, *header = NULL, *buf;
char *layer = NULL, *header = NULL, *endian = NULL, *buf;
switch (s->layer) {
case SOCKET_LAYER_UDP: layer = "udp"; break;
@ -107,11 +115,19 @@ char * socket_print(struct node *n)
case SOCKET_HEADER_FAKE: header = "fake"; break;
case SOCKET_HEADER_DEFAULT: header = "default"; break;
}
if (s->header == SOCKET_HEADER_DEFAULT)
endian = "auto";
else
switch (s->endian) {
case MSG_ENDIAN_LITTLE: endian = "little"; break;
case MSG_ENDIAN_BIG: endian = "big"; break;
}
char *local = socket_print_addr((struct sockaddr *) &s->local);
char *remote = socket_print_addr((struct sockaddr *) &s->remote);
buf = strf("layer=%s, header=%s, local=%s, remote=%s", layer, header, local, remote);
buf = strf("layer=%s, header=%s, endian=%s, local=%s, remote=%s", layer, header, endian, local, remote);
free(local);
free(remote);
@ -253,8 +269,21 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
recv(s->sd, NULL, 0, 0); /* empty receive buffer */
return -1;
}
/* Convert message to host endianess */
if (s->endian != MSG_ENDIAN_HOST) {
for (int i = 0; i < ARRAY_LEN(header); i++)
header[i] = bswap_32(header[i]);
for (int i = 0; i < bytes / SAMPLE_DATA_LEN(1); i++)
smp->data[i].i = bswap_32(smp->data[i].i);
}
length = (s->header == SOCKET_HEADER_FAKE ? bytes - sizeof(header) : bytes) / SAMPLE_DATA_LEN(1);
if (s->header == SOCKET_HEADER_FAKE)
length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1);
else
length = bytes / SAMPLE_DATA_LEN(1);
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
@ -303,7 +332,7 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
/* Convert message to host endianess */
if (hdr.endian != MSG_ENDIAN_HOST)
msg_swap(&hdr);
msg_hdr_swap(&hdr);
samples = bytes / MSG_LEN(hdr.length);
if (samples > cnt) {
@ -344,8 +373,12 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
break;
/* Convert message to host endianess */
if (m->endian != MSG_ENDIAN_HOST)
msg_swap(m);
if (m->endian != MSG_ENDIAN_HOST) {
msg_hdr_swap(m);
for (int i = 0; i < m->length; i++)
smp->data[i].i = bswap_32(smp->data[i].i);
}
smp->length = m->length;
smp->sequence = m->sequence;
@ -371,33 +404,27 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
if (cnt < 1)
return 0;
for (int i = 0; i < cnt; i++) {
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
for (int i = 0; i < cnt; i++) {
int off = s->header == SOCKET_HEADER_FAKE ? 3 : 0;
int len = smps[i]->length + off;
uint32_t data[len];
/* First three values are sequence, seconds and nano-seconds timestamps */
uint32_t header[3];
if (s->header == SOCKET_HEADER_FAKE) {
header[0] = smps[i]->sequence;
header[1] = smps[i]->ts.origin.tv_sec;
header[2] = smps[i]->ts.origin.tv_nsec;
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
data[0] = smps[i]->sequence;
data[1] = smps[i]->ts.origin.tv_sec;
data[2] = smps[i]->ts.origin.tv_nsec;
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smps[i]->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
for (int j = 0; j < smps[i]->length; j++) {
if (s->endian == MSG_ENDIAN_HOST)
data[off + j] = smps[i]->data[j].i;
else
data[off + j] = bswap_32(smps[i]->data[j].i);
}
bytes = sendmsg(s->sd, &mhdr, 0);
bytes = sendto(s->sd, data, len * sizeof(data[0]), 0,
(struct sockaddr *) &s->remote, sizeof(s->remote));
if (bytes < 0)
serror("Failed send to node %s", node_name(n));
@ -445,7 +472,7 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
int socket_parse(struct node *n, config_setting_t *cfg)
{
const char *local, *remote, *layer, *hdr;
const char *local, *remote, *layer, *hdr, *endian;
int ret;
struct socket *s = n->_vd;
@ -477,6 +504,17 @@ int socket_parse(struct node *n, config_setting_t *cfg)
else
cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n));
}
if (!config_setting_lookup_string(cfg, "endian", &endian))
s->endian = MSG_ENDIAN_BIG;
else {
if (!strcmp(endian, "big") || !strcmp(endian, "network"))
s->endian = MSG_ENDIAN_BIG;
else if (!strcmp(endian, "little"))
s->endian = MSG_ENDIAN_LITTLE;
else
cerror(cfg, "Invalid endianness type '%s' for node %s", endian, node_name(n));
}
if (!config_setting_lookup_string(cfg, "remote", &remote))
cerror(cfg, "Missing remote address for node %s", node_name(n));

View file

@ -32,14 +32,16 @@
*/
#include "queue.h"
#include "utils.h"
/** Initialize MPMC queue */
int queue_init(struct queue *q, size_t size, const struct memtype *mem)
{
/* Queue size must be 2 exponent */
if ((size < 2) || ((size & (size - 1)) != 0))
if (!IS_POW2(size))
return -1;
q->mem = mem;
q->buffer_mask = size - 1;
q->buffer = memory_alloc(q->mem, sizeof(q->buffer[0]) * size);
@ -57,7 +59,7 @@ int queue_init(struct queue *q, size_t size, const struct memtype *mem)
int queue_destroy(struct queue *q)
{
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0])));
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(q->buffer[0]));
}
/** Return estimation of current queue usage.
@ -108,7 +110,6 @@ int queue_pull(struct queue *q, void **ptr)
pos = atomic_load_explicit(&q->head, memory_order_relaxed);
for (;;) {
cell = &q->buffer[pos & q->buffer_mask];
seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
diff = (intptr_t) seq - (intptr_t) (pos + 1);

View file

@ -96,7 +96,7 @@ static void * send_loop(void *ctx)
sendd.started = true;
/* Initialize memory */
ret = pool_init(&sendd.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize, &memtype_hugepage);
ret = pool_init(&sendd.pool, node->vectorize, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage);
if (ret < 0)
error("Failed to allocate memory for receive pool.");
@ -140,7 +140,7 @@ static void * recv_loop(void *ctx)
recvv.started = true;
/* Initialize memory */
ret = pool_init(&recvv.pool, SAMPLE_LEN(DEFAULT_VALUES), node->vectorize, &memtype_hugepage);
ret = pool_init(&recvv.pool, node->vectorize, SAMPLE_LEN(DEFAULT_VALUES), &memtype_hugepage);
if (ret < 0)
error("Failed to allocate memory for receive pool.");