diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 9cff766e5..33da1e143 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -269,6 +269,9 @@ int ib_parse(struct node *n, json_t *cfg) char* ip_adr = strtok(local, ":"); char* port = strtok(NULL, ":"); + //n->in.vectorize = 1024; + //n->out.vectorize = 1024; //ToDo: make configurable + ret = getaddrinfo(ip_adr, port, NULL, &ib->conn.src_addr); if (ret) error("Failed to resolve local address '%s' of node %s: %s", @@ -621,178 +624,197 @@ int ib_deinit() int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct infiniband *ib = (struct infiniband *) n->_vd; - struct ibv_wc wc[n->in.vectorize]; + struct ibv_wc wc[cnt]; struct ibv_recv_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; struct ibv_mr *mr; - int ret = 0; - int i = 0; //Used for first loop: post receive Work Requests - int j = 0; //Used for second loop: get values from Completion Queue - int k = 0; //Used for third loop: reorder list + int ret = 0, wcs = 0, read_values = 0, max_wr_post; debug(LOG_IB | 15, "ib_read is called"); if (n->state == STATE_CONNECTED) { - if (ib->conn.available_recv_wrs < ib->qp_init.cap.max_recv_wr && cnt) { - // Get Memory Region - mr = memory_ib_get_mr(smps[0]); + max_wr_post = cnt; - for (i = 0; i < cnt; i++) { - // Prepare receive Scatter/Gather element - sge[i].addr = (uint64_t) &smps[i]->data; - sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); - sge[i].lkey = mr->lkey; + // Poll Completion Queue + // If we've already posted enough receive WRs, try to pull cnt + //if (ib->conn.available_recv_wrs > (ib->qp_init.cap.max_recv_wr / 2)) { //ToDo: Make configurable + if (ib->conn.available_recv_wrs > ( ib->qp_init.cap.max_recv_wr - (1024 * n->in.vectorize) )) { //ToDo: Make configurable + while(1) { + wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); + if (wcs) { + debug(LOG_IB | 10, "Received %i Work Completions", wcs); - // Prepare a receive Work Request - wr[i].wr_id = (uintptr_t) smps[i]; - wr[i].next = &wr[i+1]; - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; - - ib->conn.available_recv_wrs++; - - if (ib->conn.available_recv_wrs == ib->qp_init.cap.max_recv_wr || i==(cnt-1)) { - debug(LOG_IB | 10, "Prepared %i new receive Work Requests", (i+1)); - - wr[i].next = NULL; - i++; + read_values = wcs; // Value to return + max_wr_post = wcs; // Make space free in smps[] break; } } - // Post list of Work Requests - ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr); - if (ret) - error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx", - node_name(n), ret, bad_wr->wr_id); - - debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); + // All samples (wcs * received + unposted) should be released. Let + // *release be equal to allocated. + // + // This is set in the framework, before this function was called + } + else { + ib->conn.available_recv_wrs += max_wr_post; + *release = 0; // While we fill the receive queue, we always use all samples } - // Poll Completion Queue - ret = ibv_poll_cq(ib->ctx.recv_cq, n->in.vectorize, wc); + // Get Memory Region + mr = memory_ib_get_mr(smps[0]); - if (ret) { - debug(LOG_IB | 10, "Received %i Work Completions", ret); + for (int i = 0; i < max_wr_post; i++) { + // Prepare receive Scatter/Gather element + sge[i].addr = (uint64_t) &smps[i]->data; + sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); + sge[i].lkey = mr->lkey; - ib->conn.available_recv_wrs -= ret; - - for (j = 0; j < ret; j++) { - if (wc[j].status == IBV_WC_WR_FLUSH_ERR) { - debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); - - ret = 0; - } - else if (wc[j].status != IBV_WC_SUCCESS) { - warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[j].status); - ret = 0; - } - else if (wc[j].opcode & IBV_WC_RECV) { - smps[j] = (struct sample *) (wc[j].wr_id); - smps[j]->length = wc[i].byte_len / sizeof(double); - } - else - ret = 0; - - } + // Prepare a receive Work Request + wr[i].wr_id = (uintptr_t) smps[i]; + wr[i].next = &wr[i+1]; + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; } - // Reorder list - // The first part of the list is fine. It's filled with received values. Now append - // unused values. - // * j ==> Values from completion queue - // * i ==> Values posted to receive queue - // * (cnt) ==> Available values to post to receive queue - // - // Thus: - // * (cnt - i) ==> number of unused values - int l; - for (k = j, l = 0; k < j + (cnt - i); k++, l++) { - smps[k] = smps[i + l]; + wr[max_wr_post-1].next = NULL; + + debug(LOG_IB | 5, "Prepared %i new receive Work Requests", max_wr_post); + debug(LOG_IB | 5, "%i receive Work Requests in Receive Queue", ib->conn.available_recv_wrs); + + // Post list of Work Requests + ret = ibv_post_recv(ib->ctx.id->qp, &wr[0], &bad_wr); + + if (ret) + error("Was unable to post receive WR in node %s: %i, bad WR ID: 0x%lx", + node_name(n), ret, bad_wr->wr_id); + + debug(LOG_IB | 10, "Succesfully posted receive Work Requests"); + + // Doesn't start, if wcs == 0 + for (int j = 0; j < wcs; j++) { + if ( !( (wc[j].opcode & IBV_WC_RECV) && wc[j].status == IBV_WC_SUCCESS) ) + read_values--; + + if (wc[j].status == IBV_WC_WR_FLUSH_ERR) + debug(LOG_IB | 5, "Received IBV_WC_WR_FLUSH_ERR (ib_read). Ignore it."); + else if (wc[j].status != IBV_WC_SUCCESS) + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[j].status); + + smps[j] = (struct sample *) (wc[j].wr_id); + smps[j]->length = wc[j].byte_len / sizeof(double); } - - - *release = k; } - return ret; + return read_values; } -int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *ready) +int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *release) { struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[cnt], *bad_wr = NULL; struct ibv_sge sge[cnt]; - struct ibv_wc wc[ib->cq_size]; + struct ibv_wc wc[cnt]; struct ibv_mr *mr; + int ret; - int i = 0; //Used for first loop: prepare work requests to post to send queue - int j = 0; //Used for second loop: get values from Completion Queue + int sent = 0; //Used for first loop: prepare work requests to post to send queue debug(LOG_IB | 10, "ib_write is called"); + *release = 0; + if (n->state == STATE_CONNECTED) { // First, write //ToDo: Place this into configuration and create checks if settings are valid - int send_inline = 1; + int send_inline = 0; debug(LOG_IB | 10, "Data will be send inline [0/1]: %i", send_inline); // Get Memory Region mr = memory_ib_get_mr(smps[0]); - for (i = 0; i < cnt; i++) { - //Set Scatter/Gather element to data of sample - sge[i].addr = (uint64_t) &smps[i]->data; - sge[i].length = smps[i]->length*sizeof(double); - sge[i].lkey = mr->lkey; + for (sent = 0; sent < cnt; sent++) { + // Set Scatter/Gather element to data of sample + sge[sent].addr = (uint64_t) &smps[sent]->data; + sge[sent].length = smps[sent]->length*sizeof(double); + sge[sent].lkey = mr->lkey; // Set Send Work Request - wr[i].wr_id = (uintptr_t) smps[i]; //This way the sample can be release in WC - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; + wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC + wr[sent].sg_list = &sge[sent]; + wr[sent].num_sge = 1; - if (i == (cnt-1)) { - debug(LOG_IB | 10, "Prepared %i send Work Requests", (i+1)); - wr[i].next = NULL; + if (sent == (cnt-1)) { + debug(LOG_IB | 10, "Prepared %i send Work Requests", (sent+1)); + wr[sent].next = NULL; } else - wr[i].next = &wr[i+1]; + wr[sent].next = &wr[sent+1]; - wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline << 3); - wr[i].imm_data = htonl(0); //ToDo: set this to a useful value - wr[i].opcode = IBV_WR_SEND_WITH_IMM; + wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3); + wr[sent].imm_data = htonl(0); //ToDo: set this to a useful value + wr[sent].opcode = IBV_WR_SEND_WITH_IMM; } - //Send linked list of Work Requests + // Send linked list of Work Requests ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); - if (ret) { - error("Failed to send message in node %s: %i, bad WR ID: 0x%lx", - node_name(n), ret, bad_wr->wr_id); + debug(LOG_IB | 4, "Posted send Work Requests"); + + // Reorder list. Place inline and unposted samples to the top + // m will always be equal or smaller than *release + for (int m = 0; m < cnt; m++) { + // We can't use wr_id as identifier, since it is 0 for inline + // elements + if (ret && (wr[m].sg_list == bad_wr->sg_list)) { + // The remaining work requests will be bad. Ripple through list + // and prepare them to be released + debug(LOG_IB | 4, "Bad WR occured with ID: 0x%lx and S/G address: 0x%px: %i", + bad_wr->wr_id, bad_wr->sg_list, ret); + + while (1) { + smps[*release] = smps[m]; + + (*release)++; // Increment number of samples to be released + sent--; // Decrement the number of succesfully posted elements + + if (++m == cnt) break; + } + } + else if (wr[m].send_flags & IBV_SEND_INLINE) { + smps[*release] = smps[m]; + + (*release)++; + } - return -ret; } - debug(LOG_IB | 4, "Succesfully posted receive Work Requests"); + debug(LOG_IB | 4, "%i samples will be released", *release); // Subsequently, check if something is available in completion queue - ret = ibv_poll_cq(ib->ctx.send_cq, ib->cq_size, wc); + // Take only as much Work Completions from queue, as are currently available + // in smps[] + ret = ibv_poll_cq(ib->ctx.send_cq, (cnt - (*release)), wc); + //ret = ibv_poll_cq(ib->ctx.send_cq, 200, wc); - for (j = 0; j < ret; j++) { - if (wc[j].status != IBV_WC_SUCCESS && wc[j].status != IBV_WC_WR_FLUSH_ERR) + for (int i = 0; i < ret; i++) { + if (wc[i].status != IBV_WC_SUCCESS && wc[i].status != IBV_WC_WR_FLUSH_ERR) warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[j].status); + node_name(n), wc[i].status); - smps[j] = (struct sample *) (wc[j].wr_id); + // Release only samples which were not send inline + if (wc[i].wr_id) { + smps[*release] = (struct sample *) (wc[i].wr_id); + + (*release)++; + } } - *ready = j; } - return i; + return sent; } int ib_fd(struct node *n)