diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h
index 969382a70..66974bd57 100644
--- a/include/villas/nodes/infiniband.h
+++ b/include/villas/nodes/infiniband.h
@@ -84,9 +84,6 @@ struct infiniband {
 	/* Bool, should node have a fallback if it can't connect to a remote host? */
 	int use_fallback;
 
-	/* Buffer, used to temporarily store Work Completions from send queue */
-	struct queue send_wc_buffer;
-
 	/* Counter to keep track of available recv. WRs */
 	int available_recv_wrs;
 
diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c
index b97d411fd..2571570d4 100644
--- a/lib/nodes/infiniband.c
+++ b/lib/nodes/infiniband.c
@@ -57,16 +57,6 @@ static int ib_disconnect(struct node *n)
 			if (wc[j].wr_id > 0)
 				sample_decref((struct sample *) (wc[j].wr_id));
 
-	// Send Queue Stack
-	struct sample *smp = NULL;
-	while (queue_available(&ib->conn.send_wc_buffer)) {
-		// Because of queue_available, queue_pull should always return. No need
-		// to double check return of queue_pull.
-		queue_pull(&ib->conn.send_wc_buffer, (void **) &smp);
-
-		sample_decref(smp);
-	}
-
 	// Destroy QP
 	rdma_destroy_qp(ib->ctx.id);
 	debug(LOG_IB | 3, "Destroyed QP");
@@ -630,9 +620,6 @@ int ib_start(struct node *n)
 	// Create rdma_cm_id and bind to device
 	ib_create_bind_id(n);
 
-	// Initialize send Work Completion queue
-	queue_init(&ib->conn.send_wc_buffer, ib->qp_init.cap.max_send_wr, &memory_heap);
-	debug(LOG_IB | 3, "Initialized Work Completion Buffer");
 
 	// Resolve address or listen to rdma_cm_id
@@ -930,12 +917,12 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
 			debug(LOG_IB | 10, "Sample will be send inline [0/1]: %i", send_inline);
 
 			// Set Send Work Request
-			wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC
+			wr[sent].wr_id = (uintptr_t) smps[sent];
 			wr[sent].sg_list = sge[sent];
 			wr[sent].num_sge = j;
 			wr[sent].next = &wr[sent+1];
 
-			wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);
+			wr[sent].send_flags = send_inline ? IBV_SEND_INLINE : IBV_SEND_SIGNALED;
 			wr[sent].opcode = IBV_WR_SEND;
 		}
 
@@ -976,39 +963,16 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele
 
 	debug(LOG_IB | 4, "%i samples will be released (before WC)", *release);
 
-	// Always poll cnt items from Receive Queue. If there is not enough space in
-	// smps, we temporarily save it in a queue
-	ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc);
+	// Try to grab as many CQEs from CQ as there is space in *smps[]
+	ret = ibv_poll_cq(ib->ctx.send_cq, cnt - *release, wc);
 
 	for (int i = 0; i < ret; i++) {
 		if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
 			warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
 				node_name(n), wc[i].status);
 
-		// Release only samples which were not send inline
-		if (wc[i].wr_id) {
-			if (cnt - *release > 0) {
-				smps[*release] = (struct sample *) (wc[i].wr_id);
-				(*release)++;
-			}
-			else {
-				queue_push(&ib->conn.send_wc_buffer, (struct sample *) (wc[i].wr_id));
-				debug(LOG_IB | 10, "Push in send WC Queue: %px", (struct sample *) (wc[i].wr_id));
-			}
-		}
-	}
-
-	// Check if we still have some space and try to get rid of some addresses in our queue
-	if (queue_available(&ib->conn.send_wc_buffer)) {
-		int empty_smps = cnt - *release;
-		for (int i = 0; i < empty_smps; i++) {
-			ret = queue_pull(&ib->conn.send_wc_buffer, (void **) &smps[*release]);
-			debug(LOG_IB | 10, "Pull from send WC Queue: %px", (struct sample *) smps[*release]);
-
-			if (ret)
-				(*release)++;
-			else
-				break;
-		}
+		smps[*release] = (struct sample *) (wc[i].wr_id);
+		(*release)++;
 	}
 
 	debug(LOG_IB | 4, "%i samples will be released (after WC)", *release);
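For context, a minimal sketch of the send path as it looks after this patch. It is not the actual ib_write() body: the struct sample forward declaration, the helper names (setup_send_wr, collect_send_completions) and the fprintf() error reporting are stand-ins for VILLASnode's types, ib_write() internals and warn() logger; only the libibverbs calls and flags are the ones used in the diff above.

#include <infiniband/verbs.h>
#include <stdint.h>
#include <stdio.h>

/* Stand-in for VILLASnode's struct sample; only its address is used here. */
struct sample;

/* Work Request setup after this patch: the sample pointer always goes into
 * wr_id, and inline sends are posted unsignaled instead of OR-ing a raw
 * flag bit into send_flags. */
static void setup_send_wr(struct ibv_send_wr *wr, struct ibv_sge *sge,
                          int num_sge, struct sample *smp, int send_inline)
{
	wr->wr_id      = (uintptr_t) smp;
	wr->sg_list    = sge;
	wr->num_sge    = num_sge;
	wr->opcode     = IBV_WR_SEND;
	wr->send_flags = send_inline ? IBV_SEND_INLINE : IBV_SEND_SIGNALED;
}

/* Completion side after this patch: poll at most `room` CQEs so that every
 * recovered sample pointer fits into the caller's smps[] array, and hand
 * the pointers back for release. Returns the number of samples recovered. */
static unsigned collect_send_completions(struct ibv_cq *send_cq,
                                         struct sample *smps[], unsigned room)
{
	if (room == 0)
		return 0;

	struct ibv_wc wc[room];
	unsigned released = 0;

	int n = ibv_poll_cq(send_cq, (int) room, wc);

	for (int i = 0; i < n; i++) {
		if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR)
			fprintf(stderr, "Unexpected send WC status: %d\n", wc[i].status);

		/* Every signaled send carried its sample pointer in wr_id. */
		smps[released++] = (struct sample *) (uintptr_t) wc[i].wr_id;
	}

	return released;
}

Reading the diff this way, inline sends copy the payload into the WQE and are no longer signaled, so they never produce a CQE; their samples are presumably accounted for before the CQ is polled (the "before WC" debug message), which is why the poll is limited to cnt - *release entries and the old wr_id check and overflow queue are no longer needed.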