From 2bf122991c69b973ddb2a89e4ccdee4b7661ef8b Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Thu, 5 Jul 2018 18:26:32 +0200
Subject: [PATCH 1/7] Started to convert the RDMA_CM_EVENT loop to a separate
 thread and added a new state to the node. This commit is still broken

---
 include/villas/common.h           |  3 +-
 include/villas/nodes/infiniband.h |  1 -
 lib/nodes/infiniband.c            | 51 ++++++++++++++++++++-----------
 3 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/include/villas/common.h b/include/villas/common.h
index 64de9bb8c..a7093ac38 100644
--- a/include/villas/common.h
+++ b/include/villas/common.h
@@ -38,7 +38,8 @@ enum state {
 	STATE_OPENED		= 4, /* alias for STATE_STARTED used by struct io */
 	STATE_STOPPED		= 5,
 	STATE_UNLOADED		= 5, /* alias for STATE_STARTED used by struct plugin */
-	STATE_CLOSED		= 5  /* alias for STATE_STARTED used by struct io */
+	STATE_CLOSED		= 5,  /* alias for STATE_STARTED used by struct io */
+	STATE_CONNECTED		= 6
 };
 
 /** Callback to destroy list elements.
diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 0e0b3136c..4e7c056e4 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -55,7 +55,6 @@ struct infiniband {
 	struct context_s {
 		struct rdma_cm_id *listen_id;
 		struct rdma_cm_id *id;
-		struct rdma_event_channel *ec;
 
 		struct ibv_pd *pd;
 		struct ibv_cq *recv_cq;
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index ba82bb1f7..ce5245986 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -32,7 +32,7 @@
 #include <villas/memory.h>
 #include <villas/memory/ib.h>
 
-#include <rdma/rdma_cma.h>
+static struct rdma_event_channel *rdma_event_channel;
 
 int ib_cleanup(struct node *n)
 {
@@ -59,7 +59,7 @@ int ib_cleanup(struct node *n)
 	debug(LOG_IB | 3, "Destroyed rdma_cm_id");
 
 	// Destroy event channel
-	rdma_destroy_event_channel(ib->ctx.ec);
+	rdma_destroy_event_channel(rdma_event_channel);
 	debug(LOG_IB | 3, "Destroyed event channel");
 
 	return 0;
@@ -587,6 +587,27 @@ int ib_destroy(struct node *n)
 	return 0;
 }
 
+void * ib_rdma_cm_event_thread(void *n)
+{
+	struct node *node = (struct node *) n;
+	//struct infiniband *ib = (struct infiniband *) node->_vd;
+	struct rdma_cm_event *event;
+
+	debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
+
+	while (rdma_get_cm_event(rdma_event_channel, &event) == 0) {
+		struct rdma_cm_event event_copy;
+
+		memcpy(&event_copy, event, sizeof(*event));
+		rdma_ack_cm_event(event);
+
+		if (ib_event(n, &event_copy))
+			break;
+	}
+
+	return NULL;
+}
+
 void * ib_disconnect_thread(void *n)
 {
 	struct node *node = (struct node *) n;
@@ -595,7 +616,7 @@ void * ib_disconnect_thread(void *n)
 
 	debug(LOG_IB | 1, "Started disconnect thread of node %s", node_name(node));
 
-	while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
+	while (rdma_get_cm_event(rdma_event_channel, &event) == 0) {
 		if (event->event == RDMA_CM_EVENT_DISCONNECTED) {
 			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
 
@@ -618,13 +639,13 @@ int ib_start(struct node *n)
 	debug(LOG_IB | 1, "Started ib_start");
 
 	// Create event channel
-	ib->ctx.ec = rdma_create_event_channel();
-	if (!ib->ctx.ec)
+	rdma_event_channel = rdma_create_event_channel();
+	if (!rdma_event_channel)
 		error("Failed to create event channel in node %s!", node_name(n));
 
 	debug(LOG_IB | 3, "Created event channel");
 
-	ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
+	ret = rdma_create_id(rdma_event_channel, &ib->ctx.id, NULL, ib->conn.port_space);
 	if (ret)
 		error("Failed to create rdma_cm_id of node %s: %s",
 			node_name(n), gai_strerror(ret));
@@ -667,22 +688,16 @@ int ib_start(struct node *n)
 	// sure the nodes are succesfully connected.
 	debug(LOG_IB | 1, "Starting to monitor events on rdma_cm_id");
 
-	while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
-		struct rdma_cm_event event_copy;
-
-		memcpy(&event_copy, event, sizeof(*event));
-
-		rdma_ack_cm_event(event);
-
-		if (ib_event(n, &event_copy))
-			break;
-	}
-
+	//Create thread to monitor rdma_cm_event channel
 	ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n);
 	if (ret)
 		error("Failed to create thread to monitor disconnects in node %s: %s",
 			node_name(n), gai_strerror(ret));
 
+
+	//Wait until rdma_cm_event thread sets state to STARTED
+	while ( !(n->state == STATE_CONNECTED));
+
 	return 0;
 }
 
@@ -708,7 +723,7 @@ int ib_stop(struct node *n)
 	else {
 		// Else, wait for event to occur
 		ib->conn.rdma_disconnect_called = 1;
-		rdma_get_cm_event(ib->ctx.ec, &event);
+		rdma_get_cm_event(rdma_event_channel, &event);
 
 		rdma_ack_cm_event(event);
 

From e2061e58fc1deacd0a4437772fdc5c42c7a26f15 Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 12:49:22 +0200
Subject: [PATCH 2/7] Events are now monitored in a separate thread. The
 segmentation faults we saw earlier occurred because we exited ib_start
 before we created a protection domain, which is used by memory_allocation

---
 include/villas/nodes/infiniband.h |   5 +-
 lib/nodes/infiniband.c            | 378 +++++++++++++++---------------
 2 files changed, 187 insertions(+), 196 deletions(-)

diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 4e7c056e4..449d8c88c 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -38,6 +38,7 @@
 /* Function pointer typedefs */
 typedef void  (*ib_on_completion) (struct node*, struct ibv_wc*, int*);
 typedef void * (*ib_poll_function) (void*);
