diff --git a/lib/Makefile.villas.inc b/lib/Makefile.villas.inc
index dc0a3ca0a..ff375a743 100644
--- a/lib/Makefile.villas.inc
+++ b/lib/Makefile.villas.inc
@@ -28,7 +28,7 @@ LIB = $(BUILDDIR)/$(LIB_NAME).so.$(LIB_ABI_VERSION)
LIB_SRCS += $(addprefix lib/kernel/, kernel.c rt.c) \
$(addprefix lib/, sample.c path.c node.c hook.c log.c log_config.c \
utils.c super_node.c hist.c timing.c pool.c list.c queue.c \
- queue_signalled.c memory.c advio.c plugin.c node_type.c stats.c \
+ queue_signalled.c memory.c memory_ib.c advio.c plugin.c node_type.c stats.c \
mapping.c shmem.c config_helper.c crypt.c compat.c \
log_helper.c task.c buffer.c table.c bitset.c signal.c \
)
diff --git a/lib/memory_ib.c b/lib/memory_ib.c
index 80f2c868c..38e9f0f0d 100644
--- a/lib/memory_ib.c
+++ b/lib/memory_ib.c
@@ -20,12 +20,9 @@
* along with this program. If not, see .
*********************************************************************************/
-#include
-
-struct memory_ib {
- struct ibv_pd *pd;
- struct memtype *parent;
-};
+#include
+#include
+#include
struct ibv_mr * memory_ib_mr(void *ptr)
{
@@ -36,14 +33,13 @@ struct ibv_mr * memory_ib_mr(void *ptr)
void * memory_ib_alloc(struct memtype *m, size_t len, size_t alignment)
{
- struct memtype_ib *mi = (struct memtype_ib *) m->_vd;
+ struct memory_ib *mi = (struct memory_ib *) m->_vd;
- struct ibv_mr **mr = memory_alloc_aligned(m->parent, len + sizeof(struct ibv_mr *), alignment);
+ struct ibv_mr **mr = memory_alloc_aligned(mi->parent, len + sizeof(struct ibv_mr *), alignment);
char *ptr = (char *) (mr + 1);
- *mr = ibv_reg_mr(mi->pd, ptr, len,
- IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
- if(!*mr) {
+ *mr = ibv_reg_mr(mi->pd, ptr, len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
+ if(!*mr) {
free(ptr);
return NULL;
}
@@ -53,6 +49,7 @@ void * memory_ib_alloc(struct memtype *m, size_t len, size_t alignment)
int memory_ib_free(struct memtype *m, void *ptr, size_t len)
{
+ struct memory_ib *mi = (struct memory_ib *) m->_vd;
struct ibv_mr *mr = memory_ib_mr(ptr);
ibv_dereg_mr(mr);
@@ -60,7 +57,7 @@ int memory_ib_free(struct memtype *m, void *ptr, size_t len)
ptr -= sizeof(struct ibv_mr *);
len += sizeof(struct ibv_mr *);
- memory_free(m->parent, ptr, len);
+ memory_free(mi->parent, ptr, len);
return 0;
}
@@ -68,7 +65,7 @@ int memory_ib_free(struct memtype *m, void *ptr, size_t len)
struct memtype * ib_memtype(struct node *n, struct memtype *parent)
{
struct infiniband *i = (struct infiniband *) n->_vd;
- struct memtype *mt = alloc(struct memtype);
+ struct memtype *mt = malloc(sizeof(struct memtype));
mt->name = "ib";
mt->flags = 0;
@@ -87,10 +84,12 @@ struct memtype * ib_memtype(struct node *n, struct memtype *parent)
}
/* Ausserhalb von lib/nodes/infiniband.c */
+/*
struct pool p = { .state = STATE_DESTROYED };
struct node *n = ..;
pool_init(&p, 100, 32, node_get_memtype(n));
+*/
diff --git a/lib/node.c b/lib/node.c
index eae2583e9..866cbe576 100644
--- a/lib/node.c
+++ b/lib/node.c
@@ -32,6 +32,7 @@
#include
#include
#include
+#include
static int node_direction_init(struct node_direction *nd, struct node *n)
{
@@ -546,7 +547,7 @@ int node_fd(struct node *n)
struct memtype * node_memtype(struct node *n, struct memtype *parent)
{
- return n->_vt->memtype ? n->_vt->memtype(n) : &memtype_huge;
+ return n->_vt->memtype(n, parent) ? n->_vt->memtype(n, parent) : &memtype_hugepage;
}
int node_parse_list(struct list *list, json_t *cfg, struct list *all)
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 963c26376..1a847365d 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -29,6 +29,7 @@
#include
#include
#include
+#include
#include
@@ -36,7 +37,7 @@ int ib_cleanup(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
info("Starting to clean up");
-
+
// Destroy QP
rdma_destroy_qp(ib->ctx.id);
info("Destroyed QP");
@@ -46,7 +47,7 @@ int ib_cleanup(struct node *n)
if(ib->is_source)
ibv_dereg_mr(ib->mem.mr_send);
info("Deregistered memory regions");
-
+
// Destroy pools
pool_destroy(&ib->mem.p_recv);
pool_destroy(&ib->mem.p_send);
@@ -84,7 +85,7 @@ int ib_post_recv_wrs(struct node *n)
// Post Work Request
ret = ibv_post_recv(ib->ctx.id->qp, &wr, &bad_wr);
- return ret;
+ return ret;
}
void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size)
@@ -93,7 +94,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size)
}
void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
-{
+{
struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd;
for(int i=0; i<*size; i++)
@@ -130,7 +131,7 @@ void * ib_event_thread(void *n)
// Request a new event in the CQ and acknowledge event
ibv_req_notify_cq(ib->ctx.cq, 0);
- ibv_ack_cq_events(ib->ctx.cq, 1);
+ ibv_ack_cq_events(ib->ctx.cq, 1);
}
}
@@ -166,10 +167,10 @@ static void ib_init_wc_poll(struct node *n)
}
// Create completion queue and bind to channel (or NULL)
- ib->ctx.cq = ibv_create_cq(ib->ctx.id->verbs,
- ib->cq_size,
- NULL,
- ib->ctx.comp_channel,
+ ib->ctx.cq = ibv_create_cq(ib->ctx.id->verbs,
+ ib->cq_size,
+ NULL,
+ ib->ctx.comp_channel,
0);
if(!ib->ctx.cq)
error("Could not create completion queue in node %s.", node_name(n));
@@ -200,7 +201,7 @@ static void ib_build_ibv(struct node *n)
{
struct infiniband *ib = (struct infiniband *) n->_vd;
int ret;
-
+
//Allocate protection domain
ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
if(!ib->ctx.pd)
@@ -218,7 +219,7 @@ static void ib_build_ibv(struct node *n)
// Create the actual QP
ret = rdma_create_qp(ib->ctx.id, ib->ctx.pd, &ib->qp_init);
- if(ret)
+ if(ret)
error("Failed to create Queue Pair in node %s.", node_name(n));
info("Created Queue Pair with %i receive and %i send elements.",
@@ -229,18 +230,18 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_recv.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
- pool_init(&ib->mem.p_recv,
- ib->qp_init.cap.max_recv_wr,
- sizeof(double),
+ pool_init(&ib->mem.p_recv,
+ ib->qp_init.cap.max_recv_wr,
+ sizeof(double),
&memtype_heap);
- if(ret)
+ if(ret)
{
error("Failed to init recv memory pool of node %s: %s",
node_name(n), gai_strerror(ret));
}
//ToDo: initialize r_addr_key struct if mode is RDMA
-
+
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
ib->mem.mr_recv = ibv_reg_mr(
@@ -260,18 +261,18 @@ static void ib_build_ibv(struct node *n)
ib->mem.p_send.queue.state = STATE_DESTROYED;
// Set pool size to maximum size of Receive Queue
- pool_init(&ib->mem.p_send,
- ib->qp_init.cap.max_send_wr,
- sizeof(double),
+ pool_init(&ib->mem.p_send,
+ ib->qp_init.cap.max_send_wr,
+ sizeof(double),
&memtype_heap);
- if(ret)
+ if(ret)
{
error("Failed to init send memory of node %s: %s",
node_name(n), gai_strerror(ret));
}
//ToDo: initialize r_addr_key struct if mode is RDMA
-
+
// Register memory for IB Device. Not necessary if data is send
// exclusively inline
ib->mem.mr_send = ibv_reg_mr(
@@ -286,13 +287,13 @@ static void ib_build_ibv(struct node *n)
info("Allocated send memory.");
}
-
+
// Post Receive Work Requests to be able to receive data
// Fill complete Receive Queue during initialization
for(int i=0; iqp_init.cap.max_recv_wr; i++)
{
ret = ib_post_recv_wrs(n);
- if(ret)
+ if(ret)
{
error("Failed to post initial receive Work Requests of node %s.",
node_name(n));
@@ -313,7 +314,7 @@ static int ib_addr_resolved(struct node *n)
// Resolve address
ret = rdma_resolve_route(ib->ctx.id, ib->conn.timeout);
- if(ret)
+ if(ret)
error("Failed to resolve route in node %s.", node_name(n));
return 0;
@@ -327,17 +328,17 @@ static int ib_route_resolved(struct node *n)
info("Successfully resolved route.");
//ToDo: Post receive WRs
-
+
struct rdma_conn_param cm_params;
memset(&cm_params, 0, sizeof(cm_params));
// Send connection request
ret = rdma_connect(ib->ctx.id, &cm_params);
- if(ret)
+ if(ret)
error("Failed to connect in node %s.", node_name(n));
info("Called rdma_connect.");
-
+
return 0;
}
@@ -349,17 +350,17 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
ib->ctx.id = id;
ib_build_ibv(n);
-
+
struct rdma_conn_param cm_params;
memset(&cm_params, 0, sizeof(cm_params));
// Accept connection request
ret = rdma_accept(ib->ctx.id, &cm_params);
- if(ret)
+ if(ret)
error("Failed to connect in node %s.", node_name(n));
info("Successfully accepted connection request.");
-
+
return 0;
}
@@ -441,7 +442,7 @@ int ib_parse(struct node *n, json_t *cfg)
char* ip_adr = strtok(local, ":");
char* port = strtok(NULL, ":");
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr);
- if(ret)
+ if(ret)
{
error("Failed to resolve local address '%s' of node %s: %s",
local, node_name(n), gai_strerror(ret));
@@ -459,20 +460,20 @@ int ib_parse(struct node *n, json_t *cfg)
// Set timeout
ib->conn.timeout = timeout;
-
+
// Translate poll mode
- if(strcmp(poll_mode, "EVENT") == 0)
+ if(strcmp(poll_mode, "EVENT") == 0)
{
ib->poll.poll_mode = EVENT;
ib->poll.poll_func = ib_event_thread;
}
- else if(strcmp(poll_mode, "BUSY") == 0)
+ else if(strcmp(poll_mode, "BUSY") == 0)
{
ib->poll.poll_mode = BUSY;
ib->poll.poll_func = ib_busy_poll_thread;
}
- else
+ else
{
error("Failed to translate poll_mode in node %s. %s is not a valid \
poll mode!", node_name(n), poll_mode);
@@ -522,7 +523,7 @@ int ib_parse(struct node *n, json_t *cfg)
char* ip_adr = strtok(remote, ":");
char* port = strtok(NULL, ":");
ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.dst_addr);
- if(ret)
+ if(ret)
{
error("Failed to resolve remote address '%s' of node %s: %s",
remote, node_name(n), gai_strerror(ret));
@@ -534,7 +535,7 @@ int ib_parse(struct node *n, json_t *cfg)
else
{
ib->is_source = 0;
-
+
// Set correct Work Completion function
ib->poll.on_compl = ib_completion_target;
}
@@ -569,7 +570,7 @@ void * ib_disconnect_thread(void *n)
return NULL;
}
}
- return NULL;
+ return NULL;
}
int ib_start(struct node *n)
@@ -586,7 +587,7 @@ int ib_start(struct node *n)
}
ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
- if(ret)
+ if(ret)
{
error("Failed to create rdma_cm_id of node %s: %s",
node_name(n), gai_strerror(ret));
@@ -595,21 +596,21 @@ int ib_start(struct node *n)
// Bind rdma_cm_id to the HCA
ret = rdma_bind_addr(ib->ctx.id, ib->conn.src_addr->ai_addr);
- if(ret)
+ if(ret)
{
error("Failed to bind to local device of node %s: %s",
node_name(n), gai_strerror(ret));
}
info("Bound rdma_cm_id to Infiniband device.");
-
+
if(ib->is_source)
{
// Resolve address
- ret = rdma_resolve_addr(ib->ctx.id,
- NULL,
- ib->conn.dst_addr->ai_addr,
- ib->conn.timeout);
- if(ret)
+ ret = rdma_resolve_addr(ib->ctx.id,
+ NULL,
+ ib->conn.dst_addr->ai_addr,
+ ib->conn.timeout);
+ if(ret)
{
error("Failed to resolve remote address after %ims of node %s: %s",
ib->conn.timeout, node_name(n), gai_strerror(ret));
@@ -617,14 +618,14 @@ int ib_start(struct node *n)
}
else
{
- // The ID will be overwritten for the target. If the event type is
- // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
- // that communication.
+ // The ID will be overwritten for the target. If the event type is
+ // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
+ // that communication.
ib->ctx.listen_id = ib->ctx.id;
// Listen on rdma_cm_id for events
ret = rdma_listen(ib->ctx.listen_id, 10);
- if(ret)
+ if(ret)
{
error("Failed to listen to rdma_cm_id on node %s", node_name(n));
}
@@ -667,11 +668,11 @@ int ib_stop(struct node *n)
ret = rdma_disconnect(ib->ctx.id);
if(ret)
{
- error("Error while calling rdma_disconnect in node %s: %s",
+ error("Error while calling rdma_disconnect in node %s: %s",
node_name(n), gai_strerror(ret));
}
info("Called rdma_disconnect.");
-
+
// If disconnected event already occured, directly call cleanup function
if(ib->conn.rdma_disconnect_called)
{
@@ -693,7 +694,7 @@ int ib_stop(struct node *n)
int ib_init(struct super_node *n)
{
-
+
return 0;
}
@@ -712,7 +713,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
union {
double f;
int64_t i;
- } *data;
+ } *data;
ret = ibv_poll_cq(ib->ctx.cq, cnt, wc);
@@ -791,7 +792,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
wr[i].num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended
//furthermore we should break the transaction up if inline mode
//is selected
-
+
if(i == (smps[0]->length-1))
wr[i].next = NULL;
else
@@ -801,7 +802,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
wr[i].opcode = IBV_WR_SEND_WITH_IMM;
}
-
+
//Send linked list of Work Requests
ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
if(ret)
diff --git a/src/pipe.c b/src/pipe.c
index b9e967d97..2b0260398 100644
--- a/src/pipe.c
+++ b/src/pipe.c
@@ -132,7 +132,7 @@ static void * send_loop(void *ctx)
struct sample *smps[node->out.vectorize];
/* Initialize memory */
- ret = pool_init(&sendd.pool, LOG2_CEIL(node->out.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memtype(node));
+ ret = pool_init(&sendd.pool, LOG2_CEIL(node->out.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memtype(node, &memtype_hugepage));
if (ret < 0)
error("Failed to allocate memory for receive pool.");
@@ -196,7 +196,7 @@ static void * recv_loop(void *ctx)
struct sample *smps[node->in.vectorize];
/* Initialize memory */
- ret = pool_init(&recvv.pool, LOG2_CEIL(node->in.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memtype(node));
+ ret = pool_init(&recvv.pool, LOG2_CEIL(node->in.vectorize), SAMPLE_LEN(DEFAULT_SAMPLELEN), node_memtype(node, &memtype_hugepage));
if (ret < 0)
error("Failed to allocate memory for receive pool.");