1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

Merge branch 'feature-socket-fake-header' into 'develop'

Make "fake" header option for gtnet-skt mode configurable per node

This change makes the "fake" header mode runtime-configurable for each individual node.
We do not use the compile-time option anymore.

Please look at `etc/example.conf` on how to use it.

@umar.farooq Could you do a quick review and merge this request to the develop branch?
Just look at my changes for a second and press the green button below if you are happy :D

See merge request !6
This commit is contained in:
Umar Farooq 2016-10-22 19:29:35 +02:00
commit edd58f9fd2
29 changed files with 462 additions and 176 deletions

View file

@ -101,7 +101,7 @@ integration:
website:
stage: deploy
script:
- rsync web/ $DEPLOY_PATH/
- rsync -r web/ $DEPLOY_PATH/
only:
- develop
tags:

View file

@ -82,7 +82,7 @@ COPY thirdparty/libwebsockets /tmp/libwebsockets
RUN mkdir -p /tmp/libwebsockets/build && cd /tmp/libwebsockets/build && cmake .. && make install
# Cleanup intermediate files from builds
RUN rm -rf /tmp
RUN rm -rf /tmp/*
WORKDIR /villas

View file

@ -24,9 +24,6 @@
#define DEFAULT_VALUES 64
#define DEFAULT_QUEUELEN 1024
/** Whether or not to send / receive timestamp & sequence number as first values of payload */
#define GTNET_SKT_HEADER 1
/** Width of log output in characters */
#define LOG_WIDTH 132

View file

