diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 718c5508b..af93cbaf1 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -89,7 +89,7 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size){} void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { for (int i = 0; i < *size; i++) { - if (wc[i].status != IBV_WC_SUCCESS) + if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[i].status); @@ -738,6 +738,9 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) struct ibv_sge sge[*cnt]; struct ibv_mr *mr; int ret = 0; + int i = 0; //Used for first loop: post receive Work Requests + int j = 0; //Used for second loop: get values from Completion Queue + int k = 0; //Used for third loop: reorder list debug(LOG_IB | 15, "ib_read is called"); @@ -747,10 +750,7 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (int i = 0; i < *cnt; i++) { - // Increase refcnt of sample - sample_get(smps[i]); - + for (i = 0; i < *cnt; i++) { // Prepare receive Scatter/Gather element sge[i].addr = (uint64_t) &smps[i]->data; sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); @@ -779,7 +779,6 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) node_name(n), ret, bad_wr->wr_id); debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); - } // Poll Completion Queue @@ -790,31 +789,44 @@ int ib_read(struct node *n, struct sample *smps[], int *cnt) ib->conn.available_recv_wrs -= ret; - for (int i = 0; i < ret; i++) { - if (wc[i].status == IBV_WC_WR_FLUSH_ERR) { + for (j = 0; j < ret; j++) { + if (wc[j].status == IBV_WC_WR_FLUSH_ERR) { debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); ret = 0; } - else if (wc[i].status != IBV_WC_SUCCESS) { + else if (wc[j].status != IBV_WC_SUCCESS) { warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); + node_name(n), wc[j].status); ret = 0; } - else if (wc[i].opcode & IBV_WC_RECV) { - smps[i] = (struct sample*)(wc[i].wr_id); - smps[i]->length = wc[i].byte_len/sizeof(double); + else if (wc[j].opcode & IBV_WC_RECV) { + smps[j] = (struct sample*)(wc[j].wr_id); + smps[j]->length = wc[i].byte_len/sizeof(double); } else ret = 0; - //Release sample - sample_put((struct sample *) (wc[i].wr_id)); - debug(LOG_IB | 10, "Releasing sample %p", (struct sample *) (wc[i].wr_id)); } } - } + // Reorder list + // The first part of the list is fine. It's filled with received values. Now append + // unused values. + // * j ==> Values from completion queue + // * i ==> Values posted to receive queue + // * (*cnt) ==> Available values to post to receive queue + // + // Thus: + // * (*cnt - i) ==> number of unused values + int l; + for (k = j, l = 0; k < j + (*cnt - i); k++, l++) { + smps[k] = smps[i + l]; + } + + + *cnt = k; + } return ret; }