diff --git a/include/villas/hash_table.h b/include/villas/hash_table.h
index d8a79f873..87726184f 100644
--- a/include/villas/hash_table.h
+++ b/include/villas/hash_table.h
@@ -20,11 +20,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*********************************************************************************/
+#pragma once
+
#include
#include
#include
+#ifdef __cplusplus
+extern "C" {
+#endif
+
struct hash_table_entry {
void *key;
void *data;
@@ -73,3 +79,7 @@ void * hash_table_lookup(struct hash_table *ht, void *key);
/** Dump the contents of the hash table in a human readable format to stdout. */
void hash_table_dump(struct hash_table *ht);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/include/villas/log.h b/include/villas/log.h
index 7188ef31e..cac1e5e98 100644
--- a/include/villas/log.h
+++ b/include/villas/log.h
@@ -57,7 +57,7 @@ extern "C" {
enum log_facilities {
LOG_POOL = (1L << 8),
LOG_QUEUE = (1L << 9),
- LOG_CONFIG = (1L << 10),
+ LOG_CONFIG = (1L << 10),
LOG_HOOK = (1L << 11),
LOG_PATH = (1L << 12),
LOG_NODE = (1L << 13),
@@ -73,16 +73,17 @@ enum log_facilities {
LOG_ADVIO = (1L << 23),
/* Node-types */
- LOG_SOCKET = (1L << 24),
+ LOG_SOCKET = (1L << 24),
LOG_FILE = (1L << 25),
LOG_FPGA = (1L << 26),
LOG_NGSI = (1L << 27),
- LOG_WEBSOCKET = (1L << 28),
+ LOG_WEBSOCKET = (1L << 28),
LOG_OPAL = (1L << 30),
- LOG_COMEDI = (1L << 31),
+ LOG_COMEDI = (1L << 31),
+ LOG_IB = (1L << 32),
/* Classes */
- LOG_NODES = LOG_NODE | LOG_SOCKET | LOG_FILE | LOG_FPGA | LOG_NGSI | LOG_WEBSOCKET | LOG_OPAL,
+ LOG_NODES = LOG_NODE | LOG_SOCKET | LOG_FILE | LOG_FPGA | LOG_NGSI | LOG_WEBSOCKET | LOG_OPAL | LOG_IB,
LOG_KERNEL = LOG_VFIO | LOG_PCI | LOG_TC | LOG_IF,
LOG_ALL = ~0xFF
};
diff --git a/include/villas/memory.h b/include/villas/memory.h
index 54da2bbc0..d1829f71a 100644
--- a/include/villas/memory.h
+++ b/include/villas/memory.h
@@ -25,57 +25,46 @@
#include
#include
+#include <stdint.h>
+
+#include <villas/memory_type.h>
#ifdef __cplusplus
extern "C" {
#endif
-#define HUGEPAGESIZE (1 << 21)
-
-struct memtype;
-
-typedef void *(*memzone_allocator_t)(struct memtype *mem, size_t len, size_t alignment);
-typedef int (*memzone_deallocator_t)(struct memtype *mem, void *ptr, size_t len);
-
-enum memtype_flags {
- MEMORY_MMAP = (1 << 0),
- MEMORY_DMA = (1 << 1),
- MEMORY_HUGEPAGE = (1 << 2),
- MEMORY_HEAP = (1 << 3)
-};
-
-struct memtype {
- const char *name;
- int flags;
-
- size_t alignment;
-
- memzone_allocator_t alloc;
- memzone_deallocator_t free;
-
- void *_vd; /**< Virtual data for internal state */
-};
-
/** Allocate memory of type \p m.
*
* @retval NULL If allocation failed.
* @retval <>0 If allocation was successful.
*/
-void * memory_alloc(struct memtype *m, size_t len);
+void * memory_alloc(struct memory_type *m, size_t len);
-void * memory_alloc_aligned(struct memtype *m, size_t len, size_t alignment);
+void * memory_alloc_aligned(struct memory_type *m, size_t len, size_t alignment);
-int memory_free(struct memtype *m, void *ptr, size_t len);
+int memory_free(void *ptr);
-struct memtype * memtype_managed_init(void *ptr, size_t len);
-
-extern struct memtype memtype_heap;
-extern struct memtype memtype_hugepage;
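+/** Lookup the allocation metadata for a pointer which was returned by memory_alloc(). */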
+struct memory_allocation * memory_get_allocation(void *ptr);
#ifdef __cplusplus
}
diff --git a/include/villas/memory_type.h b/include/villas/memory_type.h
new file mode 100644
index 000000000..8d927cf3d
--- /dev/null
+++ b/include/villas/memory_type.h
@@ -0,0 +1,70 @@
+/** Memory allocators.
+ *
+ * @file
+ * @author Steffen Vogel
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/* Forward declaration */
+struct memory_type;
+
+typedef struct memory_allocation * (*memory_allocator_t)(struct memory_type *mem, size_t len, size_t alignment);
+typedef int (*memory_deallocator_t)(struct memory_type *mem, struct memory_allocation * ma);
+
+enum memory_type_flags {
+ MEMORY_MMAP = (1 << 0),
+ MEMORY_DMA = (1 << 1),
+ MEMORY_HUGEPAGE = (1 << 2),
+ MEMORY_HEAP = (1 << 3)
+};
+
+struct memory_type {
+ const char *name;
+ int flags;
+
+ size_t alignment;
+
+ memory_allocator_t alloc;
+ memory_deallocator_t free;
+
+ void *_vd; /**< Virtual data for internal state */
+};
+
+extern struct memory_type memory_type_heap;
+extern struct memory_type memory_hugepage;
+
+struct sample;
+
+struct ibv_mr * memory_ib_get_mr(struct sample *smps);
+
+struct node;
+
+struct memory_type * memory_ib(struct node *n, struct memory_type *parent);
+struct memory_type * memory_managed(void *ptr, size_t len);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/include/villas/node.h b/include/villas/node.h
index ed6be9bbc..09ca1a429 100644
--- a/include/villas/node.h
+++ b/include/villas/node.h
@@ -160,6 +160,8 @@ int node_write(struct node *n, struct sample *smps[], unsigned cnt);
int node_fd(struct node *n);
+struct memory_type * node_memory_type(struct node *n, struct memory_type *parent);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/villas/node_type.h b/include/villas/node_type.h
index 39f0bfd17..b9026b1d3 100644
--- a/include/villas/node_type.h
+++ b/include/villas/node_type.h
@@ -34,6 +34,7 @@ extern "C"{
#include "list.h"
#include "common.h"
+#include "memory.h"
/* Forward declarations */
struct node;
@@ -161,6 +162,9 @@ struct node_type {
/** Return a file descriptor which can be used by poll / select to detect the availability of new data. */
int (*fd)(struct node *n);
+
+ /** Return a memory type which should be used for pools of samples handled by this node. */
+ struct memory_type * (*memory_type)(struct node *n, struct memory_type *parent);
};
/** Initialize all registered node type subsystems.
diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
new file mode 100644
index 000000000..07e11cd30
--- /dev/null
+++ b/include/villas/nodes/infiniband.h
@@ -0,0 +1,144 @@
+/** Node type: infiniband
+ *
+ * @file
+ * @author Dennis Potter
+ * @copyright 2018, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+/**
+ * @addtogroup infiniband infiniband node type
+ * @ingroup node
+ * @{
+ */
+
+#pragma once
+
+#include <pthread.h>
+#include <netdb.h>
+#include <jansson.h>
+
+#include <rdma/rdma_cma.h>
+#include <villas/pool.h>
+
+/* Function pointer typedefs */
+typedef void (*ib_on_completion)(struct node*, struct ibv_wc*, int*);
+typedef void* (*ib_poll_function)(void*);
+
+/* Modes for consuming Work Completions: block on the completion channel (EVENT) or busy-poll the CQ (BUSY) */
+enum poll_mode_e
+{
+ EVENT,
+ BUSY
+};
+
+struct r_addr_key_s {
+ uint64_t remote_addr;
+ uint32_t rkey;
+};
+
+struct infiniband {
+ /* IBV/RDMA CM structs */
+ struct context_s {
+ struct rdma_cm_id *listen_id;
+ struct rdma_cm_id *id;
+ struct rdma_event_channel *ec;
+
+ struct ibv_pd *pd;
+ struct ibv_cq *recv_cq;
+ struct ibv_cq *send_cq;
+ struct ibv_comp_channel *comp_channel;
+ } ctx;
+ /* Work Completion related */
+ struct poll_s {
+ enum poll_mode_e poll_mode;
+
+ /* On completion function */
+ ib_on_completion on_compl;
+
+ /* Busy poll or Event function */
+ ib_poll_function poll_func;
+
+ /* Poll thread */
+ pthread_t cq_poller_thread;
+
+ int stopThread;
+ } poll;
+
+ /* Connection specific variables */
+ struct connection_s {
+ struct addrinfo *src_addr;
+ struct addrinfo *dst_addr;
+ enum rdma_port_space port_space;
+ int timeout;
+
+ struct r_addr_key_s *r_addr_key;
+
+ pthread_t stop_thread;
+ int rdma_disconnect_called;
+
+ int available_recv_wrs;
+ } conn;
+
+ /* Memory related variables */
+ struct ib_memory {
+ struct pool p_recv;
+ struct pool p_send;
+
+ struct ibv_mr *mr_recv;
+ struct ibv_mr *mr_send;
+ } mem;
+
+ /* Queue Pair init variables */
+ struct ibv_qp_init_attr qp_init;
+
+ /* Misc settings */
+ int is_source;
+ int cq_size;
+};
+
+/** @see node_type::reverse */
+int ib_reverse(struct node *n);
+
+/** @see node_type::print */
+char * ib_print(struct node *n);
+
+/** @see node_type::parse */
+int ib_parse(struct node *n, json_t *cfg);
+
+/** @see node_type::start */
+int ib_start(struct node *n);
+
+/** @see node_type::destroy */
+int ib_destroy(struct node *n);
+
+/** @see node_type::stop */
+int ib_stop(struct node *n);
+
+/** @see node_type::init */
+int ib_init(struct super_node *n);
+
+/** @see node_type::deinit */
+int ib_deinit();
+
+/** @see node_type::read */
+int ib_read(struct node *n, struct sample *smps[], unsigned cnt);
+
+/** @see node_type::write */
+int ib_write(struct node *n, struct sample *smps[], unsigned cnt);
+
+/** @} */
diff --git a/include/villas/pool.h b/include/villas/pool.h
index 6afe22cd8..047ba0341 100644
--- a/include/villas/pool.h
+++ b/include/villas/pool.h
@@ -41,10 +41,8 @@ struct pool {
enum state state;
off_t buffer_off; /**< Offset from the struct address to the underlying memory area */
- struct memtype *mem;
size_t len; /**< Length of the underlying memory area */
-
size_t blocksz; /**< Length of a block in bytes */
size_t alignment; /**< Alignment of a block in bytes */
@@ -62,7 +60,7 @@ struct pool {
* @retval 0 The pool has been successfully initialized.
* @retval <>0 There was an error during the pool initialization.
*/
-int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memtype *mem);
+int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memory_type *mem);
/** Destroy and release memory used by pool. */
int pool_destroy(struct pool *p);
diff --git a/include/villas/queue.h b/include/villas/queue.h
index 2c7b8c669..549cbed78 100644
--- a/include/villas/queue.h
+++ b/include/villas/queue.h
@@ -45,7 +45,7 @@ extern "C"{
#endif
/* Forward declarations */
-struct memtype;
+struct memory_type;
#define CACHELINE_SIZE 64
typedef char cacheline_pad_t[CACHELINE_SIZE];
@@ -61,7 +61,6 @@ struct queue {
atomic_state state;
- struct memtype *mem;
size_t buffer_mask;
off_t buffer_off; /**< Relative pointer to struct queue_cell[] */
@@ -77,7 +76,7 @@ struct queue {
};
/** Initialize MPMC queue */
-int queue_init(struct queue *q, size_t size, struct memtype *mem);
+int queue_init(struct queue *q, size_t size, struct memory_type *mem);
/** Destroy MPMC queue and release memory */
int queue_destroy(struct queue *q);
diff --git a/include/villas/queue_signalled.h b/include/villas/queue_signalled.h
index 9ad916b82..f2eff3508 100644
--- a/include/villas/queue_signalled.h
+++ b/include/villas/queue_signalled.h
@@ -68,7 +68,7 @@ struct queue_signalled {
#define queue_signalled_available(q) queue_available(&((q)->queue))
-int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem, int flags);
+int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memory_type *mem, int flags);
int queue_signalled_destroy(struct queue_signalled *qs);
diff --git a/lib/api.c b/lib/api.c
index 8ea079949..b98af2785 100644
--- a/lib/api.c
+++ b/lib/api.c
@@ -272,7 +272,8 @@ int api_init(struct api *a, struct super_node *sn)
if (ret)
return ret;
- ret = queue_signalled_init(&a->pending, 1024, &memtype_heap, 0);
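+ /* The allocation-tracking table must be initialized before the first allocation */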
+ memory_init(0);
+ ret = queue_signalled_init(&a->pending, 1024, &memory_type_heap, 0);
if (ret)
return ret;
diff --git a/lib/api/session.c b/lib/api/session.c
index 8ad155efe..e421bff79 100644
--- a/lib/api/session.c
+++ b/lib/api/session.c
@@ -40,11 +40,11 @@ int api_session_init(struct api_session *s, enum api_mode m)
if (ret)
return ret;
- ret = queue_init(&s->request.queue, 128, &memtype_heap);
+ ret = queue_init(&s->request.queue, 128, &memory_type_heap);
if (ret)
return ret;
- ret = queue_init(&s->response.queue, 128, &memtype_heap);
+ ret = queue_init(&s->response.queue, 128, &memory_type_heap);
if (ret)
return ret;
diff --git a/lib/log.c b/lib/log.c
index 8bb6e058f..fab1a89bf 100644
--- a/lib/log.c
+++ b/lib/log.c
@@ -86,6 +86,7 @@ static const char *facilities_strs[] = {
"ngsi", /* LOG_NGSI */
"websocket", /* LOG_WEBSOCKET */
"opal", /* LOG_OPAL */
+ "ib", /* LOG_IB */
};
#ifdef __GNUC__
diff --git a/lib/memory.c b/lib/memory.c
index 6a85dbad7..63092de24 100644
--- a/lib/memory.c
+++ b/lib/memory.c
@@ -25,28 +25,32 @@
#include
#include
-#include
#include
#include
-#include
-
-/* Required to allocate hugepages on Apple OS X */
-#ifdef __MACH__
- #include <mach/vm_statistics.h>
-#elif defined(__linux__)
- #include <linux/mman.h>
-#endif
#include
#include
#include
+#include <villas/hash_table.h>
+#include <villas/memory_type.h>
+
+static struct hash_table allocations = { .state = STATE_DESTROYED };
int memory_init(int hugepages)
{
+ int ret;
+
info("Initialize memory sub-system");
+
+ if (allocations.state == STATE_DESTROYED) {
+ ret = hash_table_init(&allocations, 100);
+ if (ret)
+ return ret;
+ }
+
#ifdef __linux__
- int ret, pagecnt, pagesz;
+ int pagecnt, pagesz;
struct rlimit l;
pagecnt = kernel_get_nr_hugepages();
@@ -99,256 +103,47 @@ int memory_init(int hugepages)
return 0;
}
-void * memory_alloc(struct memtype *m, size_t len)
+void * memory_alloc(struct memory_type *m, size_t len)
{
- void *ptr = m->alloc(m, len, sizeof(void *));
-
- debug(LOG_MEM | 5, "Allocated %#zx bytes of %s memory: %p", len, m->name, ptr);
-
- return ptr;
+ return memory_alloc_aligned(m, len, sizeof(void *));
}
-void * memory_alloc_aligned(struct memtype *m, size_t len, size_t alignment)
+void * memory_alloc_aligned(struct memory_type *m, size_t len, size_t alignment)
{
- void *ptr = m->alloc(m, len, alignment);
+ struct memory_allocation *ma = m->alloc(m, len, alignment);
- debug(LOG_MEM | 5, "Allocated %#zx bytes of %#zx-byte-aligned %s memory: %p", len, alignment, m->name, ptr);
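+ /* Track the allocation so that memory_free() can recover its metadata from the pointer alone */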
+ hash_table_insert(&allocations, ma->address, ma);
- return ptr;
+ debug(LOG_MEM | 5, "Allocated %#zx bytes of %#zx-byte-aligned %s memory: %p", ma->length, ma->alignment, ma->type->name, ma->address);
+
+ return ma->address;
}
-int memory_free(struct memtype *m, void *ptr, size_t len)
+int memory_free(void *ptr)
{
- debug(LOG_MEM | 5, "Releasing %#zx bytes of %s memory", len, m->name);
-
- return m->free(m, ptr, len);
-}
-
-static void * memory_heap_alloc(struct memtype *m, size_t len, size_t alignment)
-{
- void *ptr;
int ret;
- if (alignment < sizeof(void *))
- alignment = sizeof(void *);
+ /* Find corresponding memory allocation entry */
+ struct memory_allocation *ma = (struct memory_allocation *) hash_table_lookup(&allocations, ptr);
+ if (!ma)
+ return -1;
- ret = posix_memalign(&ptr, alignment, len);
+ debug(LOG_MEM | 5, "Releasing %#zx bytes of %s memory", ma->length, ma->type->name);
- return ret ? NULL : ptr;
-}
+ ret = ma->type->free(ma->type, ma);
+ if (ret)
+ return ret;
-int memory_heap_free(struct memtype *m, void *ptr, size_t len)
-{
- free(ptr);
+ /* Remove allocation entry */
+ ret = hash_table_delete(&allocations, ma->address);
+ if (ret)
+ return ret;
return 0;
}
-/** Allocate memory backed by hugepages with malloc() like interface */
-static void * memory_hugepage_alloc(struct memtype *m, size_t len, size_t alignment)
+struct memory_allocation * memory_get_allocation(void *ptr)
{
- void *ret;
- int prot = PROT_READ | PROT_WRITE;
- int flags = MAP_PRIVATE | MAP_ANONYMOUS;
-
-#ifdef __MACH__
- flags |= VM_FLAGS_SUPERPAGE_SIZE_2MB;
-#elif defined(__linux__)
- flags |= MAP_HUGETLB;
-
- if (getuid() == 0)
- flags |= MAP_LOCKED;
-#endif
-
- ret = mmap(NULL, len, prot, flags, -1, 0);
- if (ret == MAP_FAILED)
- return NULL;
-
- return ret;
+ struct memory_allocation *ma = (struct memory_allocation *) hash_table_lookup(&allocations, ptr);
+ return ma;
}
-
-static int memory_hugepage_free(struct memtype *m, void *ptr, size_t len)
-{
- /** We must make sure that len is a multiple of the hugepage size
- *
- * See: https://lkml.org/lkml/2014/10/22/925
- */
- len = ALIGN(len, HUGEPAGESIZE);
-
- return munmap(ptr, len);
-}
-
-void* memory_managed_alloc(struct memtype *m, size_t len, size_t alignment)
-{
- /* Simple first-fit allocation */
- struct memblock *first = (struct memblock *) m->_vd;
- struct memblock *block;
-
- for (block = first; block != NULL; block = block->next) {
- if (block->flags & MEMBLOCK_USED)
- continue;
-
- char* cptr = (char *) block + sizeof(struct memblock);
- size_t avail = block->len;
- uintptr_t uptr = (uintptr_t) cptr;
-
- /* Check alignment first; leave a gap at start of block to assure
- * alignment if necessary */
- uintptr_t rem = uptr % alignment;
- uintptr_t gap = 0;
- if (rem != 0) {
- gap = alignment - rem;
- if (gap > avail)
- continue; /* Next aligned address isn't in this block anymore */
-
- cptr += gap;
- avail -= gap;
- }
-
- if (avail >= len) {
- if (gap > sizeof(struct memblock)) {
- /* The alignment gap is big enough to fit another block.
- * The original block descriptor is already at the correct
- * position, so we just change its len and create a new block
- * descriptor for the actual block we're handling. */
- block->len = gap - sizeof(struct memblock);
- struct memblock *newblock = (struct memblock *) (cptr - sizeof(struct memblock));
- newblock->prev = block;
- newblock->next = block->next;
- block->next = newblock;
- newblock->flags = 0;
- newblock->len = len;
- block = newblock;
- }
- else {
- /* The gap is too small to fit another block descriptor, so we
- * must account for the gap length in the block length. */
- block->len = len + gap;
- }
-
- if (avail > len + sizeof(struct memblock)) {
- /* Imperfect fit, so create another block for the remaining part */
- struct memblock *newblock = (struct memblock *) (cptr + len);
- newblock->prev = block;
- newblock->next = block->next;
- block->next = newblock;
- if (newblock->next)
- newblock->next->prev = newblock;
- newblock->flags = 0;
- newblock->len = avail - len - sizeof(struct memblock);
- }
- else {
- /* If this block was larger than the requested length, but only
- * by less than sizeof(struct memblock), we may have wasted
- * memory by previous assignments to block->len. */
- block->len = avail;
- }
-
- block->flags |= MEMBLOCK_USED;
-
- return (void *) cptr;
- }
- }
-
- /* No suitable block found */
- return NULL;
-}
-
-int memory_managed_free(struct memtype *m, void *ptr, size_t len)
-{
- struct memblock *first = (struct memblock *) m->_vd;
- struct memblock *block;
- char *cptr = ptr;
-
- for (block = first; block != NULL; block = block->next) {
- if (!(block->flags & MEMBLOCK_USED))
- continue;
-
- /* Since we may waste some memory at the start of a block to ensure
- * alignment, ptr may not actually be the start of the block */
- if ((char *) block + sizeof(struct memblock) <= cptr &&
- cptr < (char *) block + sizeof(struct memblock) + block->len) {
- /* Try to merge it with neighbouring free blocks */
- if (block->prev && !(block->prev->flags & MEMBLOCK_USED) &&
- block->next && !(block->next->flags & MEMBLOCK_USED)) {
- /* Special case first: both previous and next block are unused */
- block->prev->len += block->len + block->next->len + 2 * sizeof(struct memblock);
- block->prev->next = block->next->next;
- if (block->next->next)
- block->next->next->prev = block->prev;
- }
- else if (block->prev && !(block->prev->flags & MEMBLOCK_USED)) {
- block->prev->len += block->len + sizeof(struct memblock);
- block->prev->next = block->next;
- if (block->next)
- block->next->prev = block->prev;
- }
- else if (block->next && !(block->next->flags & MEMBLOCK_USED)) {
- block->len += block->next->len + sizeof(struct memblock);
- block->next = block->next->next;
- if (block->next)
- block->next->prev = block;
- }
- else {
- /* no neighbouring free block, so just mark it as free */
- block->flags &= ~MEMBLOCK_USED;
- }
-
- return 0;
- }
- }
-
- return -1;
-}
-
-struct memtype * memtype_managed_init(void *ptr, size_t len)
-{
- struct memtype *mt = ptr;
- struct memblock *mb;
- char *cptr = ptr;
-
- if (len < sizeof(struct memtype) + sizeof(struct memblock)) {
- info("memtype_managed_init: passed region too small");
- return NULL;
- }
-
- /* Initialize memtype */
- mt->name = "managed";
- mt->flags = 0;
- mt->alloc = memory_managed_alloc;
- mt->free = memory_managed_free;
- mt->alignment = 1;
-
- cptr += ALIGN(sizeof(struct memtype), sizeof(void *));
-
- /* Initialize first free memblock */
- mb = (struct memblock *) cptr;
- mb->prev = NULL;
- mb->next = NULL;
- mb->flags = 0;
-
- cptr += ALIGN(sizeof(struct memblock), sizeof(void *));
-
- mb->len = len - (cptr - (char *) ptr);
-
- mt->_vd = (void *) mb;
-
- return mt;
-}
-
-/* List of available memory types */
-struct memtype memtype_heap = {
- .name = "heap",
- .flags = MEMORY_HEAP,
- .alloc = memory_heap_alloc,
- .free = memory_heap_free,
- .alignment = 1
-};
-
-struct memtype memtype_hugepage = {
- .name = "mmap_hugepages",
- .flags = MEMORY_MMAP | MEMORY_HUGEPAGE,
- .alloc = memory_hugepage_alloc,
- .free = memory_hugepage_free,
- .alignment = 21 /* 2 MiB hugepage */
-};
diff --git a/lib/memory/heap.c b/lib/memory/heap.c
new file mode 100644
index 000000000..7e054fd23
--- /dev/null
+++ b/lib/memory/heap.c
@@ -0,0 +1,67 @@
+/** Memory allocators.
+ *
+ * @author Steffen Vogel
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <stdlib.h>
+
+#include <villas/utils.h>
+#include <villas/memory_type.h>
+
+static struct memory_allocation * memory_heap_alloc(struct memory_type *m, size_t len, size_t alignment)
+{
+ int ret;
+
+ struct memory_allocation *ma = alloc(sizeof(struct memory_allocation));
+ if (!ma)
+ return NULL;
+
+ ma->alignment = alignment;
+ ma->type = m;
+ ma->length = len;
+
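+ /* posix_memalign() requires an alignment of at least sizeof(void *) */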
+ if (ma->alignment < sizeof(void *))
+ ma->alignment = sizeof(void *);
+
+ ret = posix_memalign(&ma->address, ma->alignment, ma->length);
+ if (ret) {
+ free(ma);
+ return NULL;
+ }
+
+ return ma;
+}
+
+static int memory_heap_free(struct memory_type *m, struct memory_allocation *ma)
+{
+ free(ma->address);
+ free(ma);
+
+ return 0;
+}
+
+/* List of available memory types */
+struct memory_type memory_type_heap = {
+ .name = "heap",
+ .flags = MEMORY_HEAP,
+ .alloc = memory_heap_alloc,
+ .free = memory_heap_free,
+ .alignment = 1
+};
diff --git a/lib/memory/hugepage.c b/lib/memory/hugepage.c
new file mode 100644
index 000000000..6bf17719c
--- /dev/null
+++ b/lib/memory/hugepage.c
@@ -0,0 +1,101 @@
+/** Hugepage memory allocator.
+ *
+ * @author Steffen Vogel
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <stdlib.h>
+#include <unistd.h>
+
+#include <sys/mman.h>
+#include <sys/types.h>
+
+/* Required to allocate hugepages on Apple OS X */
+#ifdef __MACH__
+ #include <mach/vm_statistics.h>
+#elif defined(__linux__)
+ #include <linux/mman.h>
+#endif
+
+#include <villas/utils.h>
+#include <villas/memory.h>
+#include <villas/memory_type.h>
+
+#define HUGEPAGESIZE (1 << 21) /* 2 MiB */
+
+/** Allocate memory backed by hugepages with malloc() like interface */
+static struct memory_allocation * memory_hugepage_alloc(struct memory_type *m, size_t len, size_t alignment)
+{
+ int prot = PROT_READ | PROT_WRITE;
+ int flags = MAP_PRIVATE | MAP_ANONYMOUS;
+
+#ifdef __MACH__
+ flags |= VM_FLAGS_SUPERPAGE_SIZE_2MB;
+#elif defined(__linux__)
+ flags |= MAP_HUGETLB;
+
+ if (getuid() == 0)
+ flags |= MAP_LOCKED;
+#endif
+
+ struct memory_allocation *ma = alloc(sizeof(struct memory_allocation));
+ if (!ma)
+ return NULL;
+
+ /** We must make sure that len is a multiple of the hugepage size
+ *
+ * See: https://lkml.org/lkml/2014/10/22/925
+ */
+ ma->length = ALIGN(len, HUGEPAGESIZE);
+ ma->alignment = alignment;
+ ma->type = m;
+
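+ /* Map anonymous memory; the fd is -1 because the mapping is not file-backed */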
+ ma->address = mmap(NULL, ma->length, prot, flags, -1, 0);
+ if (ma->address == MAP_FAILED) {
+ free(ma);
+ return NULL;
+ }
+
+ return ma;
+}
+
+static int memory_hugepage_free(struct memory_type *m, struct memory_allocation *ma)
+{
+ int ret;
+
+ ret = munmap(ma->address, ma->length);
+ if (ret)
+ return ret;
+
+ free(ma);
+
+ return 0;
+}
+
+struct memory_type memory_hugepage = {
+ .name = "mmap_hugepages",
+ .flags = MEMORY_MMAP | MEMORY_HUGEPAGE,
+ .alloc = memory_hugepage_alloc,
+ .free = memory_hugepage_free,
+ .alignment = 21 /* 2 MiB hugepage */
+};
diff --git a/lib/memory/ib.c b/lib/memory/ib.c
new file mode 100644
index 000000000..3b89e5637
--- /dev/null
+++ b/lib/memory/ib.c
@@ -0,0 +1,103 @@
+/** Memory allocators.
+ *
+ * @author Steffen Vogel
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <rdma/rdma_cma.h>
+
+#include <villas/nodes/infiniband.h>
+#include <villas/memory.h>
+#include <villas/sample.h>
+#include <villas/utils.h>
+
+struct ibv_mr * memory_ib_get_mr(struct sample *smps)
+{
+ struct memory_allocation *ma;
+ struct pool *p;
+ struct ibv_mr *mr;
+
+ p = sample_pool(smps);
+
+ ma = memory_get_allocation((char *)(p)+p->buffer_off);
+ mr = ma->ib.mr;
+ return mr;
+}
+
+static struct memory_allocation * memory_ib_alloc(struct memory_type *m, size_t len, size_t alignment)
+{
+ struct memory_ib *mi = (struct memory_ib *) m->_vd;
+
+ struct memory_allocation *ma = alloc(sizeof(struct memory_allocation));
+ if (!ma)
+ return NULL;
+
+ ma->type = m;
+ ma->length = len;
+ ma->alignment = alignment;
+
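+ /* Delegate the actual allocation to the parent type; reserve extra room for the MR pointer */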
+ ma->parent = mi->parent->alloc(mi->parent, len + sizeof(struct ibv_mr *), alignment);
+ if(!ma->parent) {
+ free(ma);
+ return NULL;
+ }
+
+ ma->address = ma->parent->address;
+
+ ma->ib.mr = ibv_reg_mr(mi->pd, ma->address, ma->length, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+ if(!ma->ib.mr) {
+ mi->parent->free(mi->parent, ma->parent);
+ free(ma);
+ return NULL;
+ }
+
+ return ma;
+}
+
+static int memory_ib_free(struct memory_type *m, struct memory_allocation *ma)
+{
+ int ret;
+ struct memory_ib *mi = (struct memory_ib *) m->_vd;
+
+ ibv_dereg_mr(ma->ib.mr);
+
+ ret = mi->parent->free(mi->parent, ma->parent);
+ if (ret)
+ return ret;
+
+ free(ma);
+
+ return 0;
+}
+
+struct memory_type * memory_ib(struct node *n, struct memory_type *parent)
+{
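+ /* Wrap the parent memory type so that allocations are registered with this node's protection domain */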
+ struct infiniband *i = (struct infiniband *) n->_vd;
+ struct memory_type *mt = malloc(sizeof(struct memory_type));
+
+ mt->name = "ib";
+ mt->flags = 0;
+ mt->alloc = memory_ib_alloc;
+ mt->free = memory_ib_free;
+ mt->alignment = 1;
+
+ mt->_vd = malloc(sizeof(struct memory_ib));
+
+ struct memory_ib *mi = (struct memory_ib *) mt->_vd;
+
+ mi->pd = i->ctx.pd;
+ mi->parent = parent;
+
+ return mt;
+}
diff --git a/lib/memory/managed.c b/lib/memory/managed.c
new file mode 100644
index 000000000..ab8bc4b59
--- /dev/null
+++ b/lib/memory/managed.c
@@ -0,0 +1,193 @@
+/** Memory allocators.
+ *
+ * @author Steffen Vogel
+ * @copyright 2017, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <stdlib.h>
+#include <stddef.h>
+#include <stdbool.h>
+#include <stdint.h>
+
+#include <villas/log.h>
+#include <villas/utils.h>
+#include <villas/memory.h>
+#include <villas/memory_type.h>
+
+static struct memory_allocation * memory_managed_alloc(struct memory_type *m, size_t len, size_t alignment)
+{
+ /* Simple first-fit allocation */
+ struct memory_block *first = (struct memory_block *) m->_vd;
+ struct memory_block *block;
+
+ for (block = first; block != NULL; block = block->next) {
+ if (block->used)
+ continue;
+
+ char* cptr = (char *) block + sizeof(struct memory_block);
+ size_t avail = block->length;
+ uintptr_t uptr = (uintptr_t) cptr;
+
+ /* Check alignment first; leave a gap at start of block to assure
+ * alignment if necessary */
+ uintptr_t rem = uptr % alignment;
+ uintptr_t gap = 0;
+ if (rem != 0) {
+ gap = alignment - rem;
+ if (gap > avail)
+ continue; /* Next aligned address isn't in this block anymore */
+
+ cptr += gap;
+ avail -= gap;
+ }
+
+ if (avail >= len) {
+ if (gap > sizeof(struct memory_block)) {
+ /* The alignment gap is big enough to fit another block.
+ * The original block descriptor is already at the correct
+ * position, so we just change its len and create a new block
+ * descriptor for the actual block we're handling. */
+ block->length = gap - sizeof(struct memory_block);
+ struct memory_block *newblock = (struct memory_block *) (cptr - sizeof(struct memory_block));
+ newblock->prev = block;
+ newblock->next = block->next;
+ block->next = newblock;
+ newblock->used = false;
+ newblock->length = len;
+ block = newblock;
+ }
+ else {
+ /* The gap is too small to fit another block descriptor, so we
+ * must account for the gap length in the block length. */
+ block->length = len + gap;
+ }
+
+ if (avail > len + sizeof(struct memory_block)) {
+ /* Imperfect fit, so create another block for the remaining part */
+ struct memory_block *newblock = (struct memory_block *) (cptr + len);
+ newblock->prev = block;
+ newblock->next = block->next;
+ block->next = newblock;
+
+ if (newblock->next)
+ newblock->next->prev = newblock;
+
+ newblock->used = false;
+ newblock->length = avail - len - sizeof(struct memory_block);
+ }
+ else {
+ /* If this block was larger than the requested length, but only
+ * by less than sizeof(struct memory_block), we may have wasted
+ * memory by previous assignments to block->length. */
+ block->length = avail;
+ }
+
+ block->used = true;
+
+ struct memory_allocation *ma = alloc(sizeof(struct memory_allocation));
+ if (!ma)
+ return NULL;
+
+ ma->address = cptr;
+ ma->type = m;
+ ma->alignment = alignment;
+ ma->length = len;
+ ma->managed.block = block;
+
+ return ma;
+ }
+ }
+
+ /* No suitable block found */
+ return NULL;
+}
+
+static int memory_managed_free(struct memory_type *m, struct memory_allocation *ma)
+{
+ struct memory_block *block = ma->managed.block;
+
+ /* Try to merge it with neighbouring free blocks */
+ if (block->prev && !block->prev->used &&
+ block->next && !block->next->used) {
+ /* Special case first: both previous and next block are unused */
+ block->prev->length += block->length + block->next->length + 2 * sizeof(struct memory_block);
+ block->prev->next = block->next->next;
+ if (block->next->next)
+ block->next->next->prev = block->prev;
+ }
+ else if (block->prev && !block->prev->used) {
+ block->prev->length += block->length + sizeof(struct memory_block);
+ block->prev->next = block->next;
+ if (block->next)
+ block->next->prev = block->prev;
+ }
+ else if (block->next && !block->next->used) {
+ block->length += block->next->length + sizeof(struct memory_block);
+ block->next = block->next->next;
+ if (block->next)
+ block->next->prev = block;
+ }
+ else {
+ /* no neighbouring free block, so just mark it as free */
+ block->used = false;
+ }
+
+ free(ma);
+
+ return 0;
+}
+
+struct memory_type * memory_managed(void *ptr, size_t len)
+{
+ struct memory_type *mt = ptr;
+ struct memory_block *mb;
+ char *cptr = ptr;
+
+ if (len < sizeof(struct memory_type) + sizeof(struct memory_block)) {
+ info("memory_managed: passed region too small");
+ return NULL;
+ }
+
+ /* Initialize memory_type */
+ mt->name = "managed";
+ mt->flags = 0;
+ mt->alloc = memory_managed_alloc;
+ mt->free = memory_managed_free;
+ mt->alignment = 1;
+
+ cptr += ALIGN(sizeof(struct memory_type), sizeof(void *));
+
+ /* Initialize first free memory block */
+ mb = (struct memory_block *) cptr;
+ mb->prev = NULL;
+ mb->next = NULL;
+ mb->used = false;
+
+ cptr += ALIGN(sizeof(struct memory_block), sizeof(void *));
+
+ mb->length = len - (cptr - (char *) ptr);
+
+ mt->_vd = (void *) mb;
+
+ return mt;
+}
diff --git a/lib/node.c b/lib/node.c
index b2422dabb..5580bc399 100644
--- a/lib/node.c
+++ b/lib/node.c
@@ -32,6 +32,7 @@
#include
#include
#include
+#include
static int node_direction_init(struct node_direction *nd, struct node *n)
{
@@ -549,6 +550,11 @@ int node_fd(struct node *n)
return n->_vt->fd ? n->_vt->fd(n) : -1;
}
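+
+/* A node type may provide its own memory type (e.g. registered IB memory); fall back to hugepages. */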
+struct memory_type * node_memory_type(struct node *n, struct memory_type *parent)
+{
+ return n->_vt->memory_type ? n->_vt->memory_type(n, parent) : &memory_hugepage;
+}
+
int node_parse_list(struct list *list, json_t *cfg, struct list *all)
{
struct node *node;
diff --git a/lib/nodes/iec61850_sv.c b/lib/nodes/iec61850_sv.c
index 13699049e..c4050dfb8 100644
--- a/lib/nodes/iec61850_sv.c
+++ b/lib/nodes/iec61850_sv.c
@@ -299,11 +299,11 @@ int iec61850_sv_start(struct node *n)
SVReceiver_addSubscriber(i->subscriber.receiver, i->subscriber.subscriber);
/* Initialize pool and queue to pass samples between threads */
- ret = pool_init(&i->subscriber.pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
+ ret = pool_init(&i->subscriber.pool, 1024, SAMPLE_LEN(n->samplelen), &memory_hugepage);
if (ret)
return ret;
- ret = queue_signalled_init(&i->subscriber.queue, 1024, &memtype_hugepage, 0);
+ ret = queue_signalled_init(&i->subscriber.queue, 1024, &memory_hugepage, 0);
if (ret)
return ret;
}
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
new file mode 100644
index 000000000..8e35952a9
--- /dev/null
+++ b/lib/nodes/infiniband.c
@@ -0,0 +1,856 @@
+/** Node type: infiniband
+ *
+ * @author Dennis Potter
+ * @copyright 2018, Institute for Automation of Complex Power Systems, EONERC
+ * @license GNU General Public License (version 3)
+ *
+ * VILLASnode
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *********************************************************************************/
+
+#include <string.h>
+#include <math.h>
+#include <netdb.h>
+
+#include <villas/nodes/infiniband.h>
+#include <villas/node.h>
+#include <villas/plugin.h>
+#include <villas/utils.h>
+#include <villas/log.h>
+#include <villas/memory.h>
+#include <villas/pool.h>
+#include <villas/sample.h>
+
+#include <rdma/rdma_cma.h>
+
+int ib_cleanup(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ info("Starting to clean up");
+
+ // Destroy QP
+ rdma_destroy_qp(ib->ctx.id);
+ info("Destroyed QP");
+
+ // Deregister memory regions
+ ibv_dereg_mr(ib->mem.mr_recv);
+ if(ib->is_source)
+ ibv_dereg_mr(ib->mem.mr_send);
+ info("Deregistered memory regions");
+
+ // Destroy pools
+ pool_destroy(&ib->mem.p_recv);
+ pool_destroy(&ib->mem.p_send);
+ info("Destroyed memory pools");
+
+ // Destroy RDMA CM ID
+ rdma_destroy_id(ib->ctx.id);
+ info("Destroyed rdma_cm_id");
+
+ // Destroy event channel
+ rdma_destroy_event_channel(ib->ctx.ec);
+ info("Destroyed event channel");
+
+ return 0;
+}
+
+int ib_post_recv_wrs(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ struct ibv_recv_wr wr, *bad_wr = NULL;
+ int ret;
+ struct ibv_sge sge;
+
+ // Prepare receive Scatter/Gather element
+ sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv);
+ sge.length = ib->mem.p_recv.blocksz;
+ sge.lkey = ib->mem.mr_recv->lkey;
+
+ // Prepare a receive Work Request
+ wr.wr_id = (uintptr_t)sge.addr;
+ wr.next = NULL;
+ wr.sg_list = &sge;
+ wr.num_sge = 1;
+
+ // Post Work Request
+ ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
+
+ return ret;
+}
+
+void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
+
+void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
+{
+ struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
+
+ for(int i=0; i<*size; i++)
+ {
+ //On disconnect, the QP set to error state and will be flushed
+ if(wc[i].status == IBV_WC_WR_FLUSH_ERR)
+ {
+ ib->poll.stopThread = 1;
+ return;
+ }
+
+ if(wc[i].status != IBV_WC_SUCCESS)
+ {
+ warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
+ node_name(n), wc[i].status);
+ }
+
+ sample_put((struct sample*)(wc[i].wr_id));
+ }
+}
+
+void * ib_event_thread(void *n)
+{
+ struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
+ struct ibv_wc wc[ib->cq_size];
+ int size;
+
+ while(1)
+ {
+ // Wait for a completion event; this call blocks until one occurs
+ void *cq_context;
+ ibv_get_cq_event(ib->ctx.comp_channel, &ib->ctx.send_cq, &cq_context);
+
+ // Poll as long as WCs are available
+ while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
+ ib->poll.on_compl(n, wc, &size);
+
+ // Request a new event in the CQ and acknowledge event
+ ibv_req_notify_cq(ib->ctx.send_cq, 0);
+ ibv_ack_cq_events(ib->ctx.send_cq, 1);
+ }
+}
+
+void * ib_busy_poll_thread(void *n)
+{
+ struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
+ struct ibv_wc wc[ib->cq_size];
+ int size;
+
+ while(1)
+ {
+ // Poll as long as WCs are available
+ while((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
+ ib->poll.on_compl(n, wc, &size);
+
+ if(ib->poll.stopThread)
+ return NULL;
+ }
+}
+
+static void ib_init_wc_poll(struct node *n)
+{
+ int ret;
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ ib->ctx.comp_channel = NULL;
+
+ if(ib->poll.poll_mode == EVENT)
+ {
+ // Create completion channel
+ ib->ctx.comp_channel = ibv_create_comp_channel(ib->ctx.id->verbs);
+ if(!ib->ctx.comp_channel)
+ error("Could not create completion channel in node %s.", node_name(n));
+ }
+
+ // Create completion queues and bind to channel (or NULL)
+ ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
+ ib->cq_size,
+ NULL,
+ NULL,
+ 0);
+ if(!ib->ctx.recv_cq)
+ error("Could not create receive completion queue in node %s.", node_name(n));
+
+ ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
+ ib->cq_size,
+ NULL,
+ ib->ctx.comp_channel,
+ 0);
+ if(!ib->ctx.send_cq)
+ error("Could not create send completion queue in node %s.", node_name(n));
+
+ if(ib->poll.poll_mode == EVENT)
+ {
+ // Request notifications from completion queue
+ ret = ibv_req_notify_cq(ib->ctx.send_cq, 0);
+ if(ret)
+ error("Failed to request notifiy CQ in node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+
+ // Initialize polling pthread for source
+ if(ib->is_source)
+ {
+ ret = pthread_create(&ib->poll.cq_poller_thread, NULL, ib->poll.poll_func, n);
+ if(ret)
+ {
+ error("Failed to create poll thread of node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+ }
+}
+
+static void ib_build_ibv(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ int ret;
+
+ //Allocate protection domain
+ ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
+ if(!ib->ctx.pd)
+ error("Could not allocate protection domain in node %s.", node_name(n));
+ info("Allocated Protection Domain");
+
+ // Initiate poll mode
+ ib_init_wc_poll(n);
+
+ // Prepare remaining Queue Pair (QP) attributes
+ ib->qp_init.send_cq = ib->ctx.send_cq;
+ ib->qp_init.recv_cq = ib->ctx.recv_cq;
+
+ //ToDo: Set maximum inline data
+
+ // Create the actual QP
+ ret = rdma_create_qp(ib->ctx.id, ib->ctx.pd, &ib->qp_init);
+ if(ret)
+ error("Failed to create Queue Pair in node %s.", node_name(n));
+
+ info("Created Queue Pair with %i receive and %i send elements.",
+ ib->qp_init.cap.max_recv_wr, ib->qp_init.cap.max_send_wr);
+
+ // Allocate memory
+ ib->mem.p_recv.state = STATE_DESTROYED;
+ ib->mem.p_recv.queue.state = STATE_DESTROYED;
+
+ // Set pool size to maximum size of Receive Queue
+ ret = pool_init(&ib->mem.p_recv,
+ ib->qp_init.cap.max_recv_wr,
+ SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
+ &memory_type_heap);
+ if(ret)
+ {
+ error("Failed to init recv memory pool of node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+
+ //ToDo: initialize r_addr_key struct if mode is RDMA
+
+ // Register memory for IB Device. Not necessary if data is sent
+ // exclusively inline
+ ib->mem.mr_recv = ibv_reg_mr(
+ ib->ctx.pd,
+ (char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off,
+ ib->mem.p_recv.len,
+ IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+ if(!ib->mem.mr_recv) {
+ error("Failed to register mr_recv with ibv_reg_mr of node %s.",
+ node_name(n));
+ }
+ info("Allocated receive memory.");
+
+ if(ib->is_source)
+ {
+ ib->mem.p_send.state = STATE_DESTROYED;
+ ib->mem.p_send.queue.state = STATE_DESTROYED;
+
+ // Set pool size to maximum size of Receive Queue
+ ret = pool_init(&ib->mem.p_send,
+ ib->qp_init.cap.max_send_wr,
+ sizeof(double),
+ &memory_type_heap);
+ if(ret)
+ {
+ error("Failed to init send memory of node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+
+ //ToDo: initialize r_addr_key struct if mode is RDMA
+
+ // Register memory for IB Device. Not necessary if data is sent
+ // exclusively inline
+ ib->mem.mr_send = ibv_reg_mr(
+ ib->ctx.pd,
+ (char*)&ib->mem.p_send+ib->mem.p_send.buffer_off,
+ ib->mem.p_send.len,
+ IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+ if(!ib->mem.mr_send) {
+ error("Failed to register mr_send with ibv_reg_mr of node %s.",
+ node_name(n));
+ }
+ info("Allocated send memory.");
+ }
+}
+
+static int ib_addr_resolved(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ int ret;
+
+ info("Successfully resolved address.");
+
+ // Build all components from IB Verbs
+ ib_build_ibv(n);
+
+ // Resolve address
+ ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout);
+ if(ret)
+ error("Failed to resolve route in node %s.", node_name(n));
+
+ return 0;
+}
+
+static int ib_route_resolved(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ int ret;
+
+ info("Successfully resolved route.");
+
+ struct rdma_conn_param cm_params;
+ memset(&cm_params, 0, sizeof(cm_params));
+
+ // Send connection request
+ ret = rdma_connect(ib->ctx.id, &cm_params);
+ if(ret)
+ error("Failed to connect in node %s.", node_name(n));
+
+ info("Called rdma_connect.");
+
+ return 0;
+}
+
+static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ int ret;
+ info("Received a connection request!");
+
+ ib->ctx.id = id;
+ ib_build_ibv(n);
+
+ struct rdma_conn_param cm_params;
+ memset(&cm_params, 0, sizeof(cm_params));
+
+ // Accept connection request
+ ret = rdma_accept(ib->ctx.id, &cm_params);
+ if(ret)
+ error("Failed to connect in node %s.", node_name(n));
+
+ info("Successfully accepted connection request.");
+
+ return 0;
+}
+
+static int ib_event(struct node *n, struct rdma_cm_event *event)
+{
+ int ret = 0;
+
+ switch(event->event)
+ {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ ret = ib_addr_resolved(n);
+ break;
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ error("Address resolution (rdma_resolve_addr) failed!");
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ ret = ib_route_resolved(n);
+ break;
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ error("Route resolution (rdma_resovle_route) failed!");
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ ret = ib_connect_request(n, event->id);
+ break;
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ error("An error has occurred trying to establish a connection!");
+ case RDMA_CM_EVENT_REJECTED:
+ error("Connection request or response was rejected by the remote end point!");
+ case RDMA_CM_EVENT_ESTABLISHED:
+ info("Connection established!");
+ ret = 1;
+ break;
+ case RDMA_CM_EVENT_DISCONNECTED:
+ ret = ib_cleanup(n);
+ break;
+ default:
+ error("Unknown event occurred: %u",
+ event->event);
+ }
+
+ return ret;
+}
+
+int ib_reverse(struct node *n)
+{
+ return 0;
+}
+
+int ib_parse(struct node *n, json_t *cfg)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+
+ int ret;
+ char *local = NULL;
+ char *remote = NULL;
+ const char *port_space = "RDMA_PC_TCP";
+ const char *poll_mode = "BUSY";
+ const char *qp_type = "IBV_QPT_RC";
+ int timeout = 1000;
+ int cq_size = 128;
+ int max_send_wr = 128;
+ int max_recv_wr = 128;
+
+ json_error_t err;
+ ret = json_unpack_ex(cfg, &err, 0, "{ s?: s, s?: s, s?: s, s?: i, \
+ s?: s, s?: i, s?: s, s?: i, s?: i}",
+ "remote", &remote,
+ "local", &local,
+ "rdma_port_space", &port_space,
+ "resolution_timeout", &timeout,
+ "poll_mode", &poll_mode,
+ "cq_size", &cq_size,
+ "qp_type", &qp_type,
+ "max_send_wr", &max_send_wr,
+ "max_recv_wr", &max_recv_wr
+ );
+ if(ret)
+ jerror(&err, "Failed to parse configuration of node %s", node_name(n));
+
+ // Translate IP:PORT to a struct addrinfo
+ char* ip_adr = strtok(local, ":");
+ char* port = strtok(NULL, ":");
+ ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
+ if(ret)
+ {
+ error("Failed to resolve local address '%s' of node %s: %s",
+ local, node_name(n), gai_strerror(ret));
+ }
+
+ // Translate port space
+ if(strcmp(port_space, "RDMA_PS_IPOIB") == 0) ib->conn.port_space = RDMA_PS_IPOIB;
+ else if(strcmp(port_space, "RDMA_PS_TCP") == 0) ib->conn.port_space = RDMA_PS_TCP;
+ else if(strcmp(port_space, "RDMA_PS_UDP") == 0) ib->conn.port_space = RDMA_PS_UDP;
+ else if(strcmp(port_space, "RDMA_PS_IB") == 0) ib->conn.port_space = RDMA_PS_IB;
+ else {
+ error("Failed to translate rdma_port_space in node %s. %s is not a valid \
+ port space supported by rdma_cma.h!", node_name(n), port_space);
+ }
+
+ // Set timeout
+ ib->conn.timeout = timeout;
+
+ n->in.vectorize = 256;
+
+ // Translate poll mode
+ if(strcmp(poll_mode, "EVENT") == 0)
+ {
+ ib->poll.poll_mode = EVENT;
+ ib->poll.poll_func = ib_event_thread;
+ }
+ else if(strcmp(poll_mode, "BUSY") == 0)
+ {
+ ib->poll.poll_mode = BUSY;
+ ib->poll.poll_func = ib_busy_poll_thread;
+ }
+ else
+ {
+ error("Failed to translate poll_mode in node %s. %s is not a valid \
+ poll mode!", node_name(n), poll_mode);
+ }
+
+ // Set completion queue size
+ ib->cq_size = cq_size;
+
+ // Translate QP type
+ if(strcmp(qp_type, "IBV_QPT_RC") == 0) ib->qp_init.qp_type = IBV_QPT_RC;
+ else if(strcmp(qp_type, "IBV_QPT_UC") == 0) ib->qp_init.qp_type = IBV_QPT_UC;
+ else if(strcmp(qp_type, "IBV_QPT_UD") == 0) ib->qp_init.qp_type = IBV_QPT_UD;
+ else {
+ error("Failed to translate qp_type in node %s. %s is not a valid \
+ qp_type!", node_name(n), qp_type);
+ }
+
+ // Set max. send and receive Work Requests
+ // First check if the set value is a power of 2, and warn the user if this is not the case
+ int max_send_pow = (int) pow(2, ceil(log2(max_send_wr)));
+ int max_recv_pow = (int) pow(2, ceil(log2(max_recv_wr)));
+
+ if(max_send_wr != max_send_pow)
+ warn("Max. number of send WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i",
+ max_send_wr, max_send_pow);
+
+ if(max_recv_wr != max_recv_pow)
+ warn("Max. number of recv WRs (%i) is not a power of 2! The HCA will change this to a power of 2: %i",
+ max_recv_wr, max_recv_pow);
+
+ ib->qp_init.cap.max_send_wr = max_send_wr;
+ ib->qp_init.cap.max_recv_wr = max_recv_wr;
+
+ // Set available receive Work Requests to 0
+ ib->conn.available_recv_wrs = 0;
+
+ // Set remaining QP attributes
+ ib->qp_init.cap.max_send_sge = 1;
+ ib->qp_init.cap.max_recv_sge = 1;
+
+ //Check if node is a source and connect to target
+ if(remote)
+ {
+ ib->is_source = 1;
+
+ // Translate address info
+ char* ip_adr = strtok(remote, ":");
+ char* port = strtok(NULL, ":");
+ ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr);
+ if(ret)
+ {
+ error("Failed to resolve remote address '%s' of node %s: %s",
+ remote, node_name(n), gai_strerror(ret));
+ }
+
+ // Set correct Work Completion function
+ ib->poll.on_compl = ib_completion_source;
+ }
+ else
+ {
+ ib->is_source = 0;
+
+ // Set correct Work Completion function
+ ib->poll.on_compl = ib_completion_target;
+ }
+
+ return 0;
+}
+
+char * ib_print(struct node *n)
+{
+ return 0;
+}
+
+int ib_destroy(struct node *n)
+{
+ return 0;
+}
+
+void * ib_disconnect_thread(void *n)
+{
+ struct node *node = (struct node *)n;
+ struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
+ struct rdma_cm_event *event;
+
+ while(rdma_get_cm_event(ib->ctx.ec, &event) == 0)
+ {
+ if(event->event == RDMA_CM_EVENT_DISCONNECTED)
+ {
+ rdma_ack_cm_event(event);
+ ib->conn.rdma_disconnect_called = 1;
+
+ node_stop(node);
+ return NULL;
+ }
+ }
+ return NULL;
+}
+
+int ib_start(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ struct rdma_cm_event *event = NULL;
+ int ret;
+
+ // Create event channel
+ ib->ctx.ec = rdma_create_event_channel();
+ if(!ib->ctx.ec)
+ error("Failed to create event channel in node %s!", node_name(n));
+
+ ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
+ if(ret)
+ {
+ error("Failed to create rdma_cm_id of node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+ info("Succesfully created rdma_cm_id.");
+
+ // Bind rdma_cm_id to the HCA
+ ret = rdma_bind_addr(ib->ctx.id, ib->conn.src_addr->ai_addr);
+ if(ret)
+ {
+ error("Failed to bind to local device of node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+ info("Bound rdma_cm_id to Infiniband device.");
+
+ if(ib->is_source)
+ {
+ // Resolve address
+ ret = rdma_resolve_addr(ib->ctx.id,
+ NULL,
+ ib->conn.dst_addr->ai_addr,
+ ib->conn.timeout);
+ if(ret)
+ {
+ error("Failed to resolve remote address after %ims of node %s: %s",
+ ib->conn.timeout, node_name(n), gai_strerror(ret));
+ }
+ }
+ else
+ {
+ // The ID will be overwritten for the target. If the event type is
+ // RDMA_CM_EVENT_CONNECT_REQUEST, then this references a new id for
+ // that communication.
+ ib->ctx.listen_id = ib->ctx.id;
+
+ // Listen on rdma_cm_id for events
+ ret = rdma_listen(ib->ctx.listen_id, 10);
+ if(ret)
+ error("Failed to listen to rdma_cm_id on node %s", node_name(n));
+ }
+
+ // Several events should occur on the event channel, to make
+ // sure the nodes are successfully connected.
+ info("Starting to monitor events on rdma_cm_id.");
+
+ while(rdma_get_cm_event(ib->ctx.ec, &event) == 0)
+ {
+ struct rdma_cm_event event_copy;
+ memcpy(&event_copy, event, sizeof(*event));
+
+ rdma_ack_cm_event(event);
+
+ if(ib_event(n, &event_copy))
+ break;
+ }
+
+ ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n);
+ if(ret)
+ {
+ error("Failed to create thread to monitor disconnects in node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+
+ return 0;
+}
+
+int ib_stop(struct node *n)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ struct rdma_cm_event *event = NULL;
+ int ret;
+
+ // Call RDMA disconnect function
+ // Will flush all outstanding WRs to the Completion Queue and
+ // will call RDMA_CM_EVENT_DISCONNECTED if that is done.
+ ret = rdma_disconnect(ib->ctx.id);
+ if(ret)
+ {
+ error("Error while calling rdma_disconnect in node %s: %s",
+ node_name(n), gai_strerror(ret));
+ }
+ info("Called rdma_disconnect.");
+
+ // If disconnected event already occurred, directly call cleanup function
+ if(ib->conn.rdma_disconnect_called)
+ {
+ ib_cleanup(n);
+ }
+ // Else, wait for event to occur
+ else
+ {
+ ib->conn.rdma_disconnect_called = 1;
+ rdma_get_cm_event(ib->ctx.ec, &event);
+
+ rdma_ack_cm_event(event);
+
+ ib_event(n, event);
+ }
+
+ return 0;
+}
+
+int ib_init(struct super_node *n)
+{
+ return 0;
+}
+
+int ib_deinit()
+{
+ return 0;
+}
+
+int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ struct ibv_wc wc[n->in.vectorize];
+ struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
+ struct ibv_sge sge[cnt];
+ struct ibv_mr * mr;
+ int ret;
+
+ if(ib->conn.available_recv_wrs <= ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize)
+ {
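+ // Top up the receive queue with fresh Work Requests before polling the CQ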
+ // Get Memory Region
+ mr = memory_ib_get_mr(smps[0]);
+
+ for(int i=0; i<cnt; i++)
+ {
+ // Prepare receive Scatter/Gather element
+ sge[i].addr = (uint64_t)&smps[i]->data;
+ sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
+ sge[i].lkey = mr->lkey;
+
+ // Prepare a receive Work Request
+ wr[i].wr_id = (uintptr_t)smps[i];
+ wr[i].next = &wr[i+1];
+ wr[i].sg_list = &sge[i];
+ wr[i].num_sge = 1;
+
+ ib->conn.available_recv_wrs++;
+
+ if(ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1))
+ {
+ wr[i].next = NULL;
+ break;
+ }
+ }
+ // Post list of Work Requests
+ ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
+
+ }
+
+ // Poll Completion Queue
+ ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
+
+ if(ret)
+ {
+ ib->conn.available_recv_wrs -= ret;
+
+ for(int i=0; i<ret; i++)
+ {
+ if(wc[i].status == IBV_WC_SUCCESS)
+ {
+ smps[i] = (struct sample*)(wc[i].wr_id);
+ smps[i]->length = wc[i].byte_len/sizeof(double);
+ }
+ else
+ ret = 0;
+
+ //Release sample
+ sample_put((struct sample*)(wc[i].wr_id));
+ }
+ }
+
+ return ret;
+}
+
+int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
+{
+ struct infiniband *ib = (struct infiniband *) n->_vd;
+ struct ibv_send_wr wr[cnt], *bad_wr = NULL;
+ struct ibv_sge sge[cnt];
+ struct ibv_mr * mr;
+ int ret;
+
+ memset(&wr, 0, sizeof(wr));
+
+ //ToDo: Place this into configuration and create checks if settings are valid
+ int send_inline = 1;
+
+ // Get Memory Region
+ mr = memory_ib_get_mr(smps[0]);
+
+ for(int i=0; i<cnt; i++)
+ {
+ // Prepare Scatter/Gather element
+ sge[i].addr = (uint64_t)&smps[i]->data;
+ sge[i].length = smps[i]->length*sizeof(double);
+ sge[i].lkey = mr->lkey;
+
+ // Set Send Work Request
+ wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be released in the WC
+ wr[i].sg_list = &sge[i];
+ wr[i].num_sge = 1;
+
+ if(i == (cnt-1))
+ wr[i].next = NULL;
+ else
+ wr[i].next = &wr[i+1];
+
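+ // IBV_SEND_INLINE is (1 << 3), so send_inline toggles inline transmission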
+ wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3);
+ wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
+ wr[i].opcode = IBV_WR_SEND_WITH_IMM;
+ }
+
+ //Send linked list of Work Requests
+ ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
+ if(ret)
+ {
+ error("Failed to send message in node %s: %s",
+ node_name(n), gai_strerror(ret));
+
+ return -ret;
+ }
+
+ return cnt;
+}
+
+int ib_fd(struct node *n)
+{
+ return 0;
+}
+
+static struct plugin p = {
+ .name = "infiniband",
+ .description = "Infiniband",
+ .type = PLUGIN_TYPE_NODE,
+ .node = {
+ .vectorize = 0,
+ .size = sizeof(struct infiniband),
+ .reverse = ib_reverse,
+ .parse = ib_parse,
+ .print = ib_print,
+ .start = ib_start,
+ .destroy = ib_destroy,
+ .stop = ib_stop,
+ .init = ib_init,
+ .deinit = ib_deinit,
+ .read = ib_read,
+ .write = ib_write,
+ .fd = ib_fd,
+ .memory_type = memory_ib
+ }
+};
+
+REGISTER_PLUGIN(&p)
+LIST_INIT_STATIC(&p.node.instances)
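+
+/* A hypothetical configuration stub for this node-type. Only the type name
+ * "infiniband" is fixed by the plugin registration above; the remaining
+ * fields depend on ib_parse() and are placeholders:
+ *
+ *   nodes = {
+ *       ib = {
+ *           type = "infiniband",
+ *           ...
+ *       }
+ *   }
+ */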
diff --git a/lib/nodes/loopback.c b/lib/nodes/loopback.c
index e25efcd67..62b11519c 100644
--- a/lib/nodes/loopback.c
+++ b/lib/nodes/loopback.c
@@ -50,11 +50,11 @@ int loopback_open(struct node *n)
int ret;
struct loopback *l = (struct loopback *) n->_vd;
- ret = pool_init(&l->pool, l->queuelen, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
+ ret = pool_init(&l->pool, l->queuelen, SAMPLE_LEN(n->samplelen), &memory_hugepage);
if (ret)
return ret;
- return queue_signalled_init(&l->queue, l->queuelen, &memtype_hugepage, QUEUE_SIGNALLED_EVENTFD);
+ return queue_signalled_init(&l->queue, l->queuelen, &memory_hugepage, QUEUE_SIGNALLED_EVENTFD);
}
int loopback_close(struct node *n)
diff --git a/lib/nodes/mqtt.c b/lib/nodes/mqtt.c
index bc3692f13..893d10d8d 100644
--- a/lib/nodes/mqtt.c
+++ b/lib/nodes/mqtt.c
@@ -301,11 +301,11 @@ int mqtt_start(struct node *n)
if (ret)
return ret;
- ret = pool_init(&m->pool, 1024, SAMPLE_LEN(n->samplelen), &memtype_hugepage);
+ ret = pool_init(&m->pool, 1024, SAMPLE_LEN(n->samplelen), &memory_hugepage);
if (ret)
return ret;
- ret = queue_signalled_init(&m->queue, 1024, &memtype_hugepage, 0);
+ ret = queue_signalled_init(&m->queue, 1024, &memory_hugepage, 0);
if (ret)
return ret;
diff --git a/lib/nodes/websocket.c b/lib/nodes/websocket.c
index b30926fff..a81b8b2e8 100644
--- a/lib/nodes/websocket.c
+++ b/lib/nodes/websocket.c
@@ -81,7 +81,7 @@ static int websocket_connection_init(struct websocket_connection *c)
c->_name = NULL;
- ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memtype_hugepage);
+ ret = queue_init(&c->queue, DEFAULT_QUEUELEN, &memory_hugepage);
if (ret)
return ret;
@@ -400,11 +400,11 @@ int websocket_start(struct node *n)
int ret;
struct websocket *w = (struct websocket *) n->_vd;
- ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, SAMPLE_LEN(DEFAULT_WEBSOCKET_SAMPLELEN), &memtype_hugepage);
+ ret = pool_init(&w->pool, DEFAULT_WEBSOCKET_QUEUELEN, SAMPLE_LEN(DEFAULT_WEBSOCKET_SAMPLELEN), &memory_hugepage);
if (ret)
return ret;
- ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memtype_hugepage, 0);
+ ret = queue_signalled_init(&w->queue, DEFAULT_WEBSOCKET_QUEUELEN, &memory_hugepage, 0);
if (ret)
return ret;
diff --git a/lib/path.c b/lib/path.c
index 27e66861a..48048d7e8 100644
--- a/lib/path.c
+++ b/lib/path.c
@@ -46,7 +46,7 @@ static int path_source_init(struct path_source *ps)
{
int ret;
- ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->in.vectorize), SAMPLE_LEN(ps->node->samplelen), &memtype_hugepage);
+ ret = pool_init(&ps->pool, MAX(DEFAULT_QUEUELEN, ps->node->in.vectorize), SAMPLE_LEN(ps->node->samplelen), &memory_hugepage);
if (ret)
return ret;
@@ -148,7 +148,7 @@ static int path_destination_init(struct path_destination *pd, int queuelen)
{
int ret;
- ret = queue_init(&pd->queue, queuelen, &memtype_hugepage);
+ ret = queue_init(&pd->queue, queuelen, &memory_hugepage);
if (ret)
return ret;
@@ -430,7 +430,7 @@ int path_init2(struct path *p)
if (!p->samplelen)
p->samplelen = DEFAULT_SAMPLELEN;
- ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memtype_hugepage);
+ ret = pool_init(&p->pool, MAX(1, list_length(&p->destinations)) * p->queuelen, SAMPLE_LEN(p->samplelen), &memory_hugepage);
if (ret)
return ret;
diff --git a/lib/pool.c b/lib/pool.c
index 0f77df83a..1a7ce726f 100644
--- a/lib/pool.c
+++ b/lib/pool.c
@@ -25,7 +25,7 @@
#include
#include
-int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memtype *m)
+int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memory_type *m)
{
int ret;
@@ -35,7 +35,6 @@ int pool_init(struct pool *p, size_t cnt, size_t blocksz, struct memtype *m)
p->alignment = kernel_get_cacheline_size();
p->blocksz = p->alignment * CEIL(blocksz, p->alignment);
p->len = cnt * p->blocksz;
- p->mem = m;
void *buffer = memory_alloc_aligned(m, p->len, p->alignment);
if (!buffer)
@@ -66,7 +65,7 @@ int pool_destroy(struct pool *p)
queue_destroy(&p->queue);
void *buffer = (char*) p + p->buffer_off;
- ret = memory_free(p->mem, buffer, p->len);
+ ret = memory_free(buffer);
if (ret == 0)
p->state = STATE_DESTROYED;
diff --git a/lib/queue.c b/lib/queue.c
index af65d0bde..0b020b192 100644
--- a/lib/queue.c
+++ b/lib/queue.c
@@ -36,7 +36,7 @@
#include
/** Initialize MPMC queue */
-int queue_init(struct queue *q, size_t size, struct memtype *mem)
+int queue_init(struct queue *q, size_t size, struct memory_type *m)
{
assert(q->state == STATE_DESTROYED);
@@ -47,9 +47,8 @@ int queue_init(struct queue *q, size_t size, struct memtype *mem)
warn("A queue size was changed from %lu to %lu", old_size, size);
}
- q->mem = mem;
q->buffer_mask = size - 1;
- struct queue_cell *buffer = (struct queue_cell *) memory_alloc(q->mem, sizeof(struct queue_cell) * size);
+ struct queue_cell *buffer = (struct queue_cell *) memory_alloc(m, sizeof(struct queue_cell) * size);
if (!buffer)
return -2;
@@ -74,8 +73,7 @@ int queue_destroy(struct queue *q)
if (q->state == STATE_DESTROYED)
return 0;
- ret = memory_free(q->mem, buffer, (q->buffer_mask + 1) * sizeof(struct queue_cell));
-
+ ret = memory_free(buffer);
if (ret == 0)
q->state = STATE_DESTROYED;
diff --git a/lib/queue_signalled.c b/lib/queue_signalled.c
index dc6d6f9f6..cd9be6ecf 100644
--- a/lib/queue_signalled.c
+++ b/lib/queue_signalled.c
@@ -37,7 +37,7 @@ static void queue_signalled_cleanup(void *p)
pthread_mutex_unlock(&qs->pthread.mutex);
}
-int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memtype *mem, int flags)
+int queue_signalled_init(struct queue_signalled *qs, size_t size, struct memory_type *mem, int flags)
{
int ret;
diff --git a/lib/shmem.c b/lib/shmem.c
index 56020a249..294e7ae67 100644
--- a/lib/shmem.c
+++ b/lib/shmem.c
@@ -35,8 +35,8 @@
size_t shmem_total_size(int queuelen, int samplelen)
{
- /* We have the constant const of the memtype header */
- return sizeof(struct memtype)
+ /* We have the constant cost of the memory_type header */
+ return sizeof(struct memory_type)
/* and the shared struct itself */
+ sizeof(struct shmem_shared)
/* the size of the actual queue and the queue for the pool */
@@ -44,7 +44,7 @@ size_t shmem_total_size(int queuelen, int samplelen)
/* the size of the pool */
+ queuelen * kernel_get_cacheline_size() * CEIL(SAMPLE_LEN(samplelen), kernel_get_cacheline_size())
/* a memblock for each allocation (1 shmem_shared, 2 queues, 1 pool) */
- + 4 * sizeof(struct memblock)
+ + 4 * sizeof(struct memory_block)
/* and some extra buffer for alignment */
+ 1024;
}
@@ -55,7 +55,7 @@ int shmem_int_open(const char *wname, const char* rname, struct shmem_int *shm,
int fd, ret;
size_t len;
void *base;
- struct memtype *manager;
+ struct memory_type *manager;
struct shmem_shared *shared;
struct stat stat_buf;
sem_t *sem_own, *sem_other;
@@ -92,7 +92,7 @@ retry: fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600);
close(fd);
- manager = memtype_managed_init(base, len);
+ manager = memory_managed(base, len);
shared = memory_alloc(manager, sizeof(struct shmem_shared));
if (!shared) {
errno = ENOMEM;
@@ -144,7 +144,7 @@ retry: fd = shm_open(wname, O_RDWR|O_CREAT|O_EXCL, 0600);
if (base == MAP_FAILED)
return -10;
- cptr = (char *) base + sizeof(struct memtype) + sizeof(struct memblock);
+ cptr = (char *) base + sizeof(struct memory_type) + sizeof(struct memory_block);
shared = (struct shmem_shared *) cptr;
shm->read.base = base;
shm->read.name = rname;
diff --git a/packaging/docker/Dockerfile.dev b/packaging/docker/Dockerfile.dev
index 33045f21d..b2a1d9311 100644
--- a/packaging/docker/Dockerfile.dev
+++ b/packaging/docker/Dockerfile.dev
@@ -83,6 +83,27 @@ RUN dnf -y install \
mosquitto-devel \
comedilib-devel comedilib
+# IB Verbs Dependencies
+RUN dnf -y install \
+ libibverbs-utils \
+ libibverbs-devel \
+ libibverbs-devel-static \
+ libmlx4 \
+ libmlx5 \
+ ibutils \
+ libibcm \
+ libibcommon \
+ libibmad \
+ libibumad
+
+# RDMA CM Dependencies
+RUN dnf -y install \
+ librdmacm-utils \
+ librdmacm-devel \
+ librdmacm \
+ libibumad-devel \
+ perftest
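+
+# Optional sanity check (assuming the container is run with access to
+# /dev/infiniband): `ibv_devinfo` from libibverbs-utils lists the visible RDMA
+# devices and `rping` from librdmacm-utils exercises the RDMA CM connection setup.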
+
# Build & Install Criterion
RUN cd /tmp && \
git clone --recursive https://github.com/Snaipe/Criterion && \
diff --git a/src/villas-hook.cpp b/src/villas-hook.cpp
index 02a51ec24..039844542 100644
--- a/src/villas-hook.cpp
+++ b/src/villas-hook.cpp
@@ -182,7 +182,7 @@ check: if (optarg == endptr)
smps = (struct sample **) alloc(cnt * sizeof(struct sample *));
- ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
+ ret = pool_init(&q, 10 * cnt, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memory_hugepage);
if (ret)
error("Failed to initilize memory pool");
diff --git a/src/villas-pipe.cpp b/src/villas-pipe.cpp
index 347ee939d..31d83e3c1 100644
--- a/src/villas-pipe.cpp
+++ b/src/villas-pipe.cpp
@@ -133,7 +133,8 @@ static void * send_loop(void *ctx)
struct sample *smps[node->out.vectorize];
/* Initialize memory */
- ret = pool_init(&sendd.pool, LOG2_CEIL(node->out.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
+ ret = pool_init(&sendd.pool, MAX(16384, 2*LOG2_CEIL(node->out.vectorize)), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memory_type(node, &memory_type_heap));
+
if (ret < 0)
error("Failed to allocate memory for receive pool.");
@@ -197,7 +198,8 @@ static void * recv_loop(void *ctx)
struct sample *smps[node->in.vectorize];
/* Initialize memory */
- ret = pool_init(&recvv.pool, LOG2_CEIL(node->in.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_hugepage);
+ ret = pool_init(&recvv.pool, MAX(16*8192, 2*LOG2_CEIL(node->in.vectorize)), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memory_type(node, &memory_type_heap));
+
if (ret < 0)
error("Failed to allocate memory for receive pool.");
diff --git a/src/villas-signal.cpp b/src/villas-signal.cpp
index 25f8bd346..d01691ca5 100644
--- a/src/villas-signal.cpp
+++ b/src/villas-signal.cpp
@@ -155,7 +155,7 @@ int main(int argc, char *argv[])
if (ret)
error("Failed to verify node configuration");
- ret = pool_init(&q, 16, SAMPLE_LEN(n.samplelen), &memtype_heap);
+ ret = pool_init(&q, 16, SAMPLE_LEN(n.samplelen), &memory_type_heap);
if (ret)
error("Failed to initialize pool");
diff --git a/src/villas-test-cmp.cpp b/src/villas-test-cmp.cpp
index d07472f8a..f32d5c9e0 100644
--- a/src/villas-test-cmp.cpp
+++ b/src/villas-test-cmp.cpp
@@ -120,7 +120,7 @@ check: if (optarg == endptr)
int n = argc - optind; /* The number of files which we compare */
struct side s[n];
- ret = pool_init(&pool, n, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memtype_heap);
+ ret = pool_init(&pool, n, SAMPLE_LEN(DEFAULT_SAMPLELEN), &memory_type_heap);
if (ret)
error("Failed to initialize pool");
diff --git a/tests/unit/io.c b/tests/unit/io.c
index cf8d5e162..e2bab96dd 100644
--- a/tests/unit/io.c
+++ b/tests/unit/io.c
@@ -185,7 +185,7 @@ ParameterizedTest(char *fmt, io, lowlevel)
struct sample *smps[NUM_SAMPLES];
struct sample *smpt[NUM_SAMPLES];
- ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage);
+ ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memory_hugepage);
cr_assert_eq(ret, 0);
info("Running test for format = %s", fmt);
@@ -232,7 +232,7 @@ ParameterizedTest(char *fmt, io, highlevel)
info("Running test for format = %s", fmt);
- ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memtype_hugepage);
+ ret = pool_init(&p, 2 * NUM_SAMPLES, SAMPLE_LEN(NUM_VALUES), &memory_hugepage);
cr_assert_eq(ret, 0);
generate_samples(&p, smps, smpt, NUM_SAMPLES, NUM_VALUES);
diff --git a/tests/unit/memory.c b/tests/unit/memory.c
index 55435d45c..821795afd 100644
--- a/tests/unit/memory.c
+++ b/tests/unit/memory.c
@@ -28,26 +28,31 @@
#include
#include
+#define HUGEPAGESIZE (1 << 22)
+
TheoryDataPoints(memory, aligned) = {
DataPoints(size_t, 1, 32, 55, 1 << 10, 1 << 20),
DataPoints(size_t, 1, 8, 1 << 12),
- DataPoints(struct memtype *, &memtype_heap, &memtype_hugepage)
+ DataPoints(struct memory_type *, &memory_type_heap, &memory_hugepage)
};
-Theory((size_t len, size_t align, struct memtype *m), memory, aligned) {
+Theory((size_t len, size_t align, struct memory_type *m), memory, aligned) {
int ret;
void *ptr;
+ ret = memory_init(100);
+ cr_assert(!ret);
+
ptr = memory_alloc_aligned(m, len, align);
cr_assert_neq(ptr, NULL, "Failed to allocate memory");
cr_assert(IS_ALIGNED(ptr, align));
- if (m == &memtype_hugepage) {
+ if (m == &memory_hugepage) {
cr_assert(IS_ALIGNED(ptr, HUGEPAGESIZE));
}
- ret = memory_free(m, ptr, len);
+ ret = memory_free(ptr);
cr_assert_eq(ret, 0, "Failed to release memory: ret=%d, ptr=%p, len=%zu: %s", ret, ptr, len, strerror(errno));
}
@@ -57,15 +62,18 @@ Test(memory, manager) {
int ret;
void *p, *p1, *p2, *p3;
- struct memtype *m;
+ struct memory_type *m;
total_size = 1 << 10;
- max_block = total_size - sizeof(struct memtype) - sizeof(struct memblock);
+ max_block = total_size - sizeof(struct memory_type) - sizeof(struct memory_block);
- p = memory_alloc(&memtype_heap, total_size);
+ ret = memory_init(0);
+ cr_assert(!ret);
+
+ p = memory_alloc(&memory_type_heap, total_size);
cr_assert_not_null(p);
- m = memtype_managed_init(p, total_size);
+ m = memory_managed(p, total_size);
cr_assert_not_null(m);
p1 = memory_alloc(m, 16);
@@ -74,7 +82,7 @@ Test(memory, manager) {
p2 = memory_alloc(m, 32);
cr_assert_not_null(p2);
- ret = memory_free(m, p1, 16);
+ ret = memory_free(p1);
cr_assert(ret == 0);
p1 = memory_alloc_aligned(m, 128, 128);
@@ -85,21 +93,21 @@ Test(memory, manager) {
cr_assert(p3);
cr_assert(IS_ALIGNED(p3, 256));
- ret = memory_free(m, p2, 32);
+ ret = memory_free(p2);
cr_assert(ret == 0);
- ret = memory_free(m, p1, 128);
+ ret = memory_free(p1);
cr_assert(ret == 0);
- ret = memory_free(m, p3, 128);
+ ret = memory_free(p3);
cr_assert(ret == 0);
p1 = memory_alloc(m, max_block);
cr_assert_not_null(p1);
- ret = memory_free(m, p1, max_block);
+ ret = memory_free(p1);
cr_assert(ret == 0);
- ret = memory_free(&memtype_heap, p, total_size);
+ ret = memory_free(p);
cr_assert(ret == 0);
}
diff --git a/tests/unit/pool.c b/tests/unit/pool.c
index d2ef6160a..e2958185a 100644
--- a/tests/unit/pool.c
+++ b/tests/unit/pool.c
@@ -32,16 +32,16 @@ struct param {
int thread_count;
int pool_size;
size_t block_size;
- struct memtype *memtype;
+ struct memory_type *memory_type;
};
ParameterizedTestParameters(pool, basic)
{
static struct param params[] = {
- { 1, 4096, 150, &memtype_heap },
- { 1, 128, 8, &memtype_hugepage },
- { 1, 4, 8192, &memtype_hugepage },
- { 1, 1 << 13, 4, &memtype_heap }
+ { 1, 4096, 150, &memory_type_heap },
+ { 1, 128, 8, &memory_hugepage },
+ { 1, 4, 8192, &memory_hugepage },
+ { 1, 1 << 13, 4, &memory_type_heap }
};
return cr_make_param_array(struct param, params, ARRAY_LEN(params));
@@ -54,7 +54,7 @@ ParameterizedTest(struct param *p, pool, basic)
void *ptr, *ptrs[p->pool_size];
- ret = pool_init(&pool, p->pool_size, p->block_size, p->memtype);
+ ret = pool_init(&pool, p->pool_size, p->block_size, p->memory_type);
cr_assert_eq(ret, 0, "Failed to create pool");
ptr = pool_get(&pool);
diff --git a/tests/unit/queue.c b/tests/unit/queue.c
index fad646c11..dca93f12d 100644
--- a/tests/unit/queue.c
+++ b/tests/unit/queue.c
@@ -51,7 +51,7 @@ struct param {
int batch_size;
void * (*thread_func)(void *);
struct queue queue;
- const struct memtype *memtype;
+ const struct memory_type *memory_type;
};
/** Get thread id as integer
@@ -243,7 +243,7 @@ Test(queue, single_threaded)
.start = 1 /* we start immediately */
};
- ret = queue_init(&p.queue, p.queue_size, &memtype_heap);
+ ret = queue_init(&p.queue, p.queue_size, &memory_type_heap);
cr_assert_eq(ret, 0, "Failed to create queue");
producer(&p);
@@ -265,35 +265,35 @@ ParameterizedTestParameters(queue, multi_threaded)
.thread_count = 32,
.thread_func = producer_consumer_many,
.batch_size = 10,
- .memtype = &memtype_heap
+ .memory_type = &memory_type_heap
}, {
.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
.thread_func = producer_consumer_many,
.batch_size = 100,
- .memtype = &memtype_heap
+ .memory_type = &memory_type_heap
}, {
.iter_count = 1 << 16,
.queue_size = 1 << 14,
.thread_count = 16,
.thread_func = producer_consumer_many,
.batch_size = 100,
- .memtype = &memtype_heap
+ .memory_type = &memory_type_heap
}, {
.iter_count = 1 << 8,
.queue_size = 1 << 9,
.thread_count = 4,
.thread_func = producer_consumer_many,
.batch_size = 10,
- .memtype = &memtype_heap
+ .memory_type = &memory_type_heap
}, {
.iter_count = 1 << 16,
.queue_size = 1 << 9,
.thread_count = 16,
.thread_func = producer_consumer,
.batch_size = 10,
- .memtype = &memtype_hugepage
+ .memory_type = &memory_hugepage
}
};
@@ -308,7 +308,7 @@ ParameterizedTest(struct param *p, queue, multi_threaded, .timeout = 20)
p->start = 0;
- ret = queue_init(&p->queue, p->queue_size, &memtype_heap);
+ ret = queue_init(&p->queue, p->queue_size, &memory_type_heap);
cr_assert_eq(ret, 0, "Failed to create queue");
uint64_t start_tsc_time, end_tsc_time;
@@ -350,7 +350,7 @@ Test(queue, init_destroy)
int ret;
struct queue q = { .state = STATE_DESTROYED };
- ret = queue_init(&q, 1024, &memtype_heap);
+ ret = queue_init(&q, 1024, &memory_type_heap);
cr_assert_eq(ret, 0); /* Should succeed */
ret = queue_destroy(&q);
diff --git a/tests/unit/queue_signalled.c b/tests/unit/queue_signalled.c
index 0acf8d1ea..030ef9117 100644
--- a/tests/unit/queue_signalled.c
+++ b/tests/unit/queue_signalled.c
@@ -132,7 +132,7 @@ ParameterizedTest(struct param *param, queue_signalled, simple, .timeout = 5)
pthread_t t1, t2;
- ret = queue_signalled_init(&q, LOG2_CEIL(NUM_ELEM), &memtype_heap, param->flags);
+ ret = queue_signalled_init(&q, LOG2_CEIL(NUM_ELEM), &memory_type_heap, param->flags);
cr_assert_eq(ret, 0, "Failed to initialize queue: flags=%#x, ret=%d", param->flags, ret);
ret = pthread_create(&t1, NULL, producer, &q);