+typedef void * (*ib_event_function) (void*);
 
 /* Enums */
 enum poll_mode_e {
@@ -55,6 +56,7 @@ struct infiniband {
 	struct context_s {
 		struct rdma_cm_id *listen_id;
 		struct rdma_cm_id *id;
+		struct rdma_event_channel *ec;
 
 		struct ibv_pd *pd;
 		struct ibv_cq *recv_cq;
@@ -87,8 +89,7 @@ struct infiniband {
 
 		struct r_addr_key_s *r_addr_key;
 
-		pthread_t stop_thread;
-		int rdma_disconnect_called;
+		pthread_t rdma_cm_event_thread;
 
 		int available_recv_wrs;
 	} conn;
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index ce5245986..18736b088 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -22,6 +22,7 @@
 
 #include <string.h>
 #include <math.h>
+#include <unistd.h> //ToDo: remove me.
 
 #include <villas/nodes/infiniband.h>
 #include <villas/plugin.h>
@@ -32,8 +33,6 @@
 #include <villas/memory.h>
 #include <villas/memory/ib.h>
 
-static struct rdma_event_channel *rdma_event_channel;
-
 int ib_cleanup(struct node *n)
 {
 	struct infiniband *ib = (struct infiniband *) n->_vd;
@@ -59,7 +58,7 @@ int ib_cleanup(struct node *n)
 	debug(LOG_IB | 3, "Destroyed rdma_cm_id");
 
 	// Destroy event channel
-	rdma_destroy_event_channel(rdma_event_channel);
+	rdma_destroy_event_channel(ib->ctx.ec);
 	debug(LOG_IB | 3, "Destroyed event channel");
 
 	return 0;
@@ -216,13 +215,6 @@ static void ib_build_ibv(struct node *n)
 
 	debug(LOG_IB | 1, "Starting to build IBV components");
 
-	//Allocate protection domain
-	ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
-	if (!ib->ctx.pd)
-		error("Could not allocate protection domain in node %s", node_name(n));
-
-	debug(LOG_IB | 3, "Allocated Protection Domain");
-
 	// Initiate poll mode
 	ib_init_wc_poll(n);
 
@@ -362,51 +354,6 @@ static int ib_connect_request(struct node *n, struct rdma_cm_id *id)
 	return 0;
 }
 
-static int ib_event(struct node *n, struct rdma_cm_event *event)
-{
-	int ret = 0;
-
-	switch(event->event) {
-		case RDMA_CM_EVENT_ADDR_RESOLVED:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
-			ret = ib_addr_resolved(n);
-			break;
-		case RDMA_CM_EVENT_ADDR_ERROR:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR");
-			error("Address resolution (rdma_resolve_addr) failed!");
-		case RDMA_CM_EVENT_ROUTE_RESOLVED:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED");
-			ret = ib_route_resolved(n);
-			break;
-		case RDMA_CM_EVENT_ROUTE_ERROR:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR");
-			error("Route resolution (rdma_resovle_route) failed!");
-		case RDMA_CM_EVENT_CONNECT_REQUEST:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
-			ret = ib_connect_request(n, event->id);
-			break;
-		case RDMA_CM_EVENT_CONNECT_ERROR:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR");
-			error("An error has occurred trying to establish a connection!");
-		case RDMA_CM_EVENT_REJECTED:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED");
-			error("Connection request or response was rejected by the remote end point!");
-		case RDMA_CM_EVENT_ESTABLISHED:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
-			info("Connection established in node %s", node_name(n));
-			ret = 1;
-			break;
-		case RDMA_CM_EVENT_DISCONNECTED:
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
-			ret = ib_cleanup(n);
-			break;
-		default:
-			error("Unknown event occurred: %u", event->event);
-	}
-
-	return ret;
-}
-
 int ib_reverse(struct node *n)
 {
 	return 0;
@@ -590,62 +537,109 @@ int ib_destroy(struct node *n)
 void * ib_rdma_cm_event_thread(void *n)
 {
 	struct node *node = (struct node *) n;
-	//struct infiniband *ib = (struct infiniband *) node->_vd;
+	struct infiniband *ib = (struct infiniband *) node->_vd;
 	struct rdma_cm_event *event;
+	int ret = 0;
 
-	debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
 
-	while (rdma_get_cm_event(rdma_event_channel, &event) == 0) {
+	//debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
+
+	info("EVENT: %p", ib->ctx.ec);
+	info("FD, i: %i", ib->ctx.ec->fd);
+
+	// Wait until node is completely started
+	//while (node->state != STATE_STARTED);
+
+	while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
 		struct rdma_cm_event event_copy;
 
+		//ToDo: check if copy is really necessary
 		memcpy(&event_copy, event, sizeof(*event));
 		rdma_ack_cm_event(event);
 
-		if (ib_event(n, &event_copy))
+		switch(event_copy.event) {
+			case RDMA_CM_EVENT_ADDR_RESOLVED:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
+
+				ret = ib_addr_resolved(n);
+				break;
+
+			case RDMA_CM_EVENT_ADDR_ERROR:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR");
+
+				error("Address resolution (rdma_resolve_addr) failed!");
+				break;
+
+			case RDMA_CM_EVENT_ROUTE_RESOLVED:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED");
+
+				ret = ib_route_resolved(n);
+				break;
+
+			case RDMA_CM_EVENT_ROUTE_ERROR:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR");
+
+				error("Route resolution (rdma_resovle_route) failed!");
+				break;
+
+			case RDMA_CM_EVENT_CONNECT_REQUEST:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
+
+				ret = ib_connect_request(n, event_copy.id);
+				break;
+
+			case RDMA_CM_EVENT_CONNECT_ERROR:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR");
+
+				error("An error has occurred trying to establish a connection!");
+				break;
+
+			case RDMA_CM_EVENT_REJECTED:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED");
+
+				error("Connection request or response was rejected by the remote end point!");
+				break;
+			case RDMA_CM_EVENT_ESTABLISHED:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
+
+	//			node->state = STATE_CONNECTED;
+
+				info("Connection established in node %s", node_name(n));
+				break;
+
+			case RDMA_CM_EVENT_DISCONNECTED:
+				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
+
+				ret = ib_cleanup(n);
+				break;
+
+			default:
+				error("Unknown event occurred: %u", event_copy.event);
+		}
+
+
+		if (ret) //ToDo: Fix me
 			break;
 	}
 
 	return NULL;
 }
 
-void * ib_disconnect_thread(void *n)
-{
-	struct node *node = (struct node *) n;
-	struct infiniband *ib = (struct infiniband *) node->_vd;
-	struct rdma_cm_event *event;
-
-	debug(LOG_IB | 1, "Started disconnect thread of node %s", node_name(node));
-
-	while (rdma_get_cm_event(rdma_event_channel, &event) == 0) {
-		if (event->event == RDMA_CM_EVENT_DISCONNECTED) {
-			debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
-
-			rdma_ack_cm_event(event);
-			ib->conn.rdma_disconnect_called = 1;
-
-			node_stop(node);
-			return NULL;
-		}
-	}
-	return NULL;
-}
-
 int ib_start(struct node *n)
 {
 	struct infiniband *ib = (struct infiniband *) n->_vd;
-	struct rdma_cm_event *event = NULL;
 	int ret;
 
 	debug(LOG_IB | 1, "Started ib_start");
 
 	// Create event channel
-	rdma_event_channel = rdma_create_event_channel();
-	if (!rdma_event_channel)
+	ib->ctx.ec = rdma_create_event_channel();
+	if (!ib->ctx.ec)
 		error("Failed to create event channel in node %s!", node_name(n));
 
 	debug(LOG_IB | 3, "Created event channel");
 
-	ret = rdma_create_id(rdma_event_channel, &ib->ctx.id, NULL, ib->conn.port_space);
+	ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
 	if (ret)
 		error("Failed to create rdma_cm_id of node %s: %s",
 			node_name(n), gai_strerror(ret));
@@ -684,27 +678,30 @@ int ib_start(struct node *n)
 		debug(LOG_IB | 3, "Started to listen to rdma_cm_id");
 	}
 
+	//Allocate protection domain
+	ib->ctx.pd = ibv_alloc_pd(ib->ctx.id->verbs);
+	if (!ib->ctx.pd)
+		error("Could not allocate protection domain in node %s", node_name(n));
+
+	debug(LOG_IB | 3, "Allocated Protection Domain");
+
+
 	// Several events should occur on the event channel, to make
 	// sure the nodes are succesfully connected.
 	debug(LOG_IB | 1, "Starting to monitor events on rdma_cm_id");
 
 	//Create thread to monitor rdma_cm_event channel
-	ret = pthread_create(&ib->conn.stop_thread, NULL, ib_disconnect_thread, n);
+	ret = pthread_create(&ib->conn.rdma_cm_event_thread, NULL, ib_rdma_cm_event_thread, n);
 	if (ret)
-		error("Failed to create thread to monitor disconnects in node %s: %s",
+		error("Failed to create thread to monitor rdma_cm events in node %s: %s",
 			node_name(n), gai_strerror(ret));
 
-
-	//Wait until rdma_cm_event thread sets state to STARTED
-	while ( !(n->state == STATE_CONNECTED));
-
 	return 0;
 }
 
 int ib_stop(struct node *n)
 {
 	struct infiniband *ib = (struct infiniband *) n->_vd;
-	struct rdma_cm_event *event = NULL;
 	int ret;
 
 	// Call RDMA disconnect function
@@ -717,18 +714,6 @@ int ib_stop(struct node *n)
 
 	debug(LOG_IB | 3, "Called rdma_disconnect");
 
-	// If disconnected event already occured, directly call cleanup function
-	if (ib->conn.rdma_disconnect_called)
-		ib_cleanup(n);
-	else {
-		// Else, wait for event to occur
-		ib->conn.rdma_disconnect_called = 1;
-		rdma_get_cm_event(rdma_event_channel, &event);
-
-		rdma_ack_cm_event(event);
-
-		ib_event(n, event);
-	}
 
 	return 0;
 }
@@ -750,78 +735,81 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
 	struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
     	struct ibv_sge sge[cnt];
 	struct ibv_mr *mr;
-	int ret;
+	int ret = 0;
 
 	debug(LOG_IB | 15, "ib_read is called");
 
-	if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize)	{
-		// Get Memory Region
-		mr = memory_ib_get_mr(smps[0]);
+	if (n->state == STATE_CONNECTED) {
 
-		for (int i = 0; i < cnt; i++) {
-			// Increase refcnt of sample
-			sample_get(smps[i]);
+		if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt==n->in.vectorize)	{
+			// Get Memory Region
+			mr = memory_ib_get_mr(smps[0]);
 
-			// Prepare receive Scatter/Gather element
-    			sge[i].addr = (uint64_t) &smps[i]->data;
-    			sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
-    			sge[i].lkey = mr->lkey;
+			for (int i = 0; i < cnt; i++) {
+				// Increase refcnt of sample
+				sample_get(smps[i]);
 
-    			// Prepare a receive Work Request
-    			wr[i].wr_id = (uintptr_t) smps[i];
-			wr[i].next = &wr[i+1];
-    			wr[i].sg_list = &sge[i];
-    			wr[i].num_sge = 1;
+				// Prepare receive Scatter/Gather element
+    				sge[i].addr = (uint64_t) &smps[i]->data;
+    				sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
+    				sge[i].lkey = mr->lkey;
 
-			ib->conn.available_recv_wrs++;
+    				// Prepare a receive Work Request
+    				wr[i].wr_id = (uintptr_t) smps[i];
+				wr[i].next = &wr[i+1];
+    				wr[i].sg_list = &sge[i];
+    				wr[i].num_sge = 1;
 
-			if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
-				debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
+				ib->conn.available_recv_wrs++;
 
-				wr[i].next = NULL;
-				break;
+				if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) {
+					debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1));
+
+					wr[i].next = NULL;
+					break;
+				}
 			}
+
+    			// Post list of Work Requests
+    			ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
+			if (ret)
+				error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
+				    node_name(n), ret, bad_wr->wr_id);
+
+			debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
+
 		}
 
-    		// Post list of Work Requests
-    		ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr);
-		if (ret)
-			error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx",
-			    node_name(n), ret, bad_wr->wr_id);
+		// Poll Completion Queue
+		ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
 
