diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 7a5058f71..56f591a72 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -67,10 +67,7 @@ struct infiniband { int send_inline; - struct send_wc_stack_s { - uint64_t* array; - unsigned top; - } send_wc_stack; + struct queue send_wc_buffer; int available_recv_wrs; int buffer_subtraction; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index f5c37c777..0723eef8d 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -57,9 +57,14 @@ static int ib_disconnect(struct node *n) sample_put((struct sample *) (wc[j].wr_id)); // Send Queue Stack - while (ib->conn.send_wc_stack.top != 0) { - ib->conn.send_wc_stack.top--; - sample_put((struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]); + + struct sample *smp = NULL; + while (queue_available(&ib->conn.send_wc_buffer)) { + // Because of queue_available, queue_pull should always return. No need + // to double check return of queue_pull. + queue_pull(&ib->conn.send_wc_buffer, (void **) &smp); + + sample_put(smp); } // Destroy QP @@ -580,11 +585,10 @@ int ib_start(struct node *n) // Create rdma_cm_id and bind to device ib_create_bind_id(n); - // Initialize send Work Completion stack - ib->conn.send_wc_stack.top = 0; - ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) ); + // Initialize send Work Completion queue + queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_type_heap); - debug(LOG_IB | 3, "Initialized Work Completion Stack"); + debug(LOG_IB | 3, "Initialized Work Completion Buffer"); // Resolve address or listen to rdma_cm_id if (ib->is_source) { @@ -810,6 +814,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ? ib->conn.send_inline : 0; + debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline); // Set Send Work Request @@ -858,10 +863,10 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele } - debug(LOG_IB | 4, "%i samples will be released", *release); + debug(LOG_IB | 4, "%i samples will be released (before WC)", *release); // Always poll cnt items from Receive Queue. If there is not enough space in - // smps, we temporarily save it on a stack + // smps, we temporarily save it in a queue ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc); for (int i = 0; i < ret; i++) { @@ -876,26 +881,26 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele (*release)++; } else { - ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id; - ib->conn.send_wc_stack.top++; + queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id)); + debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id)); } } } - - // Check if we still have some space and try to get rid of some addresses on our stack - if (ib->conn.send_wc_stack.top > 0) { + // Check if we still have some space and try to get rid of some addresses in our queue + if (queue_available(&ib->conn.send_wc_buffer)) { int empty_smps = cnt - *release; for (int i = 0; i < empty_smps; i++) { - ib->conn.send_wc_stack.top--; + ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]); + debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]); - smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]; - - (*release)++; - - if(ib->conn.send_wc_stack.top == 0) break; + if (ret) + (*release)++; + else + break; } } + debug(LOG_IB | 4, "%i samples will be released (after WC)", *release); } return sent;