@ -26,16 +26,24 @@ nodes = {
### The following settings are specific to the socket node-type!! ###
layer = "udp" # Layer can be one of:
# udp Send / recv UDP packets
# ip Send / recv IP packets
# eth Send / recv raw Ethernet frames (IEEE802.3)
layer = "udp", # Layer can be one of:
# udp Send / receive UDP packets
# ip Send / receive IP packets
# eth Send / receive raw Ethernet frames (IEEE802.3)
header = "gtnet-skt:fake", # Header can be one of:
# default | villas Use VILLASnode protocol (see struct msg) (default)
# none | gtnet-skt Use no header, send raw data as used by RTDS GTNETv2-SKT
# fake | gtnet-skt:fake Same as 'none', but use first three data values as
# sequence, seconds & nanoseconds timestamp
# In this mode values are uint32_t not floats!
local = "127.0.0.1:12001",	# This node only receives messages on this IP:Port pair
remote = "127.0.0.1:12000"	# This node sends outgoing messages to this IP:Port pair
combine = 30 # Receive and sent 30 samples per message (multiplexing).
vectorize = 30	# Receive and send 30 samples per message (combining).
},
ethernet_node = {
type = "socket", # See above.
@ -141,7 +149,7 @@ paths = (
hook = "print",	# Register custom hook function (see src/hooks.c)
poolsize = 30 # The amount of samples which are kept in a circular buffer.
# This number must be larger than the 'combine' settings of all
# This number must be larger than the 'vectorize' settings of all
# associated input and output nodes!
},
{

View file

@ -32,8 +32,8 @@ nodes = {
node1 = {
type = "socket",
layer = "udp",
local = "192.168.88.128:12002", # Local ip:port, use '*' for random port
remote = "192.168.88.129:12001",
local = "134.130.169.31:12002", # Local ip:port, use '*' for random port
remote = "134.130.169.98:12001",
header = "gtnet-skt", # 'gtnet-skt' or 'villas'. If not provided, 'villas' header will be used
vectorize = 1, # Number of samples to fetch per iteration from the socket
netem = {

View file

@ -22,6 +22,7 @@
#define _HOOKS_H_
#include <time.h>
#include <string.h>
#include "queue.h"
#include "list.h"

View file

@ -38,6 +38,7 @@ enum debug_facilities {
DBG_CONFIG = (1 << 10),
DBG_HOOK = (1 << 11),
DBG_PATH = (1 << 12),
DBG_MEM = (1 << 13),
/* Node-types */
DBG_SOCKET = (1 << 16),

View file

@ -47,6 +47,8 @@ struct node
int vectorize; /**< Number of messages to send / recv at once (scatter / gather) */
int affinity; /**< CPU Affinity of this node */
unsigned long sequence; /**< This is a counter of received samples, in case the node-type does not generate sequence numbers itself. */
enum node_state {
NODE_INVALID, /**< This node object is not in a valid state. */

View file

@ -29,8 +29,9 @@ enum socket_layer {
};
enum socket_header {
SOCKET_HEADER_DEFAULT, /**> Default header in the payload, (see msg_format.h) */
SOCKET_HEADER_GTNET_SKT /**> No header in the payload, same as HDR_NONE*/
SOCKET_HEADER_DEFAULT, /**> Default header in the payload, (see msg_format.h) */
SOCKET_HEADER_NONE, /**> No header in the payload, same as HDR_NONE*/
SOCKET_HEADER_FAKE /**> Same as SOCKET_HEADER_NONE but using the first three data values as: sequence, seconds & nano-seconds. */
};
union sockaddr_union {
@ -41,29 +42,19 @@ union sockaddr_union {
};
struct socket {
/** The socket descriptor */
int sd;
/** Socket mark for netem, routing and filtering */
int mark;
int sd; /**> The socket descriptor */
int mark; /**> Socket mark for netem, routing and filtering */
/** The OSI / IP layer which should be used for this socket */
enum socket_layer layer;
enum socket_layer layer; /**> The OSI / IP layer which should be used for this socket */
enum socket_header header; /**> Payload header type */
/** Payload header type */
enum socket_header header;
union sockaddr_union local; /**> Local address of the socket */
union sockaddr_union remote; /**> Remote address of the socket */
/** Local address of the socket */
union sockaddr_union local;
/** Remote address of the socket */
union sockaddr_union remote;
struct rtnl_qdisc *tc_qdisc; /**> libnl3: Network emulator queuing discipline */
struct rtnl_cls *tc_classifier; /**> libnl3: Firewall mark classifier */
/** libnl3: Network emulator queuing discipline */
struct rtnl_qdisc *tc_qdisc;
/** libnl3: Firewall mark classifier */
struct rtnl_cls *tc_classifier;
/* Linked list _per_interface_ */
struct socket *next;
struct socket *next; /* Linked list _per_interface_ */
};

View file

@ -42,7 +42,7 @@ struct path
struct node *in; /**< Pointer to the incoming node */
struct mpmc_queue queue; /**< A ring buffer for all received messages (unmodified) */
struct queue queue; /**< A ring buffer for all received messages (unmodified) */
struct pool pool; /**< Memory pool for messages / samples. */
struct list destinations; /**< List of all outgoing nodes */

View file

@ -15,8 +15,7 @@
#include <sys/types.h>
#include "queue.h"
struct memtype;
#include "memory.h"
/** A thread-safe memory pool */
struct pool {
@ -28,13 +27,13 @@ struct pool {
size_t blocksz; /**< Length of a block in bytes */
size_t alignment; /**< Alignment of a block in bytes */
struct mpmc_queue queue; /**< The queue which is used to keep track of free blocks */
struct queue queue; /**< The queue which is used to keep track of free blocks */
};
#define INLINE static inline __attribute__((unused))
/** Initialize a pool */
int pool_init(struct pool *p, size_t blocksz, size_t alignment, const struct memtype *mem);
int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *mem);
/** Destroy and release memory used by pool. */
int pool_destroy(struct pool *p);
@ -42,26 +41,26 @@ int pool_destroy(struct pool *p);
/** Pop cnt values from the stack an place them in the array blocks */
INLINE ssize_t pool_get_many(struct pool *p, void *blocks[], size_t cnt)
{
return mpmc_queue_pull_many(&p->queue, blocks, cnt);
return queue_pull_many(&p->queue, blocks, cnt);
}
/** Push cnt values which are giving by the array values to the stack. */
INLINE ssize_t pool_put_many(struct pool *p, void *blocks[], size_t cnt)
{
return mpmc_queue_push_many(&p->queue, blocks, cnt);
return queue_push_many(&p->queue, blocks, cnt);
}
/** Get a free memory block from pool. */
INLINE void * pool_get(struct pool *p)
{
void *ptr;
return mpmc_queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
return queue_pull(&p->queue, &ptr) == 1 ? ptr : NULL;
}
/** Release a memory block back to the pool. */
INLINE int pool_put(struct pool *p, void *buf)
{
return mpmc_queue_push(&p->queue, buf);
return queue_push(&p->queue, buf);
}
#endif /* _POOL_H_ */

View file

@ -42,12 +42,12 @@
#define CACHELINE_SIZE 64
typedef char cacheline_pad_t[CACHELINE_SIZE];
struct mpmc_queue {
struct queue {
cacheline_pad_t _pad0; /**< Shared area: all threads read */
struct memtype const * mem;
size_t buffer_mask;
struct mpmc_queue_cell {
struct queue_cell {
atomic_size_t sequence;
void *data;
} *buffer;
@ -64,24 +64,24 @@ struct mpmc_queue {
};
/** Initialize MPMC queue */
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem);
int queue_init(struct queue *q, size_t size, const struct memtype *mem);
/** Destroy MPMC queue and release memory */
int mpmc_queue_destroy(struct mpmc_queue *q);
int queue_destroy(struct queue *q);
/** Return estimation of current queue usage.
*
* Note: This is only an estimation and not accurate as long as other
* threads are performing operations.
*/
size_t mpmc_queue_available(struct mpmc_queue *q);
size_t queue_available(struct queue *q);
int mpmc_queue_push(struct mpmc_queue *q, void *ptr);
int queue_push(struct queue *q, void *ptr);
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr);
int queue_pull(struct queue *q, void **ptr);
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
int queue_push_many(struct queue *q, void *ptr[], size_t cnt);
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt);
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt);
#endif /* _MPMC_QUEUE_H_ */

View file

@ -204,11 +204,11 @@ void printb(void *mem, size_t len);
void printdw(void *mem, size_t len);
/** Get CPU time-stamp counter */
__attribute__((always_inline)) static inline uint64_t rdtscp()
__attribute__((always_inline)) static inline uint64_t rdtsc()
{
uint64_t tsc;
__asm__ ("rdtscp;"
__asm__ ("rdtsc;"
"shl $32, %%rdx;"
"or %%rdx,%%rax"
: "=a" (tsc)

View file

@ -19,23 +19,20 @@
void * memory_alloc(const struct memtype *m, size_t len)
{
debug(DBG_MEM | 2, "Allocating %zu byte of %s memory", len, m->name);
return m->alloc(len);
}
void * memory_alloc_aligned(const struct memtype *m, size_t len, size_t alignment)
{
warn("memory_alloc_aligned: not implemented yet!");
return memory_alloc(m, len);
}
void * memory_aligned_alloc(const struct memtype *m, size_t len, size_t align)
{
warn("memory_aligned_alloc: not implemented yet. Falling back to unaligned version.");
debug(DBG_MEM | 2, "Allocating %zu byte of %zu-byte-aligned %s memory", len, alignment, m->name);
warn("%s: not implemented yet!", __FUNCTION__);
return memory_alloc(m, len);
}
int memory_free(const struct memtype *m, void *ptr, size_t len)
{
debug(DBG_MEM | 2, "Releasing %zu bytes of %s memory", len, m->name);
return m->free(ptr, len);
}
@ -85,7 +82,7 @@ const struct memtype memtype_hugepage = {
.flags = MEMORY_MMAP | MEMORY_HUGEPAGE,
.alloc = memory_hugepage_alloc,
.free = memory_hugepage_free,
.alignment = 1 << 21 /* 2 MiB hugepage */
.alignment = 21 /* 2 MiB hugepage */
};
/** @todo */
@ -93,5 +90,5 @@ const struct memtype memtype_dma = {
.name = "dma",
.flags = MEMORY_DMA | MEMORY_MMAP,
.alloc = NULL, .free = NULL,
.alignment = 1 << 12
.alignment = 12
};

View file

@ -113,6 +113,8 @@ int node_start(struct node *n)
if (ret == 0)
n->state = NODE_RUNNING;
n->sequence = 0;
return ret;
}

View file

@ -103,8 +103,9 @@ char * socket_print(struct node *n)
}
switch (s->header) {
case SOCKET_HEADER_GTNET_SKT: header = "gtnet-skt"; break;
case SOCKET_HEADER_DEFAULT: header = "villas"; break;
case SOCKET_HEADER_NONE: header = "none"; break;
case SOCKET_HEADER_FAKE: header = "fake"; break;
case SOCKET_HEADER_DEFAULT: header = "default"; break;
}
char *local = socket_print_addr((struct sockaddr *) &s->local);
@ -214,34 +215,29 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
int samples, ret, received, length;
ssize_t bytes;
if (s->header == SOCKET_HEADER_GTNET_SKT) {
if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) {
if (cnt < 1)
return 0;
/* The GTNETv2-SKT protocol send every sample in a single packet.
* socket_read() receives a single packet. */
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
struct sample *smp = smps[0];
#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER
uint32_t header[3];
struct iovec iov[] = {
{ /* First three values are sequence, seconds and nano-seconds */
.iov_base = header,
.iov_len = sizeof(header)
},
#else
struct iovec iov[] = {
#endif
{ /* Remaining values are payload */
.iov_base = &smp->data,
.iov_len = SAMPLE_DATA_LEN(smp->capacity)
}
};
if (s->header == SOCKET_HEADER_FAKE) {
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smp->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smp->capacity);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov),
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
@ -258,28 +254,23 @@ int socket_read(struct node *n, struct sample *smps[], unsigned cnt)
return -1;
}
#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER
length = (bytes - sizeof(header)) / SAMPLE_DATA_LEN(1);
#else
length = bytes / SAMPLE_DATA_LEN(1);
#endif
length = (s->header == SOCKET_HEADER_FAKE ? bytes - sizeof(header) : bytes) / SAMPLE_DATA_LEN(1);
if (length > smp->capacity) {
warn("Node %s received more values than supported. Dropping %u values", node_name(n), length - smp->capacity);
length = smp->capacity;
}
/** @todo Should we generate sequence no here manually?
* Or maybe optionally use the first data value as a sequence?
* However this would require the RTDS model to be changed. */
#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER
smp->sequence = header[0];
smp->ts.origin.tv_sec = header[1];
smp->ts.origin.tv_nsec = header[2];
#else
smp->sequence = -1;
smp->ts.origin.tv_sec = -1;
smp->ts.origin.tv_nsec = -1;
#endif
if (s->header == SOCKET_HEADER_FAKE) {
smp->sequence = header[0];
smp->ts.origin.tv_sec = header[1];
smp->ts.origin.tv_nsec = header[2];
}
else {
smp->sequence = n->sequence++; /* Fake sequence no generated by VILLASnode */
smp->ts.origin.tv_sec = -1;
smp->ts.origin.tv_nsec = -1;
}
smp->ts.received.tv_sec = -1;
smp->ts.received.tv_nsec = -1;
@ -376,35 +367,32 @@ int socket_write(struct node *n, struct sample *smps[], unsigned cnt)
int sent = 0;
/* Construct iovecs */
if (s->header == SOCKET_HEADER_GTNET_SKT) {
if (s->header == SOCKET_HEADER_NONE || s->header == SOCKET_HEADER_FAKE) {
if (cnt < 1)
return 0;
for (int i = 0; i < cnt; i++) {
#if defined(GTNET_SKT_HEADER) && GTNET_SKT_HEADER
uint32_t header[] = {
smps[i]->sequence,
smps[i]->ts.origin.tv_sec,
smps[i]->ts.origin.tv_nsec
};
for (int i = 0; i < cnt; i++) {
int iov_len = s->header == SOCKET_HEADER_FAKE ? 2 : 1;
struct iovec iov[iov_len];
struct iovec iov[] = {
{ /* First three values are sequence, seconds and nano-seconds */
.iov_base = header,
.iov_len = sizeof(header)
},
#else
struct iovec iov[] = {
#endif
{ /* Remaining values are payload */
.iov_base = &smps[i]->data,
.iov_len = SAMPLE_DATA_LEN(smps[i]->length)
}
};
/* First three values are sequence, seconds and nano-seconds timestamps */
uint32_t header[3];
if (s->header == SOCKET_HEADER_FAKE) {
header[0] = smps[i]->sequence;
header[1] = smps[i]->ts.origin.tv_sec;
header[2] = smps[i]->ts.origin.tv_nsec;
iov[0].iov_base = header;
iov[0].iov_len = sizeof(header);
}
/* Remaining values are payload */
iov[iov_len-1].iov_base = &smps[i]->data;
iov[iov_len-1].iov_len = SAMPLE_DATA_LEN(smps[i]->length);
struct msghdr mhdr = {
.msg_iov = iov,
.msg_iovlen = ARRAY_LEN(iov),
.msg_iovlen = iov_len,
.msg_name = (struct sockaddr *) &s->remote,
.msg_namelen = sizeof(s->remote)
};
@ -480,9 +468,11 @@ int socket_parse(struct node *n, config_setting_t *cfg)
if (!config_setting_lookup_string(cfg, "header", &hdr))
s->header = SOCKET_HEADER_DEFAULT;
else {
if (!strcmp(hdr, "gtnet-skt") || (!strcmp(hdr, "none")))
s->header = SOCKET_HEADER_GTNET_SKT;
else if (!strcmp(hdr, "default") || !strcmp(hdr, "villas"))
if (!strcmp(hdr, "gtnet-skt") || (!strcmp(hdr, "none")))
s->header = SOCKET_HEADER_NONE;
else if (!strcmp(hdr, "gtnet-skt:fake") || (!strcmp(hdr, "fake")))
s->header = SOCKET_HEADER_FAKE;
else if (!strcmp(hdr, "villas") || !strcmp(hdr, "default"))
s->header = SOCKET_HEADER_DEFAULT;
else
cerror(cfg, "Invalid application header type '%s' for node %s", hdr, node_name(n));

View file

@ -26,7 +26,7 @@ static void path_write(struct path *p, bool resend)
int sent, tosend, available, released;
struct sample *smps[n->vectorize];
available = mpmc_queue_pull_many(&p->queue, (void **) smps, cnt);
available = queue_pull_many(&p->queue, (void **) smps, cnt);
if (available < cnt)
warn("Queue underrun for path %s: available=%u expected=%u", path_name(p), available, cnt);
@ -108,7 +108,7 @@ static void * path_run(void *arg)
p->skipped += recv - enqueue;
}
enqueued = mpmc_queue_push_many(&p->queue, (void **) smps, enqueue);
enqueued = queue_push_many(&p->queue, (void **) smps, enqueue);
if (enqueue != enqueued)
warn("Failed to enqueue %u samples for path %s", enqueue - enqueued, path_name(p));
@ -219,7 +219,7 @@ int path_prepare(struct path *p)
if (ret)
error("Failed to allocate memory pool for path");
ret = mpmc_queue_init(&p->queue, p->queuelen, &memtype_hugepage);
ret = queue_init(&p->queue, p->queuelen, &memtype_hugepage);
if (ret)
error("Failed to initialize queue for path");
@ -233,7 +233,7 @@ void path_destroy(struct path *p)
list_destroy(&p->destinations, NULL, false);
list_destroy(&p->hooks, NULL, true);
mpmc_queue_destroy(&p->queue);
queue_destroy(&p->queue);
pool_destroy(&p->pool);
free(p->_name);

View file

@ -12,8 +12,10 @@
#include "memory.h"
#include "kernel/kernel.h"
int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype *m)
int pool_init(struct pool *p, size_t cnt, size_t blocksz, const struct memtype *m)
{
int ret;
/* Make sure that we use a block size that is aligned to the size of a cache line */
p->alignment = kernel_get_cacheline_size();
p->blocksz = blocksz * CEIL(blocksz, p->alignment);
@ -25,18 +27,20 @@ int pool_init(struct pool *p, size_t blocksz, size_t cnt, const struct memtype *
serror("Failed to allocate memory for memory pool");
else
debug(DBG_POOL | 4, "Allocated %#zx bytes for memory pool", p->len);
mpmc_queue_init(&p->queue, cnt, m);
ret = queue_init(&p->queue, cnt, m);
if (ret)
return ret;
for (int i = 0; i < cnt; i++)
mpmc_queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
queue_push(&p->queue, (char *) p->buffer + i * p->blocksz);
return 0;
}
int pool_destroy(struct pool *p)
{
mpmc_queue_destroy(&p->queue);
queue_destroy(&p->queue);
return memory_free(p->mem, p->buffer, p->len);
}

View file

@ -34,7 +34,7 @@
#include "queue.h"
/** Initialize MPMC queue */
int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem)
int queue_init(struct queue *q, size_t size, const struct memtype *mem)
{
/* Queue size must be a power of two */
if ((size < 2) || ((size & (size - 1)) != 0))
@ -55,7 +55,7 @@ int mpmc_queue_init(struct mpmc_queue *q, size_t size, const struct memtype *mem
return 0;
}
int mpmc_queue_destroy(struct mpmc_queue *q)
int queue_destroy(struct queue *q)
{
return memory_free(q->mem, q->buffer, (q->buffer_mask + 1) * sizeof(sizeof(q->buffer[0])));
}
@ -65,15 +65,15 @@ int mpmc_queue_destroy(struct mpmc_queue *q)
* Note: This is only an estimation and not accurate as long as other
* threads are performing operations.
*/
size_t mpmc_queue_available(struct mpmc_queue *q)
size_t queue_available(struct queue *q)
{
return atomic_load_explicit(&q->tail, memory_order_relaxed) -
atomic_load_explicit(&q->head, memory_order_relaxed);
}
int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
int queue_push(struct queue *q, void *ptr)
{
struct mpmc_queue_cell *cell;
struct queue_cell *cell;
size_t pos, seq;
intptr_t diff;
@ -99,9 +99,9 @@ int mpmc_queue_push(struct mpmc_queue *q, void *ptr)
return 1;
}
int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
int queue_pull(struct queue *q, void **ptr)
{
struct mpmc_queue_cell *cell;
struct queue_cell *cell;
size_t pos, seq;
intptr_t diff;
@ -128,13 +128,13 @@ int mpmc_queue_pull(struct mpmc_queue *q, void **ptr)
return 1;
}
int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
int queue_push_many(struct queue *q, void *ptr[], size_t cnt)
{
int ret;
size_t i;
for (i = 0; i < cnt; i++) {
ret = mpmc_queue_push(q, ptr[i]);
ret = queue_push(q, ptr[i]);
if (!ret)
break;
}
@ -142,13 +142,13 @@ int mpmc_queue_push_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
return i;
}
int mpmc_queue_pull_many(struct mpmc_queue *q, void *ptr[], size_t cnt)
int queue_pull_many(struct queue *q, void *ptr[], size_t cnt)
{
int ret;
size_t i;
for (i = 0; i < cnt; i++) {
ret = mpmc_queue_pull(q, &ptr[i]);
ret = queue_pull(q, &ptr[i]);
if (!ret)
break;
}

View file

@ -370,12 +370,13 @@ void rdtsc_sleep(uint64_t nanosecs, uint64_t start)
{
uint64_t cycles;
/** @todo Replace the hard coded CPU clock frequency */
cycles = (double) nanosecs / (1e9 / 3392389000);
if (start == 0)
start = rdtscp();
start = rdtsc();
do {
__asm__("nop");
} while (rdtscp() - start < cycles);
} while (rdtsc() - start < cycles);
}

View file

@ -115,9 +115,9 @@ int fpga_benchmark_overruns(struct fpga *f)
for (int i = 0; i < runs + BENCH_WARMUP; i++) {
dma_read(dm, mem.base_phys, 0x200);
start = rdtscp();
start = rdtsc();
lapack_workload(p, A);
stop = rdtscp();
stop = rdtsc();
dma_read_complete(dm, NULL, NULL);

View file

@ -108,7 +108,7 @@ int fpga_benchmark_jitter(struct fpga *f)
XTmrCtr_SetResetValue(xtmr, 0, period * FPGA_AXI_HZ);
XTmrCtr_Start(xtmr, 0);
uint64_t end, start = rdtscp();
uint64_t end, start = rdtsc();
for (int i = 0; i < runs; i++) {
uint64_t cnt = intc_wait(f->intc, tmr->irq);
if (cnt != 1)
@ -117,7 +117,7 @@ int fpga_benchmark_jitter(struct fpga *f)
/* Acknowledge IRQ */
XTmrCtr_WriteReg((uintptr_t) f->map + tmr->baseaddr, 0, XTC_TCSR_OFFSET, XTmrCtr_ReadReg((uintptr_t) f->map + tmr->baseaddr, 0, XTC_TCSR_OFFSET));
end = rdtscp();
end = rdtsc();
hist[i] = end - start;
start = end;
}
@ -157,11 +157,11 @@ int fpga_benchmark_latency(struct fpga *f)
error("Failed to enable interrupts");
for (int i = 0; i < runs; i++) {
start = rdtscp();
start = rdtsc();
XIntc_Out32((uintptr_t) f->map + f->intc->baseaddr + XIN_ISR_OFFSET, 0x100);
intc_wait(f->intc, 8);
end = rdtscp();
end = rdtsc();
hist[i] = end - start;
}
@ -239,7 +239,7 @@ int fpga_benchmark_datamover(struct fpga *f)
uint64_t runs = BENCH_RUNS >> exp;
for (int i = 0; i < runs + BENCH_WARMUP; i++) {
start = rdtscp();
start = rdtsc();
#if BENCH_DM == 1
ssize_t ret;
@ -255,7 +255,7 @@ int fpga_benchmark_datamover(struct fpga *f)
if (ret)
error("DMA ping pong failed");
#endif
stop = rdtscp();
stop = rdtsc();
if (memcmp(src.base_virt, dst.base_virt, len))
warn("Compare failed");
@ -304,13 +304,13 @@ int fpga_benchmark_memcpy(struct fpga *f)
uint64_t runs = (BENCH_RUNS << 2) >> exp;
for (int i = 0; i < runs + BENCH_WARMUP; i++) {
start = rdtscp();
start = rdtsc();
for (int j = 0; j < len / 4; j++)
// mapi[j] = j; // write
dummy += mapi[j]; // read
end = rdtscp();
end = rdtsc();
if (i > BENCH_WARMUP)
total += end - start;

View file

@ -3,7 +3,7 @@ TEST_OBJS = $(patsubst %.c,$(BUILDDIR)/%.o,$(TEST_SRCS))
TEST_CFLAGS = $(CFLAGS)
TEST_LDFLAGS = $(LDFLAGS) -Wl,-rpath,'$$ORIGIN'
TEST_LDLIBS = $(LDLIBS) -lcriterion -lvillas
TEST_LDLIBS = $(LDLIBS) -lcriterion -lvillas -pthread
tests: $(BUILDDIR)/testsuite

View file

@ -1,25 +1,318 @@
/** Unit tests for MPMC queue
*
* @author Steffen Vogel <stvogel@eonerc.rwth-aachen.de>
* @copyright 2014-2016, Institute for Automation of Complex Power Systems, EONERC
* This file is part of VILLASnode. All Rights Reserved. Proprietary and confidential.
* Unauthorized copying of this file, via any medium is strictly prohibited.
*********************************************************************************/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <time.h>
#include <pthread.h>
#include <criterion/criterion.h>
#include <criterion/logging.h>
#include <criterion/parameterized.h>
#include "utils.h"
#include "queue.h"
#include "memory.h"
Test(queue, singleThreaded)
#define SIZE (1 << 10)
static struct queue q;
struct param {
int volatile start;
int thread_count;
int queue_size;
int iter_count;
int batch_size;
void * (*thread_func)(void *);
struct queue queue;
const struct memtype *memtype;
};
/** Get thread id as integer
* In contrast to pthread_t which is an opaque type */
#ifdef __linux__
#include <sys/syscall.h>
#endif
uint64_t thread_get_id()
{
/* struct queue q;
#ifdef __MACH__
uint64_t id;
pthread_threadid_np(pthread_self(), &id);
return id;
#elif defined(SYS_gettid)
return (int) syscall(SYS_gettid);
#endif
return -1;
}
/** Sleep, do nothing */
__attribute__((always_inline)) static inline void nop()
{
__asm__("rep nop;");
}
static void * producer(void *ctx)
{
int ret;
struct param *p = (struct param *) ctx;
queue_init(&q, 100, &memtype_heap);
srand((unsigned) time(0) + thread_get_id());
size_t nops = rand() % 1000;
srand(1337);
/** @todo Criterion cr_log() is broken for multi-threaded programs */
//cr_log_info("producer: tid = %lu", thread_get_id());
/* Wait for global start signal */
while (p->start == 0)
pthread_yield();
for (int i = 0; i < 100; i++)
queue_push(&q, &)
//cr_log_info("producer: wait for %zd nops", nops);
queue_destroy(&q);*/
/* Wait for a random time */
for (size_t i = 0; i != nops; i += 1)
nop();
//cr_log_info("producer: start pushing");
/* Enqueue */
for (unsigned long count = 0; count < p->iter_count; count++) {
do {
ret = queue_push(&p->queue, (void *) count);
pthread_yield();
} while (ret != 1);
}
//cr_log_info("producer: finished");
return NULL;
}
static void * consumer(void *ctx)
{
int ret;
struct param *p = (struct param *) ctx;
srand((unsigned) time(0) + thread_get_id());
size_t nops = rand() % 1000;
//cr_log_info("consumer: tid = %lu", thread_get_id());
/* Wait for global start signal */
while (p->start == 0)
pthread_yield();
//cr_log_info("consumer: wait for %zd nops", nops);
/* Wait for a random time */
for (size_t i = 0; i != nops; i += 1)
nop();
//cr_log_info("consumer: start pulling");
/* Dequeue */
for (unsigned long count = 0; count < p->iter_count; count++) {
void *ptr;
do {
ret = queue_pull(&p->queue, &ptr);
} while (ret != 1);
//cr_log_info("consumer: %lu\n", count);
//cr_assert_eq((intptr_t) ptr, count);
}
//cr_log_info("consumer: finished");
return NULL;
}
void * producer_consumer(void *ctx)
{
struct param *p = (struct param *) ctx;
srand((unsigned) time(0) + thread_get_id());
size_t nops = rand() % 1000;
/* Wait for global start signal */
while (p->start == 0)
pthread_yield();
/* Wait for a random time */
for (size_t i = 0; i != nops; i += 1)
nop();
for (int iter = 0; iter < p->iter_count; ++iter) {
for (size_t i = 0; i < p->batch_size; i++) {
void *ptr = (void *) (iter * p->batch_size + i);
while (!queue_push(&p->queue, ptr))
pthread_yield(); /* queue full, let other threads proceed */
}
for (size_t i = 0; i < p->batch_size; i++) {
void *ptr;
while (!queue_pull(&p->queue, &ptr))
pthread_yield(); /* queue empty, let other threads proceed */
}
}
return 0;
}
void * producer_consumer_many(void *ctx)
{
struct param *p = (struct param *) ctx;
srand((unsigned) time(0) + thread_get_id());
size_t nops = rand() % 1000;
/* Wait for global start signal */
while (p->start == 0)
pthread_yield();
/* Wait for a random time */
for (size_t i = 0; i != nops; i += 1)
nop();
void *ptrs[p->batch_size];
for (int iter = 0; iter < p->iter_count; ++iter) {
for (size_t i = 0; i < p->batch_size; i++)
ptrs[i] = (void *) (iter * p->batch_size + i);
int pushed = 0;
do {
pushed += queue_push_many(&p->queue, &ptrs[pushed], p->batch_size - pushed);
if (pushed != p->batch_size)
pthread_yield(); /* queue full, let other threads proceed */
} while (pushed < p->batch_size);
int pulled = 0;
do {
pulled += queue_pull_many(&p->queue, &ptrs[pulled], p->batch_size - pulled);
if (pulled != p->batch_size)
pthread_yield(); /* queue empty, let other threads proceed */
} while (pulled < p->batch_size);
}
return 0;
}
Test(queue, single_threaded)
{
int ret;
struct param p = {
.iter_count = 1 << 8,
.queue_size = 1 << 10,
.start = 1 /* we start immediately */
};
ret = queue_init(&p.queue, p.queue_size, &memtype_heap);
cr_assert_eq(ret, 0, "Failed to create queue");
producer(&p);
consumer(&p);
cr_assert_eq(queue_available(&q), 0);
ret = queue_destroy(&p.queue);
cr_assert_eq(ret, 0, "Failed to create queue");
}
ParameterizedTestParameters(queue, multi_threaded)
{
static struct param params[] = {
{
.iter_count = 1 << 12,
.queue_size = 1 << 9,
.thread_count = 32,
.thread_func = producer_consumer_many,
.batch_size = 10,
.memtype = &memtype_heap
}, {
.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
.thread_func = producer_consumer_many,
.batch_size = 100,
.memtype = &memtype_heap
}, {
.iter_count = 1 << 16,
.queue_size = 1 << 9,
.thread_count = 16,
.thread_func = producer_consumer_many,
.batch_size = 100,
.memtype = &memtype_heap
}, {
.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
.thread_func = producer_consumer_many,
.batch_size = 10,
.memtype = &memtype_heap
}, {
.iter_count = 1 << 16,
.queue_size = 1 << 9,
.thread_count = 16,
.thread_func = producer_consumer,
.batch_size = 10,
.memtype = &memtype_hugepage
}
};
return cr_make_param_array(struct param, params, ARRAY_LEN(params));
}
ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 10)
{
int ret, cycpop;
pthread_t threads[p->thread_count];
p->start = 0;
ret = queue_init(&p->queue, p->queue_size, &memtype_heap);
cr_assert_eq(ret, 0, "Failed to create queue");
uint64_t start_tsc_time, end_tsc_time;
for (int i = 0; i < p->thread_count; ++i)
pthread_create(&threads[i], NULL, p->thread_func, &p);
sleep(1);
start_tsc_time = rdtsc();
p->start = 1;
for (int i = 0; i < p->thread_count; ++i)
pthread_join(threads[i], NULL);
end_tsc_time = rdtsc();
cycpop = (end_tsc_time - start_tsc_time) / p->iter_count;
if (cycpop < 400)
cr_log_info("cycles/op: %u\n", cycpop);
else
cr_log_warn("cycles/op are very high (%u). Are you running on a a hypervisor?\n", cycpop);
cr_assert_eq(queue_available(&q), 0);
ret = queue_destroy(&p->queue);
cr_assert_eq(ret, 0, "Failed to create queue");
}
Test(queue, init_destroy)
{
int ret;
struct queue q;
ret = queue_init(&q, 100, &memtype_heap);
cr_assert_eq(ret, -1); /* Should fail as size is not 2^x */
ret = queue_init(&q, 1024, &memtype_heap);
cr_assert_eq(ret, 0); /* Should succeed */
ret = queue_destroy(&q);
cr_assert_eq(ret, 0); /* Should succeed */
}

View file

@ -76,7 +76,7 @@ Test(timing, time_to_from_double)
cr_assert_float_eq(dbl, ref, 1e-9);
}
Test(timing, timerfd_create_rate)
Test(timing, timerfd_create_rate, .timeout = 20)
{
struct timespec start, end;
@ -100,7 +100,7 @@ Test(timing, timerfd_create_rate)
close(tfd);
}
Test(timing, timerfd_wait_until)
Test(timing, timerfd_wait_until, .timeout = 1)
{
int tfd = timerfd_create(CLOCK_REALTIME, 0);

@ -1 +1 @@
Subproject commit 5b0f2b129046955c004ff56ab2caada5b55ba8e3
Subproject commit 1b687a9f6a0e51c9e9b0a047e1fcd6c94de7a080