From 3f58359ce40f2aa4e4481a87d650ef19f10de60d Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Mon, 16 Jul 2018 13:41:42 +0200 Subject: [PATCH 1/6] It is a bad idea to have a smaller CQ than recv. Queue --- etc/infiniband.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/etc/infiniband.conf b/etc/infiniband.conf index 7668e1c81..df3fba081 100644 --- a/etc/infiniband.conf +++ b/etc/infiniband.conf @@ -14,9 +14,9 @@ nodes = { address = "10.0.0.2:1337", max_wrs = 8192, - cq_size = 2048, + cq_size = 8192, - vectorize = 64, + vectorize = 1, poll_mode = "BUSY", buffer_subtraction = 128, @@ -29,7 +29,7 @@ nodes = { max_wrs = 8192, cq_size = 256, - vectorize = 64, + vectorize = 1, send_inline = 1, max_inline_data = 60, From 0230389593decc8febd74ade17bdb2c14466529f Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Mon, 16 Jul 2018 13:41:53 +0200 Subject: [PATCH 2/6] Node gives back samples to framework at disconnect The node blocks a certain amount of samples to use in its queues. Before this commit, the only moment to release them to the framwork was during ib_read()/ib_write(). But, there were a couple of problems. In the following I will take ib_read() as example, but ib_write() will be analogous. The first problem was: 1. If a QP disconnect, all Work Requests get invalidated and will be "flushed" to a Completion Queue. A possible solution would be, to save them in an intermediate buffer. We could then "exchange" these samples with the framework as soon as the node connects again and ib_read() is called again. So, we would get valid samples from the framwork, post them, and give the "invalidated" samples back. But, there is a second problem: 2. We cannot assume that ib_read() is ever called again after ib_disconnect(). This is for example the case if the disconnect is triggered by ib_stop() and not by an external node that disconnects. This would result in a memory leak, since the samples would never be returned to the framework, although the node is stopped. Because of this second problem, I decided to return all samples with sample_put() in the disconnect function. An additional benefit is that this is more convenient than another buffer to temporarily safe the invalidated samples. --- lib/nodes/infiniband.c | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index ff13b2ebf..aacb3fd4f 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -33,18 +34,42 @@ static int ib_disconnect(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_wc wc[MAX(ib->recv_cq_size, ib->send_cq_size)]; + int wcs; debug(LOG_IB | 1, "Starting to clean up"); rdma_disconnect(ib->ctx.id); + // Give the Completion Queues a chance to fill after rdma_disconnect + usleep(50000); + + // If there is anything in the Completion Queue, it should be given back to the framework + // Receive Queue + while ((wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc))) { + ib->conn.available_recv_wrs -= wcs; + + for (int j = 0; j < wcs; j++) + sample_put((struct sample *) (wc[j].wr_id)); + } + + // Send Queue + while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc))) + for (int j = 0; j < wcs; j++) + if (wc[j].wr_id > 0) + sample_put((struct sample *) (wc[j].wr_id)); + + // Send Queue Stack + while (ib->conn.send_wc_stack.top != 0) { + ib->conn.send_wc_stack.top--; + sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); + } + + info("WCS: %i", ib->conn.available_recv_wrs); + // Destroy QP rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); - // Set available receive WRs and stack top to zero - ib->conn.available_recv_wrs = 0; - ib->conn.send_wc_stack.top = 0; - return ib->stopThreads; } @@ -529,8 +554,8 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_DISCONNECTED: debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); - node->state = STATE_STARTED; + ret = ib_disconnect(n); if (!ret) @@ -699,6 +724,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // If we've already posted enough receive WRs, try to pull cnt if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) { while(1) { + if (n->state != STATE_CONNECTED) return 0; + wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { debug(LOG_IB | 10, "Received %i Work Completions", wcs); From 3df5d37b15deea41d46e45cdc97e922500479471 Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Mon, 16 Jul 2018 17:10:52 +0200 Subject: [PATCH 3/6] Added warning if not all samples are returned --- lib/nodes/infiniband.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index aacb3fd4f..4a063a017 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -64,7 +64,8 @@ static int ib_disconnect(struct node *n) sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); } - info("WCS: %i", ib->conn.available_recv_wrs); + if (ib->conn.available_recv_wrs != 0) + warn("Was not able to return all samples! %i samples are still blocked ", ib->conn.available_recv_wrs); // Destroy QP rdma_destroy_qp(ib->ctx.id); From 2bee7d24dd8be6227b6566b3d5bcae0929049c8b Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Tue, 17 Jul 2018 11:10:05 +0200 Subject: [PATCH 4/6] Added rdma_event_str() This replaces the manual translation of enumerations in the switch statements. --- lib/nodes/infiniband.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 4a063a017..049e9f030 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -493,16 +493,14 @@ void * ib_rdma_cm_event_thread(void *n) // Monitor event channel while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) { + debug(LOG_IB | 2, "Received communication event: %s", rdma_event_str(event->event)); switch(event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED"); - ret = ib_addr_resolved(n); break; case RDMA_CM_EVENT_ADDR_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR"); warn("Address resolution (rdma_resolve_addr) failed!"); ib_continue_as_listen(n, event); @@ -510,13 +508,10 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_ROUTE_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED"); - ret = ib_route_resolved(n); break; case RDMA_CM_EVENT_ROUTE_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR"); warn("Route resolution (rdma_resovle_route) failed!"); ib_continue_as_listen(n, event); @@ -524,13 +519,10 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_CONNECT_REQUEST: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST"); - ret = ib_connect_request(n, event->id); break; case RDMA_CM_EVENT_CONNECT_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR"); warn("An error has occurred trying to establish a connection!"); ib_continue_as_listen(n, event); @@ -538,7 +530,6 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_REJECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED"); warn("Connection request or response was rejected by the remote end point!"); ib_continue_as_listen(n, event); @@ -546,15 +537,12 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_ESTABLISHED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED"); - node->state = STATE_CONNECTED; info("Connection established in node %s", node_name(n)); break; case RDMA_CM_EVENT_DISCONNECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); node->state = STATE_STARTED; ret = ib_disconnect(n); From a5068e28ea8dee0e4150dea0ced8ae16557da1bc Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Fri, 20 Jul 2018 23:40:43 +0200 Subject: [PATCH 5/6] Replace sleep by a better check Prior to this commit, we called rdma_disconnect() and waited for a fixed amount of time. This check was kind of arbitrary. Now, we keep polling the receive Completion Queue until ib->conn.available_recv_wrs is zero and all receive samples are thus given back to the framework. --- lib/nodes/infiniband.c | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 4a063a017..aee81da3c 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -40,12 +39,11 @@ static int ib_disconnect(struct node *n) rdma_disconnect(ib->ctx.id); - // Give the Completion Queues a chance to fill after rdma_disconnect - usleep(50000); - // If there is anything in the Completion Queue, it should be given back to the framework // Receive Queue - while ((wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc))) { + while (ib->conn.available_recv_wrs) { + wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc); + ib->conn.available_recv_wrs -= wcs; for (int j = 0; j < wcs; j++) @@ -64,9 +62,6 @@ static int ib_disconnect(struct node *n) sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); } - if (ib->conn.available_recv_wrs != 0) - warn("Was not able to return all samples! %i samples are still blocked ", ib->conn.available_recv_wrs); - // Destroy QP rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); From 8704683bf2d82c8f14e4862397be78aa5ee6f3aa Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Sat, 21 Jul 2018 12:07:43 +0200 Subject: [PATCH 6/6] Replaced send WC stack by queue --- include/villas/nodes/infiniband.h | 5 +--- lib/nodes/infiniband.c | 45 +++++++++++++++++-------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 7a5058f71..56f591a72 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -67,10 +67,7 @@ struct infiniband { int send_inline; - struct send_wc_stack_s { - uint64_t* array; - unsigned top; - } send_wc_stack; + struct queue send_wc_buffer; int available_recv_wrs; int buffer_subtraction; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index f5c37c777..0723eef8d 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -57,9 +57,14 @@ static int ib_disconnect(struct node *n) sample_put((struct sample *) (wc[j].wr_id)); // Send Queue Stack - while (ib->conn.send_wc_stack.top != 0) { - ib->conn.send_wc_stack.top--; - sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); + + struct sample *smp = NULL; + while (queue_available(&ib->conn.send_wc_buffer)) { + // Because of queue_available, queue_pull should always return. No need + // to double check return of queue_pull. + queue_pull(&ib->conn.send_wc_buffer, (void **) &smp); + + sample_put(smp); } // Destroy QP @@ -580,11 +585,10 @@ int ib_start(struct node *n) // Create rdma_cm_id and bind to device ib_create_bind_id(n); - // Initialize send Work Completion stack - ib->conn.send_wc_stack.top = 0; - ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) ); + // Initialize send Work Completion queue + queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_type_heap); - debug(LOG_IB | 3, "Initialized Work Completion Stack"); + debug(LOG_IB | 3, "Initialized Work Completion Buffer"); // Resolve address or listen to rdma_cm_id if (ib->is_source) { @@ -810,6 +814,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ? ib->conn.send_inline : 0; + debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline); // Set Send Work Request @@ -858,10 +863,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele } - debug(LOG_IB | 4, "%i samples will be released", *release); + debug(LOG_IB | 4, "%i samples will be released (before WC)", *release); // Always poll cnt items from Receive Queue. If there is not enough space in - // smps, we temporarily save it on a stack + // smps, we temporarily save it in a queue ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc); for (int i = 0; i < ret; i++) { @@ -876,26 +881,26 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele (*release)++; } else { - ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id; - ib->conn.send_wc_stack.top++; + queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id)); + debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id)); } } } - - // Check if we still have some space and try to get rid of some addresses on our stack - if (ib->conn.send_wc_stack.top > 0) { + // Check if we still have some space and try to get rid of some addresses in our queue + if (queue_available(&ib->conn.send_wc_buffer)) { int empty_smps = cnt - *release; for (int i = 0; i < empty_smps; i++) { - ib->conn.send_wc_stack.top--; + ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]); + debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]); - smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]; - - (*release)++; - - if(ib->conn.send_wc_stack.top == 0) break; + if (ret) + (*release)++; + else + break; } } + debug(LOG_IB | 4, "%i samples will be released (after WC)", *release); } return sent;