-		debug(LOG_IB | 10, "Succesfully posted receive Work Requests");
+		if (ret) {
+			debug(LOG_IB | 10, "Received %i Work Completions", ret);
 
-	}
+			ib->conn.available_recv_wrs -= ret;
 
-	// Poll Completion Queue
-	ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc);
+			for (int i = 0; i < ret; i++) {
+				if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
+					debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
 
-	if (ret) {
-		debug(LOG_IB | 10, "Received %i Work Completions", ret);
+					ret = 0;
+				}
+				else if (wc[i].status != IBV_WC_SUCCESS) {
+					warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
+						node_name(n), wc[i].status);
+					ret = 0;
+				}
+				else if (wc[i].opcode & IBV_WC_RECV) {
+					smps[i] = (struct sample*)(wc[i].wr_id);
+					smps[i]->length = wc[i].byte_len/sizeof(double);
+				}
+				else
+					ret = 0;
 
-		ib->conn.available_recv_wrs -= ret;
-
-		for (int i = 0; i < ret; i++) {
-			if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-				debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
-
-				ret = 0;
+				//Release sample
+				sample_put((struct sample *) (wc[i].wr_id));
+				debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id));
 			}
-			else if (wc[i].status != IBV_WC_SUCCESS) {
-				warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
-					node_name(n), wc[i].status);
-				ret = 0;
-			}
-			else if (wc[i].opcode & IBV_WC_RECV) {
-				smps[i] = (struct sample*)(wc[i].wr_id);
-				smps[i]->length = wc[i].byte_len/sizeof(double);
-			}
-			else
-				ret = 0;
-
-			//Release sample
-			sample_put((struct sample *) (wc[i].wr_id));
-			debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id));
 		}
 	}
 
