diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 67289a4dc..188baf523 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -77,6 +77,12 @@ struct infiniband { pthread_t rdma_cm_event_thread; int available_recv_wrs; + + struct send_wc_stack_s { + uint64_t* array; + unsigned top; + } send_wc_stack; + } conn; /* Memory related variables */ diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 33da1e143..482d9d893 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -371,14 +371,21 @@ int ib_check(struct node *n) int max_send_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_send_wr))); int max_recv_pow = (int) pow(2, ceil(log2(ib->qp_init.cap.max_recv_wr))); - if (ib->qp_init.cap.max_send_wr != max_send_pow) + if (ib->qp_init.cap.max_send_wr != max_send_pow) { warn("Max nr. of send WRs (%i) is not a power of 2! It will be changed to a power of 2: %i", ib->qp_init.cap.max_send_wr, max_send_pow); - if (ib->qp_init.cap.max_recv_wr != max_recv_pow) + // Change it now, because otherwise errors are possible in ib_start(). + ib->qp_init.cap.max_send_wr = max_send_pow; + } + + if (ib->qp_init.cap.max_recv_wr != max_recv_pow) { warn("Max nr. of recv WRs (%i) is not a power of 2! It will be changed to a power of 2: %i", ib->qp_init.cap.max_recv_wr, max_recv_pow); + // Change it now, because otherwise errors are possible in ib_start(). + ib->qp_init.cap.max_recv_wr = max_recv_pow; + } // Check maximum size of max_recv_wr and max_send_wr if (ib->qp_init.cap.max_send_wr > 8192) @@ -526,6 +533,11 @@ int ib_start(struct node *n) // that communication. ib->ctx.listen_id = ib->ctx.id; + // Initialize send Work Completion stack + ib->conn.send_wc_stack.top = 0; + ib->conn.send_wc_stack.array = alloc(ib->qp_init.cap.max_recv_wr * sizeof(uint64_t) ); + + debug(LOG_IB | 3, "Initialized Work Completion Stack"); if (ib->is_source) { // Resolve address @@ -638,8 +650,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea // Poll Completion Queue // If we've already posted enough receive WRs, try to pull cnt - //if (ib->conn.available_recv_wrs > (ib->qp_init.cap.max_recv_wr / 2)) { //ToDo: Make configurable - if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) )) { //ToDo: Make configurable + if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) ) ) { //ToDo: Make configurable while(1) { wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { @@ -728,7 +739,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele if (n->state == STATE_CONNECTED) { // First, write //ToDo: Place this into configuration and create checks if settings are valid - int send_inline = 0; + int send_inline = 1; debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline); @@ -792,12 +803,9 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele debug(LOG_IB | 4, "%i samples will be released", *release); - - // Subsequently, check if something is available in completion queue - // Take only as much Work Completions from queue, as are currently available - // in smps[] - ret = ibv_poll_cq(ib->ctx.send_cq, (cnt - (*release)), wc); - //ret = ibv_poll_cq(ib->ctx.send_cq, 200, wc); + // Always poll cnt items from Receive Queue. If there is not enough space in + // smps, we temporarily save it on a stack + ret = ibv_poll_cq(ib->ctx.send_cq, cnt, wc); for (int i = 0; i < ret; i++) { if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) @@ -806,9 +814,28 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele // Release only samples which were not send inline if (wc[i].wr_id) { - smps[*release] = (struct sample *) (wc[i].wr_id); + if (cnt - *release > 0) { + smps[*release] = (struct sample *) (wc[i].wr_id); + (*release)++; + } + else { + ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top] = wc[i].wr_id; + ib->conn.send_wc_stack.top++; + } + } + } + + // Check if we still have some space and try to get rid of some addresses on our stack + if (ib->conn.send_wc_stack.top > 0) { + int empty_smps = cnt - *release; + for (int i = 0; i < empty_smps; i++) { + ib->conn.send_wc_stack.top--; + + smps[*release] = (struct sample *) ib->conn.send_wc_stack.array[ib->conn.send_wc_stack.top]; (*release)++; + + if(ib->conn.send_wc_stack.top == 0) break; } }