diff --git a/etc/infiniband.conf b/etc/infiniband.conf index 1ca45de10..8cfb2fb58 100644 --- a/etc/infiniband.conf +++ b/etc/infiniband.conf @@ -13,7 +13,7 @@ nodes = { address = "10.0.0.2:1337", max_wrs = 8192, - cq_size = 2048, + cq_size = 8192, vectorize = 1, diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 1ddd85eb3..03a2ae7a3 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -80,11 +80,7 @@ struct infiniband { /* Bool, should data be send inline if possible? */ int send_inline; - /* Stack to temporarily save sent sample */ - struct send_wc_stack_s { - uint64_t* array; - unsigned top; - } send_wc_stack; + struct queue send_wc_buffer; /* Counter to keep track of available recv. WRs */ int available_recv_wrs; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index b27bc3e0a..255e9bcf3 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -33,18 +33,44 @@ static int ib_disconnect(struct node *n) { struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_wc wc[MAX(ib->recv_cq_size, ib->send_cq_size)]; + int wcs; debug(LOG_IB | 1, "Starting to clean up"); rdma_disconnect(ib->ctx.id); + // If there is anything in the Completion Queue, it should be given back to the framework + // Receive Queue + while (ib->conn.available_recv_wrs) { + wcs = ibv_poll_cq(ib->ctx.recv_cq, ib->recv_cq_size, wc); + + ib->conn.available_recv_wrs -= wcs; + + for (int j = 0; j < wcs; j++) + sample_put((struct sample *) (wc[j].wr_id)); + } + + // Send Queue + while ((wcs = ibv_poll_cq(ib->ctx.send_cq, ib->send_cq_size, wc))) + for (int j = 0; j < wcs; j++) + if (wc[j].wr_id > 0) + sample_put((struct sample *) (wc[j].wr_id)); + + // Send Queue Stack + + struct sample *smp = NULL; + while (queue_available(&ib->conn.send_wc_buffer)) { + // Because of queue_available, queue_pull should always return. No need + // to double check return of queue_pull. + queue_pull(&ib->conn.send_wc_buffer, (void **) &smp); + + sample_put(smp); + } + // Destroy QP rdma_destroy_qp(ib->ctx.id); debug(LOG_IB | 3, "Destroyed QP"); - // Set available receive WRs and stack top to zero - ib->conn.available_recv_wrs = 0; - ib->conn.send_wc_stack.top = 0; - return ib->stopThreads; } @@ -462,16 +488,14 @@ void * ib_rdma_cm_event_thread(void *n) // Monitor event channel while (rdma_get_cm_event(ib->ctx.ec, &event) == 0) { + debug(LOG_IB | 2, "Received communication event: %s", rdma_event_str(event->event)); switch(event->event) { case RDMA_CM_EVENT_ADDR_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_RESOLVED"); - ret = ib_addr_resolved(n); break; case RDMA_CM_EVENT_ADDR_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ADDR_ERROR"); warn("Address resolution (rdma_resolve_addr) failed!"); ib_continue_as_listen(n, event); @@ -479,13 +503,10 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_ROUTE_RESOLVED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_RESOLVED"); - ret = ib_route_resolved(n); break; case RDMA_CM_EVENT_ROUTE_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ROUTE_ERROR"); warn("Route resolution (rdma_resovle_route) failed!"); ib_continue_as_listen(n, event); @@ -499,8 +520,6 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_CONNECT_REQUEST: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST"); - ret = ib_connect_request(n, event->id); //ToDo: Think about this. In this context, we say that the QP is initialized @@ -510,7 +529,6 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_CONNECT_ERROR: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_ERROR"); warn("An error has occurred trying to establish a connection!"); ib_continue_as_listen(n, event); @@ -518,7 +536,6 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_REJECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_REJECTED"); warn("Connection request or response was rejected by the remote end point!"); ib_continue_as_listen(n, event); @@ -526,8 +543,6 @@ void * ib_rdma_cm_event_thread(void *n) break; case RDMA_CM_EVENT_ESTABLISHED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_ESTABLISHED"); - // If the connection is unreliable connectionless, set appropriate variables if (ib->conn.port_space == RDMA_PS_UDP) ib->conn.ud = event->param.ud; @@ -535,12 +550,12 @@ void * ib_rdma_cm_event_thread(void *n) node->state = STATE_CONNECTED; info("Connection established in node %s", node_name(n)); + break; case RDMA_CM_EVENT_DISCONNECTED: - debug(LOG_IB | 2, "Received RDMA_CM_EVENT_DISCONNECTED"); - node->state = STATE_STARTED; + ret = ib_disconnect(n); if (!ret) @@ -581,11 +596,10 @@ int ib_start(struct node *n) // Create rdma_cm_id and bind to device ib_create_bind_id(n); - // Initialize send Work Completion stack - ib->conn.send_wc_stack.top = 0; - ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) ); + // Initialize send Work Completion queue + queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_type_heap); - debug(LOG_IB | 3, "Initialized Work Completion Stack"); + debug(LOG_IB | 3, "Initialized Work Completion Buffer"); // Resolve address or listen to rdma_cm_id if (ib->is_source) { @@ -704,6 +718,8 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // If we've already posted enough receive WRs, try to pull cnt if (ib->conn.available_recv_wrs >= (ib->qp_init.cap.max_recv_wr - ib->conn.buffer_subtraction) ) { while(1) { + if (n->state != STATE_CONNECTED) return 0; + wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { debug(LOG_IB | 10, "Received %i Work Completions", wcs); @@ -826,6 +842,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ? ib->conn.send_inline : 0; + debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline); // Set Send Work Request @@ -874,10 +891,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele } - debug(LOG_IB | 4, "%i samples will be released", *release); + debug(LOG_IB | 4, "%i samples will be released (before WC)", *release); // Always poll cnt items from Receive Queue. If there is not enough space in - // smps, we temporarily save it on a stack + // smps, we temporarily save it in a queue ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc); for (int i = 0; i < ret; i++) { @@ -892,26 +909,26 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele (*release)++; } else { - ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id; - ib->conn.send_wc_stack.top++; + queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id)); + debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id)); } } } - - // Check if we still have some space and try to get rid of some addresses on our stack - if (ib->conn.send_wc_stack.top > 0) { + // Check if we still have some space and try to get rid of some addresses in our queue + if (queue_available(&ib->conn.send_wc_buffer)) { int empty_smps = cnt - *release; for (int i = 0; i < empty_smps; i++) { - ib->conn.send_wc_stack.top--; + ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]); + debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]); - smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]; - - (*release)++; - - if(ib->conn.send_wc_stack.top == 0) break; + if (ret) + (*release)++; + else + break; } } + debug(LOG_IB | 4, "%i samples will be released (after WC)", *release); } return sent;