@@ -838,53 +826,55 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt)
 
 	debug(LOG_IB | 10, "ib_write is called");
 
-	memset(&wr, 0, sizeof(wr));
+	if (n->state == STATE_CONNECTED) {
+		memset(&wr, 0, sizeof(wr));
 
-	//ToDo: Place this into configuration and create checks if settings are valid
-	int send_inline = 1;
+		//ToDo: Place this into configuration and create checks if settings are valid
+		int send_inline = 1;
 
-	debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
+		debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline);
 
-	// Get Memory Region
-	mr = memory_ib_get_mr(smps[0]);
+		// Get Memory Region
+		mr = memory_ib_get_mr(smps[0]);
 
-	for (int i = 0; i < cnt; i++) {
-		// Increase refcnt of sample
-		sample_get(smps[i]);
+		for (int i = 0; i < cnt; i++) {
+			// Increase refcnt of sample
+			sample_get(smps[i]);
 
-		//Set Scatter/Gather element to data of sample
-		sge[i].addr = (uint64_t)&smps[i]->data;
-		sge[i].length = smps[i]->length*sizeof(double);
-		sge[i].lkey = mr->lkey;
+			//Set Scatter/Gather element to data of sample
+			sge[i].addr = (uint64_t)&smps[i]->data;
+			sge[i].length = smps[i]->length*sizeof(double);
+			sge[i].lkey = mr->lkey;
 
-		// Set Send Work Request
-		wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
-		wr[i].sg_list = &sge[i];
-		wr[i].num_sge = 1;
+			// Set Send Work Request
+			wr[i].wr_id = (uintptr_t)smps[i]; //This way the sample can be release in WC
+			wr[i].sg_list = &sge[i];
+			wr[i].num_sge = 1;
 
-		if (i == (cnt-1)) {
-			debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
-			wr[i].next = NULL;
+			if (i == (cnt-1)) {
+				debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1));
+				wr[i].next = NULL;
+			}
+			else
+				wr[i].next = &wr[i+1];
+
+			wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
+			wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
+			wr[i].opcode = IBV_WR_SEND_WITH_IMM;
 		}
