diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 5f5727ea0..cf2cb286d 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -76,8 +76,6 @@ struct infiniband { /* Poll thread */ pthread_t cq_poller_thread; - - int stopThreads; } poll; int stopThreads; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 2c326eacb..5b103b71e 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -37,6 +37,8 @@ int ib_disconnect(struct node *n) struct infiniband *ib = (struct infiniband *) n->_vd; debug(LOG_IB | 1, "Starting to clean up"); + rdma_disconnect(ib->ctx.id); + // Destroy QP rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); @@ -86,17 +88,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { - struct infiniband *ib = (struct infiniband *) ((struct node *) n)->_vd; - for (int i = 0; i < *size; i++) { - //On disconnect, the QP set to error state and will be flushed - if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { - debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread."); - - ib->poll.stopThreads = 1; - return; - } - if (wc[i].status != IBV_WC_SUCCESS) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[i].status); @@ -140,7 +132,7 @@ void * ib_busy_poll_thread(void *n) while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc))) ib->poll.on_compl(n, wc, &size); - if (ib->poll.stopThreads) + if (ib->stopThreads) return NULL; } } @@ -581,6 +573,8 @@ void * ib_rdma_cm_event_thread(void *n) node->state = STATE_STARTED; ret = ib_disconnect(n); + info("Host disconnected. Ready to accept new connections."); + break; case RDMA_CM_EVENT_TIMEWAIT_EXIT: @@ -627,6 +621,12 @@ int ib_start(struct node *n) debug(LOG_IB | 3, "Bound rdma_cm_id to Infiniband device"); + // The ID will be overwritten for the target. 
If the event type is + // RDMA_CM_EVENT_CONNECT_REQUEST, then this references a new id for + // that communication. + ib->ctx.listen_id = ib->ctx.id; + + if (ib->is_source) { // Resolve address ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout); @@ -635,11 +635,6 @@ int ib_start(struct node *n) ib->conn.timeout, node_name(n), gai_strerror(ret)); } else { - // The ID will be overwritten for the target. If the event type is - // RDMA_CM_EVENT_CONNECT_REQUEST, >then this references a new id for - // that communication. - ib->ctx.listen_id = ib->ctx.id; - // Listen on rdma_cm_id for events ret = rdma_listen(ib->ctx.listen_id, 10); if (ret) @@ -681,7 +676,11 @@ int ib_stop(struct node *n) // Call RDMA disconnect function // Will flush all outstanding WRs to the Completion Queue and // will call RDMA_CM_EVENT_DISCONNECTED if that is done. - ret = rdma_disconnect(ib->ctx.id); + if(! ib->is_source && n->state == STATE_CONNECTED) + ret = rdma_disconnect(ib->ctx.id); + else + ret = rdma_disconnect(ib->ctx.listen_id); + if (ret) error("Error while calling rdma_disconnect in node %s: %s", node_name(n), gai_strerror(ret)); @@ -696,6 +695,14 @@ int ib_stop(struct node *n) debug(LOG_IB | 3, "Joined rdma_cm_event_thread"); + // Wait for polling thread to join + if (ib->is_source) { + ret = pthread_join(ib->poll.cq_poller_thread, NULL); + if (ret) + error("Error while joining cq_poller_thread in node %s: %i", node_name(n), ret); + } + + // Destroy RDMA CM ID rdma_destroy_id(ib->ctx.id); debug(LOG_IB | 3, "Destroyed rdma_cm_id");