diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index ff13b2ebf..aacb3fd4f 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -22,6 +22,7 @@ #include #include +#include #include #include @@ -33,18 +34,42 @@ static int ib_disconnect(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_wc wc[MAX(ib->recv_cq_size, ib->send_cq_size)]; + int wcs; debug(LOG_IB | 1, "Starting to clean up"); rdma_disconnect(ib->ctx.id); + // Give the Completion Queues a chance to fill after rdma_disconnect + usleep(50000); + + // If there is anything in the Completion Queue, it should be given back to the framework + // Receive Queue + while ((wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc))) { + ib->conn.available_recv_wrs -= wcs; + + for (int j = 0; j < wcs; j++) + sample_put((struct sample *) (wc[j].wr_id)); + } + + // Send Queue + while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc))) + for (int j = 0; j < wcs; j++) + if (wc[j].wr_id > 0) + sample_put((struct sample *) (wc[j].wr_id)); + + // Send Queue Stack + while (ib->conn.send_wc_stack.top != 0) { + ib->conn.send_wc_stack.top--; + sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); + } + + info("WCS: %i", ib->conn.available_recv_wrs); + // Destroy QP rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); - // Set available receive WRs and stack top to zero - ib->conn.available_recv_wrs = 0; - ib->conn.send_wc_stack.top = 0; - return ib->stopThreads; } @@ -529,8 +554,8 @@ void * ib_rdma_cm_event_thread(void *n) case RDMA_CM_EVENT_DISCONNECTED: debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); - node->state = STATE_STARTED; + ret = ib_disconnect(n); if (!ret) @@ -699,6 +724,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // If we've already posted enough receive WRs, try to pull cnt if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) { while(1) { + if (n->state != STATE_CONNECTED) return 0; + wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { debug(LOG_IB | 10, "Received %i Work Completions", wcs);