-		else
-			wr[i].next = &wr[i+1];
 
-		wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
-		wr[i].imm_data = htonl(0); //ToDo: set this to a useful value
-		wr[i].opcode = IBV_WR_SEND_WITH_IMM;
+		//Send linked list of Work Requests
+		ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
+		if (ret) {
+			error("Failed to send message in node %s: %i, bad WR ID: 0x%lx",
+				node_name(n), ret, bad_wr->wr_id);
+
+			return -ret;
+		}
+
+		debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
 	}
 
-	//Send linked list of Work Requests
-	ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr);
-	if (ret) {
-		error("Failed to send message in node %s: %i, bad WR ID: 0x%lx",
-			node_name(n), ret, bad_wr->wr_id);
-
-		return -ret;
-	}
-
-	debug(LOG_IB | 4, "Succesfully posted receive Work Requests");
-
 	return cnt;
 }
 

From e16644b0e907ea8650ee474b4590c9832e769dba Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 12:56:08 +0200
Subject: [PATCH 3/7] Added some error handling in IB memory type

---
 lib/memory/ib.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lib/memory/ib.c b/lib/memory/ib.c
index 6cf0dbc66..5943fd8e9 100644
--- a/lib/memory/ib.c
+++ b/lib/memory/ib.c
@@ -55,6 +55,9 @@ static struct memory_allocation * memory_ib_alloc(struct memory_type *m, size_t
 	ma->parent = mi->parent->alloc(mi->parent, len + sizeof(struct ibv_mr *), alignment);
 	ma->address = ma->parent->address;
 
+	if(!mi->pd)
+		error("Protection domain is not registered!");
+
 	ma->ib.mr = ibv_reg_mr(mi->pd, ma->address, ma->length, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 	if(!ma->ib.mr) {
 		mi->parent->free(mi->parent, ma->parent);

From 80da4801e15f8482f30c3c225ef1eba5a77ec79d Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 13:08:08 +0200
Subject: [PATCH 4/7] Source and target successfully connect and node changes
 status from STATE_STARTED to STATE_CONNECTED in this commit. Next step will
 be to fix ib_stop and ib_disconnect to make the target able to accept new
 connections.

---
 lib/nodes/infiniband.c | 22 ++++++++--------------
 1 file changed, 8 insertions(+), 14 deletions(-)

diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 18736b088..6544ae0d9 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -542,22 +542,15 @@ void * ib_rdma_cm_event_thread(void *n)
 	int ret = 0;
 
 
-	//debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
-
-	info("EVENT: %p", ib->ctx.ec);
-	info("FD, i: %i", ib->ctx.ec->fd);
+	debug(LOG_IB | 1, "Started rdma_cm_event thread of node %s", node_name(node));
 
 	// Wait until node is completely started
-	//while (node->state != STATE_STARTED);
+	while (node->state != STATE_STARTED);
 
+	// Monitor event channel
 	while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) {
-		struct rdma_cm_event event_copy;
 
-		//ToDo: check if copy is really necessary
-		memcpy(&event_copy, event, sizeof(*event));
-		rdma_ack_cm_event(event);
-
-		switch(event_copy.event) {
+		switch(event->event) {
 			case RDMA_CM_EVENT_ADDR_RESOLVED:
 				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED");
 
@@ -585,7 +578,7 @@ void * ib_rdma_cm_event_thread(void *n)
 			case RDMA_CM_EVENT_CONNECT_REQUEST:
 				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
 
-				ret = ib_connect_request(n, event_copy.id);
+				ret = ib_connect_request(n, event->id);
 				break;
 
 			case RDMA_CM_EVENT_CONNECT_ERROR:
@@ -602,7 +595,7 @@ void * ib_rdma_cm_event_thread(void *n)
 			case RDMA_CM_EVENT_ESTABLISHED:
 				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED");
 
-	//			node->state = STATE_CONNECTED;
+				node->state = STATE_CONNECTED;
 
 				info("Connection established in node %s", node_name(n));
 				break;
@@ -614,9 +607,10 @@ void * ib_rdma_cm_event_thread(void *n)
 				break;
 
 			default:
-				error("Unknown event occurred: %u", event_copy.event);
+				error("Unknown event occurred: %u", event->event);
 		}
 
+		rdma_ack_cm_event(event);
 
 		if (ret) //ToDo: Fix me
 			break;

From 836adee4d6ee9cd3477b85052ee6f6d9d3598c44 Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 14:36:23 +0200
Subject: [PATCH 5/7] Node is able to clean everything up and reconnect. Node
 can abort if it is in either STARTED or CONNECTED state

---
 include/villas/nodes/infiniband.h |   4 +-
 lib/nodes/infiniband.c            | 111 +++++++++++++++---------------
 2 files changed, 59 insertions(+), 56 deletions(-)

diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 449d8c88c..5f5727ea0 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -77,9 +77,11 @@ struct infiniband {
 		/* Poll thread */
 		pthread_t cq_poller_thread;
 
-		int stopThread;
+		int stopThreads;
 	} poll;
 
+	int stopThreads;
+
 	/* Connection specific variables */
 	struct connection_s {
 		struct addrinfo *src_addr;
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 6544ae0d9..2c326eacb 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -22,7 +22,6 @@
 
 #include <string.h>
 #include <math.h>
-#include <unistd.h> //ToDo: remove me.
 
 #include <villas/nodes/infiniband.h>
 #include <villas/plugin.h>
@@ -33,7 +32,7 @@
 #include <villas/memory.h>
 #include <villas/memory/ib.h>
 
-int ib_cleanup(struct node *n)
+int ib_disconnect(struct node *n)
 {
 	struct infiniband *ib = (struct infiniband *) n->_vd;
 	debug(LOG_IB | 1, "Starting to clean up");
@@ -53,13 +52,8 @@ int ib_cleanup(struct node *n)
 	pool_destroy(&ib->mem.p_send);
 	debug(LOG_IB | 3, "Destroyed memory pools");
 
-	// Destroy RDMA CM ID
-	rdma_destroy_id(ib->ctx.id);
-	debug(LOG_IB | 3, "Destroyed rdma_cm_id");
-
-	// Destroy event channel
-	rdma_destroy_event_channel(ib->ctx.ec);
-	debug(LOG_IB | 3, "Destroyed event channel");
+	// Set available receive work requests to zero
+	ib->conn.available_recv_wrs = 0;
 
 	return 0;
 }
@@ -97,9 +91,9 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
 	for (int i = 0; i < *size; i++)	{
 		//On disconnect, the QP set to error state and will be flushed
 		if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-			debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_completion_source. Stopping thread.");
+			debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread.");
 
-			ib->poll.stopThread = 1;
+			ib->poll.stopThreads = 1;
 			return;
 		}
 
@@ -146,7 +140,7 @@ void * ib_busy_poll_thread(void *n)
 		while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
 			ib->poll.on_compl(n, wc, &size);
 
-		if (ib->poll.stopThread)
+		if (ib->poll.stopThreads)
 			return NULL;
 	}
 }
@@ -169,21 +163,13 @@ static void ib_init_wc_poll(struct node *n)
 	}
 
 	// Create completion queues and bind to channel (or NULL)
-	ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
-					ib->cq_size,
-					NULL,
-					NULL,
-					0);
+	ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
 	if (!ib->ctx.recv_cq)
 		error("Could not create receive completion queue in node %s", node_name(n));
 
 	debug(LOG_IB | 3, "Created receive Completion Queue");
 
-	ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
-					ib->cq_size,
-					NULL,
-					ib->ctx.comp_channel,
-					0);
+	ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, 0);
 	if (!ib->ctx.send_cq)
 		error("Could not create send completion queue in node %s", node_name(n));
 
