diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 449d8c88c..5f5727ea0 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -77,9 +77,11 @@ struct infiniband {

         /* Poll thread */
         pthread_t cq_poller_thread;
-        int stopThread;
+        int stopThreads;
     } poll;

+    int stopThreads;
+
     /* Connection specific variables */
     struct connection_s {
         struct addrinfo *src_addr;
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index 6544ae0d9..2c326eacb 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -22,7 +22,6 @@
 #include
 #include
-#include //ToDo: remove me.
 #include
 #include
@@ -33,7 +32,7 @@
 #include
 #include

-int ib_cleanup(struct node *n)
+int ib_disconnect(struct node *n)
 {
     struct infiniband *ib = (struct infiniband *) n->_vd;
     debug(LOG_IB | 1, "Starting to clean up");
@@ -53,13 +52,8 @@
     pool_destroy(&ib->mem.p_send);
     debug(LOG_IB | 3, "Destroyed memory pools");

-    // Destroy RDMA CM ID
-    rdma_destroy_id(ib->ctx.id);
-    debug(LOG_IB | 3, "Destroyed rdma_cm_id");
-
-    // Destroy event channel
-    rdma_destroy_event_channel(ib->ctx.ec);
-    debug(LOG_IB | 3, "Destroyed event channel");
+    // Set available receive work requests to zero
+    ib->conn.available_recv_wrs = 0;

     return 0;
 }
@@ -97,9 +91,9 @@ void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size)

     for (int i = 0; i < *size; i++) {
         //On disconnect, the QP set to error state and will be flushed
         if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-            debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_completion_source. Stopping thread.");
+            debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_completion_source). Stopping thread.");

-            ib->poll.stopThread = 1;
+            ib->poll.stopThreads = 1;
             return;
         }
@@ -146,7 +140,7 @@ void * ib_busy_poll_thread(void *n)
         while ((size = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc)))
             ib->poll.on_compl(n, wc, &size);

-        if (ib->poll.stopThread)
+        if (ib->poll.stopThreads)
             return NULL;
     }
 }
@@ -169,21 +163,13 @@ static void ib_init_wc_poll(struct node *n)
     }

     // Create completion queues and bind to channel (or NULL)
-    ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs,
-                ib->cq_size,
-                NULL,
-                NULL,
-                0);
+    ib->ctx.recv_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, NULL, 0);
     if (!ib->ctx.recv_cq)
         error("Could not create receive completion queue in node %s", node_name(n));

     debug(LOG_IB | 3, "Created receive Completion Queue");

-    ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs,
-                ib->cq_size,
-                NULL,
-                ib->ctx.comp_channel,
-                0);
+    ib->ctx.send_cq = ibv_create_cq(ib->ctx.id->verbs, ib->cq_size, NULL, ib->ctx.comp_channel, 0);
     if (!ib->ctx.send_cq)
         error("Could not create send completion queue in node %s", node_name(n));
@@ -237,10 +223,7 @@ static void ib_build_ibv(struct node *n)
     ib->mem.p_recv.queue.state = STATE_DESTROYED;

     // Set pool size to maximum size of Receive Queue
-    pool_init(&ib->mem.p_recv,
-        ib->qp_init.cap.max_recv_wr,
-        SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN),
-        &memory_type_heap);
+    pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN), &memory_type_heap);
     if (ret)
         error("Failed to init recv memory pool of node %s: %s",
             node_name(n), gai_strerror(ret));
@@ -252,8 +235,7 @@
     // Register memory for IB Device. Not necessary if data is send
     // exclusively inline
-    ib->mem.mr_recv = ibv_reg_mr(
-            ib->ctx.pd,
+    ib->mem.mr_recv = ibv_reg_mr(ib->ctx.pd,
             (char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off,
             ib->mem.p_recv.len,
             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
@@ -268,29 +250,22 @@ static void ib_build_ibv(struct node *n)
     ib->mem.p_send.queue.state = STATE_DESTROYED;

     // Set pool size to maximum size of Receive Queue
-    pool_init(&ib->mem.p_send,
-        ib->qp_init.cap.max_send_wr,
-        sizeof(double),
-        &memory_type_heap);
+    pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr, sizeof(double), &memory_type_heap);
     if (ret)
-        error("Failed to init send memory of node %s: %s",
-            node_name(n), gai_strerror(ret));
+        error("Failed to init send memory of node %s: %s", node_name(n), gai_strerror(ret));

-    debug(LOG_IB | 3, "Created internal send pool with %i elements",
-        ib->qp_init.cap.max_recv_wr);
+    debug(LOG_IB | 3, "Created internal send pool with %i elements", ib->qp_init.cap.max_recv_wr);

     //ToDo: initialize r_addr_key struct if mode is RDMA

     // Register memory for IB Device. Not necessary if data is send
     // exclusively inline
-    ib->mem.mr_send = ibv_reg_mr(
-            ib->ctx.pd,
+    ib->mem.mr_send = ibv_reg_mr(ib->ctx.pd,
             (char*)&ib->mem.p_send+ib->mem.p_send.buffer_off,
             ib->mem.p_send.len,
             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
     if (!ib->mem.mr_send)
-        error("Failed to register mr_send with ibv_reg_mr of node %s",
-            node_name(n));
+        error("Failed to register mr_send with ibv_reg_mr of node %s", node_name(n));

     debug(LOG_IB | 3, "Registered send pool with ibv_reg_mr");
 }
@@ -463,7 +438,7 @@ int ib_parse(struct node *n, json_t *cfg)

     //Check if node is a source and connect to target
     if (remote) {
-        debug(LOG_IB | 3, "Node %s is set up to be able to send data (source and target)", node_name(n));
+        debug(LOG_IB | 3, "Node %s is up as source and target", node_name(n));

         ib->is_source = 1;
@@ -476,13 +451,13 @@ int ib_parse(struct node *n, json_t *cfg)
             error("Failed to resolve remote address '%s' of node %s: %s",
                 remote, node_name(n), gai_strerror(ret));

-        debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo in node %s", ip_adr, port, node_name(n));
+        debug(LOG_IB | 4, "Translated %s:%s to a struct addrinfo", ip_adr, port);

         // Set correct Work Completion function
         ib->poll.on_compl = ib_completion_source;
     }
     else {
-        debug(LOG_IB | 3, "Node %s is set up to be able to only receive data (target)", node_name(n));
+        debug(LOG_IB | 3, "Node %s is set up as target", node_name(n));

         ib->is_source = 0;
@@ -504,11 +479,11 @@ int ib_check(struct node *n)
     int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr)));

     if (ib->qp_init.cap.max_send_wr != max_send_pow)
-        warn("Max nr. of send WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+        warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
             ib->qp_init.cap.max_send_wr, max_send_pow);

     if (ib->qp_init.cap.max_recv_wr != max_recv_pow)
-        warn("Max nr. of recv WRs (%i) is not a power of 2! The HCA will change it to the next power of 2: %i",
+        warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i",
             ib->qp_init.cap.max_recv_wr, max_recv_pow);
@@ -603,7 +578,12 @@ void * ib_rdma_cm_event_thread(void *n)
            case RDMA_CM_EVENT_DISCONNECTED:
                debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED");

-               ret = ib_cleanup(n);
+               node->state = STATE_STARTED;
+               ret = ib_disconnect(n);
+
+               break;
+
+           case RDMA_CM_EVENT_TIMEWAIT_EXIT:
                break;

            default:
@@ -612,7 +592,7 @@ void * ib_rdma_cm_event_thread(void *n)

        rdma_ack_cm_event(event);

-       if (ret) //ToDo: Fix me
+       if (ret || ib->stopThreads)
            break;
    }
@@ -635,8 +615,7 @@ int ib_start(struct node *n)

     ret = rdma_create_id(ib->ctx.ec, &ib->ctx.id, NULL, ib->conn.port_space);
     if (ret)
-        error("Failed to create rdma_cm_id of node %s: %s",
-            node_name(n), gai_strerror(ret));
+        error("Failed to create rdma_cm_id of node %s: %s", node_name(n), gai_strerror(ret));

     debug(LOG_IB | 3, "Created rdma_cm_id");
@@ -650,10 +629,7 @@ int ib_start(struct node *n)

     if (ib->is_source) {
         // Resolve address
-        ret = rdma_resolve_addr(ib->ctx.id,
-                    NULL,
-                    ib->conn.dst_addr->ai_addr,
-                    ib->conn.timeout);
+        ret = rdma_resolve_addr(ib->ctx.id, NULL, ib->conn.dst_addr->ai_addr, ib->conn.timeout);
         if (ret)
             error("Failed to resolve remote address after %ims of node %s: %s",
                 ib->conn.timeout, node_name(n), gai_strerror(ret));
@@ -698,16 +674,41 @@ int ib_stop(struct node *n)
     struct infiniband *ib = (struct infiniband *) n->_vd;
     int ret;

+    debug(LOG_IB | 1, "Called ib_stop");
+
+    ib->stopThreads = 1;
+
     // Call RDMA disconnect function
     // Will flush all outstanding WRs to the Completion Queue and
     // will call RDMA_CM_EVENT_DISCONNECTED if that is done.
     ret = rdma_disconnect(ib->ctx.id);
     if (ret)
         error("Error while calling rdma_disconnect in node %s: %s",
-            node_name(n), gai_strerror(ret));
+              node_name(n), gai_strerror(ret));

     debug(LOG_IB | 3, "Called rdma_disconnect");

+    info("Disconnecting... Please give me a few seconds.");
+    // Wait for event thread to join
+    ret = pthread_join(ib->conn.rdma_cm_event_thread, NULL);
+    if (ret)
+        error("Error while joining rdma_cm_event_thread in node %s: %i", node_name(n), ret);
+
+    debug(LOG_IB | 3, "Joined rdma_cm_event_thread");
+
+    // Destroy RDMA CM ID
+    rdma_destroy_id(ib->ctx.id);
+    debug(LOG_IB | 3, "Destroyed rdma_cm_id");
+
+    // Dealloc Protection Domain
+    ibv_dealloc_pd(ib->ctx.pd);
+    debug(LOG_IB | 3, "Destroyed protection domain");
+
+    // Destroy event channel
+    rdma_destroy_event_channel(ib->ctx.ec);
+    debug(LOG_IB | 3, "Destroyed event channel");
+
+    info("Successfully stopped %s", node_name(n));

     return 0;
 }
@@ -784,7 +785,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt)

         for (int i = 0; i < ret; i++) {
             if (wc[i].status == IBV_WC_WR_FLUSH_ERR) {
-                debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR in ib_read. Ignore it.");
+                debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it.");

                 ret = 0;
             }
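Note (not part of the patch): the sketch below is a minimal, hypothetical illustration of the teardown ordering that ib_stop() now enforces, i.e. rdma_disconnect() first, then waiting until the connection manager reports RDMA_CM_EVENT_DISCONNECTED (and possibly RDMA_CM_EVENT_TIMEWAIT_EXIT), and only then destroying the rdma_cm_id, the protection domain and the event channel. In the patch the event draining happens in rdma_cm_event_thread and ib_stop() joins it; here it is done inline for brevity. The function name teardown() and the assumption that all QPs and MRs on the PD were already destroyed are illustrative, not VILLASnode code.

/* Minimal sketch, assuming id/pd/ec were created earlier with
 * rdma_create_id(), ibv_alloc_pd() and rdma_create_event_channel(),
 * and that all QPs and MRs using the PD have already been destroyed. */
#include <rdma/rdma_cma.h>

static int teardown(struct rdma_cm_id *id, struct ibv_pd *pd,
                    struct rdma_event_channel *ec)
{
    struct rdma_cm_event *event;

    /* Flush outstanding work requests; the CM reports
     * RDMA_CM_EVENT_DISCONNECTED once the flush is done. */
    if (rdma_disconnect(id))
        return -1;

    /* Drain CM events until the disconnect has been observed,
     * instead of destroying resources immediately. */
    while (rdma_get_cm_event(ec, &event) == 0) {
        enum rdma_cm_event_type type = event->event;

        rdma_ack_cm_event(event);

        if (type == RDMA_CM_EVENT_DISCONNECTED ||
            type == RDMA_CM_EVENT_TIMEWAIT_EXIT)
            break;
    }

    /* Only now is it safe to release the CM id, the protection
     * domain and the event channel. */
    rdma_destroy_id(id);
    ibv_dealloc_pd(pd);
    rdma_destroy_event_channel(ec);

    return 0;
}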