From 5598f935820a20d99dc901fcaf4cc97f81eceb2a Mon Sep 17 00:00:00 2001 From: Dennis Potter Date: Thu, 28 Jun 2018 12:46:16 +0200 Subject: [PATCH] Implemented disconnect function on source and target side. The cleanup function doesn't go through completely yet, probably because rdma_destroy_id blocks because not everything in the rdma_cm_id is destroyed yet. --- include/villas/nodes/infiniband.h | 7 +- lib/nodes/infiniband.c | 117 +++++++++++++++++++++++++++--- 2 files changed, 113 insertions(+), 11 deletions(-) diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index cb0034711..ebb443394 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -63,7 +63,6 @@ struct infiniband { struct ibv_cq *cq; struct ibv_comp_channel *comp_channel; } ctx; - /* Work Completion related */ struct poll_s { enum poll_mode_e poll_mode; @@ -76,6 +75,8 @@ struct infiniband { /* Poll thread */ pthread_t cq_poller_thread; + + int stopThread; } poll; /* Connection specific variables */ @@ -86,6 +87,9 @@ struct infiniband { int timeout; struct r_addr_key_s *r_addr_key; + + pthread_t stop_thread; + int rdma_disconnect_called; } conn; /* Memory related variables */ @@ -103,7 +107,6 @@ struct infiniband { /* Misc settings */ int is_source; int cq_size; - }; /** @see node_type::reverse */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index ea248fac4..b3faf33e0 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -31,6 +31,31 @@ #include +int ib_cleanup(struct node *n) +{ + struct infiniband *ib = (struct infiniband *) n->_vd; + info("Starting to clean up"); + + // Destroy QP + rdma_destroy_qp(ib->ctx.id); + info("Destroyed QP"); + + // Deregister memory regions + ibv_dereg_mr(ib->mem.mr_recv); + if(ib->is_source) + ibv_dereg_mr(ib->mem.mr_send); + info("Deregistered memory regions"); + + // Destroy pools + pool_destroy(&ib->mem.p_recv); + pool_destroy(&ib->mem.p_send); + info("Destroyed memory pools"); + + rdma_destroy_id(ib->ctx.id); + info("Destroyed rdma_cm_id"); + + return 0; +} int ib_post_recv_wrs(struct node *n) { @@ -63,11 +88,21 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size) void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + for(int i=0; i<*size; i++) { + //On disconnect, the QP set to error state and will be flushed + if(wc[i].status == IBV_WC_WR_FLUSH_ERR) + { + ib->poll.stopThread = 1; + return; + } + if(wc[i].status != IBV_WC_SUCCESS) - error("Work Completion status was not IBV_WC_SUCCES in node %s: %s", - node_name(n), gai_strerror(wc[i].status)); + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[i].status); + } } @@ -101,13 +136,12 @@ void * ib_busy_poll_thread(void *n) while(1) { - //ToDo: Implement stopThreads variable - if(0) - return NULL; - // Poll as long as WCs are available while((size = ibv_poll_cq(ib->ctx.cq, ib->cq_size, wc))) ib->poll.on_compl(n, wc, &size); + + if(ib->poll.stopThread) + return NULL; } } @@ -351,6 +385,9 @@ static int ib_event(struct node *n, struct rdma_cm_event *event) info("Connection established!"); ret = 1; break; + case RDMA_CM_EVENT_DISCONNECTED: + ret = ib_cleanup(n); + break; default: error("Unknown event occurred: %u", event->event); @@ -448,6 +485,7 @@ int ib_parse(struct node *n, json_t *cfg) } // Set max. send and receive Work Requests + //ToDo: Set hint that max_*_wr can only be a value 1<< ib->qp_init.cap.max_send_wr = max_send_wr; ib->qp_init.cap.max_recv_wr = max_recv_wr; @@ -493,6 +531,23 @@ int ib_destroy(struct node *n) return 0; } +void * ib_stop_thread(void *n) +{ + struct node *node = (struct node *)n; + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct rdma_cm_event *event; + while(rdma_get_cm_event(ib->ctx.ec, &event) == 0) + { + if(event->event == RDMA_CM_EVENT_DISCONNECTED) + { + ib->conn.rdma_disconnect_called = 1; + node_stop(node); + return NULL; + } + } + return NULL; +} + int ib_start(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -566,11 +621,49 @@ int ib_start(struct node *n) break; } + ret = pthread_create(&ib->conn.stop_thread, NULL, ib_stop_thread, n); + if(ret) + { + error("Failed to create thread to monitor disconnects in node %s: %s", + node_name(n), gai_strerror(ret)); + } + return 0; } int ib_stop(struct node *n) { + struct infiniband *ib = (struct infiniband *) n->_vd; + struct rdma_cm_event *event = NULL; + int ret; + + // Call RDMA disconnect function + // Will flush all outstanding WRs to the Completion Queue and + // will call RDMA_CM_EVENT_DISCONNECTED if that is done. + ret = rdma_disconnect(ib->ctx.id); + if(ret) + { + error("Error while calling rdma_disconnect in node %s: %s", + node_name(n), gai_strerror(ret)); + } + info("Called rdma_disconnect."); + + // If disconnected event already occured, directly call cleanup function + if(ib->conn.rdma_disconnect_called) + { + ib_cleanup(n); + } + // Else, wait for event to occur + else + { + ib->conn.rdma_disconnect_called = 1; + rdma_get_cm_event(ib->ctx.ec, &event); + + ib_event(n, event); + + rdma_ack_cm_event(event); + } + return 0; } @@ -605,10 +698,16 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) for(int i=0; ilength = ret; smps[0]->capacity = cnt;