@@ -237,10 +223,7 @@ static void ib_build_ibv(struct node *n)
 	ib->mem.p_recv.queue.state = STATE_DESTROYED;
 
 	// Set pool size to maximum size of Receive Queue
-	pool_init(&ib->mem.p_recv,
-		ib->qp_init.cap.max_recv_wr,
-		SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
-		&memory_type_heap);
+	pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap);
 	if (ret)
 		error("Failed to init recv memory pool of node %s: %s",
 			node_name(n), gai_strerror(ret));
@@ -252,8 +235,7 @@ static void ib_build_ibv(struct node *n)
 
 	// Register memory for IB Device. Not necessary if data is send
 	// exclusively inline
-	ib->mem.mr_recv = ibv_reg_mr(
-				ib->ctx.pd,
+	ib->mem.mr_recv = ibv_reg_mr(ib->ctx.pd,
 				(char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off,
 				ib->mem.p_recv.len,
 				IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
@@ -268,29 +250,22 @@ static void ib_build_ibv(struct node *n)
 		ib->mem.p_send.queue.state = STATE_DESTROYED;
 
 		// Set pool size to maximum size of Receive Queue
-		pool_init(&ib->mem.p_send,
-			ib->qp_init.cap.max_send_wr,
-			sizeof(double),
-			&memory_type_heap);
+		pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr,	sizeof(double),	&memory_type_heap);
 		if (ret)
-			error("Failed to init send memory of node %s: %s",
-				node_name(n), gai_strerror(ret));
+			error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret));
 
-		debug(LOG_IB | 3, "Created internal send pool with %i elements",
-				ib->qp_init.cap.max_recv_wr);
+		debug(LOG_IB | 3, "Created internal send pool with %i elements", ib->qp_init.cap.max_recv_wr);
 
 		//ToDo: initialize r_addr_key struct if mode is RDMA
 
 		// Register memory for IB Device. Not necessary if data is send
 		// exclusively inline
-		ib->mem.mr_send = ibv_reg_mr(
-					ib->ctx.pd,
+		ib->mem.mr_send = ibv_reg_mr(ib->ctx.pd,
 					(char*)&ib->mem.p_send+ib->mem.p_send.buffer_off,
 					ib->mem.p_send.len,
 					IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
 		if (!ib->mem.mr_send)
-			error("Failed to register mr_send with ibv_reg_mr of node %s",
-				node_name(n));
+			error("Failed to register mr_send with ibv_reg_mr of node %s", node_name(n));
 
 		debug(LOG_IB | 3, "Registered send pool with ibv_reg_mr");
 	}
@@ -463,7 +438,7 @@ int ib_parse(struct node *n, json_t *cfg)
 
 	//Check if node is a source and connect to target
 	if (remote) {
-		debug(LOG_IB | 3, "Node %s is set up to be able to send data (source and target)", node_name(n));
+		debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));
 
 		ib->is_source = 1;
 
@@ -476,13 +451,13 @@ int ib_parse(struct node *n, json_t *cfg)
 			error("Failed to resolve remote address '%s' of node %s: %s",
 				remote, node_name(n), gai_strerror(ret));
 
-		debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo in node %s", ip_adr, port, node_name(n));
+		debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);
 
 		// Set correct Work Completion function
 		ib->poll.on_compl = ib_completion_source;
 	}
 	else {
-		debug(LOG_IB | 3, "Node %s is set up to be able to only receive data (target)", node_name(n));
+		debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));
 
 		ib->is_source = 0;
 
@@ -504,11 +479,11 @@ int ib_check(struct node *n)
 	int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));
 
 	if (ib->qp_init.cap.max_send_wr != max_send_pow)
-		warn("Max nr. of send WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+		warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
 			ib->qp_init.cap.max_send_wr, max_send_pow);
 
 	if (ib->qp_init.cap.max_recv_wr != max_recv_pow)
-		warn("Max nr. of recv WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+		warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
 			ib->qp_init.cap.max_recv_wr, max_recv_pow);
 
 
@@ -603,7 +578,12 @@ void * ib_rdma_cm_event_thread(void *n)
 			case RDMA_CM_EVENT_DISCONNECTED:
 				debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");
 
-				ret = ib_cleanup(n);
+				node->state = STATE_STARTED;
+				ret = ib_disconnect(n);
+
+				break;
+
+			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
 				break;
 
 			default:
@@ -612,7 +592,7 @@ void * ib_rdma_cm_event_thread(void *n)
 
 		rdma_ack_cm_event(event);
 
-		if (ret) //ToDo: Fix me
+		if (ret || ib->stopThreads)
 			break;
 	}
 
@@ -635,8 +615,7 @@ int ib_start(struct node *n)
 
 	ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
 	if (ret)
-		error("Failed to create rdma_cm_id of node %s: %s",
-			node_name(n), gai_strerror(ret));
+		error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret));
 
 	debug(LOG_IB | 3, "Created rdma_cm_id");
 
@@ -650,10 +629,7 @@ int ib_start(struct node *n)
 
 	if (ib->is_source) {
 		// Resolve address
-		ret = rdma_resolve_addr(ib->ctx.id,
-					NULL,
-					ib->conn.dst_addr->ai_addr,
-					ib->conn.timeout);
+		ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
 		if (ret)
 			error("Failed to resolve remote address after %ims of node %s: %s",
 				ib->conn.timeout, node_name(n), gai_strerror(ret));
@@ -698,16 +674,41 @@ int ib_stop(struct node *n)
 	struct infiniband *ib = (struct infiniband *) n->_vd;
 	int ret;
 
+	debug(LOG_IB | 1, "Called ib_stop");
+
+	ib->stopThreads = 1;
+
 	// Call RDMA disconnect function
 	// Will flush all outstanding WRs to the Completion Queue and
 	// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
 	ret = rdma_disconnect(ib->ctx.id);
 	if (ret)
 		error("Error while calling rdma_disconnect in node %s: %s",
-		    node_name(n), gai_strerror(ret));
+			node_name(n), gai_strerror(ret));
 
 	debug(LOG_IB | 3, "Called rdma_disconnect");
+	info("Disconnecting... Please give me a few seconds.");
 
+	// Wait for event thread to join
+	ret = pthread_join(ib->conn.rdma_cm_event_thread, NULL);
+	if (ret)
+		error("Error while joining rdma_cm_event_thread in node %s: %i", node_name(n), ret);
+
+	debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
+
+	// Destroy RDMA CM ID
+	rdma_destroy_id(ib->ctx.id);
+	debug(LOG_IB | 3, "Destroyed rdma_cm_id");
+
+	// Dealloc Protection Domain
+	ibv_dealloc_pd(ib->ctx.pd);
+	debug(LOG_IB | 3, "Destroyed protection domain");
+
+	// Destroy event channel
+	rdma_destroy_event_channel(ib->ctx.ec);
+	debug(LOG_IB | 3, "Destroyed event channel");
+
+	info("Successfully stopped %s", node_name(n));
 
 	return 0;
 }
@@ -784,7 +785,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)
 
 			for (int i = 0; i < ret; i++) {
 				if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-					debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
+					debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");
 
 					ret = 0;
 				}

From 1bdd0a9e3455c2d0197fc0d45d77ace5551af7ac Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 15:24:48 +0200
Subject: [PATCH 6/7] node_stop can be called if node is STARTED or CONNECTED

---
 lib/node.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/node.c b/lib/node.c
index 5580bc399..c7d93b84b 100644
--- a/lib/node.c
+++ b/lib/node.c
@@ -351,7 +351,7 @@ int node_stop(struct node *n)
 {
 	int ret;
 
-	if (n->state != STATE_STARTED)
+	if (n->state != STATE_STARTED && n->state != STATE_CONNECTED)
 		return 0;
 
 	info("Stopping node %s", node_name(n));

From 06e7434d6c23f102b6b37d8c842809d05f10aeec Mon Sep 17 00:00:00 2001
From: Dennis Potter <dennis@dennispotter.eu>
Date: Sat, 7 Jul 2018 15:34:07 +0200
Subject: [PATCH 7/7] Solved some state problems. This commit also solves #154,
 which was caused by a non-terminated thread. (This thread will be removed in
 a later commit anyway)

---
 include/villas/nodes/infiniband.h |  2 --
 lib/nodes/infiniband.c            | 41 ++++++++++++++++++-------------
 2 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 5f5727ea0..cf2cb286d 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -76,8 +76,6 @@ struct infiniband {
 
 		/* Poll thread */
 		pthread_t cq_poller_thread;
-
-		int stopThreads;
 	} poll;
 
 	int stopThreads;
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 2c326eacb..5b103b71e 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -37,6 +37,8 @@ int ib_disconnect(struct node *n)
 	struct infiniband *ib = (struct infiniband *) n->_vd;
 	debug(LOG_IB | 1, "Starting to clean up");
 
+	rdma_disconnect(ib->ctx.id);
+
 	// Destroy QP
 	rdma_destroy_qp(ib->ctx.id);
 	debug(LOG_IB | 3, "Destroyed QP");
@@ -86,17 +88,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){}
 
 void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)
 {
-	struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd;
-
 	for (int i = 0; i < *size; i++)	{
-		//On disconnect, the QP set to error state and will be flushed
-		if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-			debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread.");
-
-			ib->poll.stopThreads = 1;
-			return;
-		}
-
 		if (wc[i].status != IBV_WC_SUCCESS)
 			warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
 				node_name(n), wc[i].status);
@@ -140,7 +132,7 @@ void * ib_busy_poll_thread(void *n)
 		while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
 			ib->poll.on_compl(n, wc, &size);
 
-		if (ib->poll.stopThreads)
+		if (ib->stopThreads)
 			return NULL;
 	}
 }
@@ -581,6 +573,8 @@ void * ib_rdma_cm_event_thread(void *n)
 				node->state = STATE_STARTED;
 				ret = ib_disconnect(n);
 
+				info("Host disconnected. Ready to accept new connections.");
+
 				break;
 
 			case RDMA_CM_EVENT_TIMEWAIT_EXIT:
@@ -627,6 +621,12 @@ int ib_start(struct node *n)
 
 	debug(LOG_IB | 3, "Bound rdma_cm_id to Infiniband device");
 
+	// The ID will be overwritten for the target. If the event type is
+	// RDMA_CM_EVENT_CONNECT_REQUEST, then this references a new id for
+	// that communication.
+	ib->ctx.listen_id = ib->ctx.id;
+
+
 	if (ib->is_source) {
 		// Resolve address
 		ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
@@ -635,11 +635,6 @@ int ib_start(struct node *n)
 				ib->conn.timeout, node_name(n), gai_strerror(ret));
 	}
 	else {
-		// The ID will be overwritten for the target. If the event type is
-		// RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for
-		// that communication.
-		ib->ctx.listen_id = ib->ctx.id;
-
 		// Listen on rdma_cm_id for events
 		ret = rdma_listen(ib->ctx.listen_id, 10);
 		if (ret)
@@ -681,7 +676,11 @@ int ib_stop(struct node *n)
 	// Call RDMA disconnect function
 	// Will flush all outstanding WRs to the Completion Queue and
 	// will call RDMA_CM_EVENT_DISCONNECTED if that is done.
-	ret = rdma_disconnect(ib->ctx.id);
+	if(! ib->is_source && n->state == STATE_CONNECTED)
+		ret = rdma_disconnect(ib->ctx.id);
+	else
+		ret = rdma_disconnect(ib->ctx.listen_id);
+
 	if (ret)
 		error("Error while calling rdma_disconnect in node %s: %s",
 			node_name(n), gai_strerror(ret));
@@ -696,6 +695,14 @@ int ib_stop(struct node *n)
 
 	debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
 
+	// Wait for polling thread to join
+	if (ib->is_source) {
+		ret = pthread_join(ib->poll.cq_poller_thread, NULL);
+		if (ret)
+			error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret);
+	}
+
+
 	// Destroy RDMA CM ID
 	rdma_destroy_id(ib->ctx.id);
 	debug(LOG_IB | 3, "Destroyed rdma_cm_id");