diff --git a/lib/memory_ib.c b/lib/memory_ib.c index 38e9f0f0d..9a1432aaf 100644 --- a/lib/memory_ib.c +++ b/lib/memory_ib.c @@ -82,14 +82,3 @@ struct memtype * ib_memtype(struct node *n, struct memtype *parent) return mt; } - -/* Ausserhalb von lib/nodes/infiniband.c */ -/* -struct pool p = { .state = STATE_DESTROYED }; -struct node *n = ..; - -pool_init(&p, 100, 32, node_get_memtype(n)); - -*/ - - diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 1a847365d..efe372816 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -95,23 +95,30 @@ void ib_completion_target(struct node* n, struct ibv_wc* wc, int* size) void ib_completion_source(struct node* n, struct ibv_wc* wc, int* size) { - struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct infiniband *ib = (struct infiniband *)((struct node *)n)->_vd; + struct sample* smpl; - for(int i=0; i<*size; i++) - { - //On disconnect, the QP set to error state and will be flushed - if(wc[i].status == IBV_WC_WR_FLUSH_ERR) - { - ib->poll.stopThread = 1; - return; - } - - if(wc[i].status != IBV_WC_SUCCESS) - warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", - node_name(n), wc[i].status); - - } + for(int i=0; i<*size; i++) + { + //On disconnect, the QP set to error state and will be flushed + if(wc[i].status == IBV_WC_WR_FLUSH_ERR) + { + ib->poll.stopThread = 1; + return; + } + if(wc[i].status != IBV_WC_SUCCESS) + { + warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", + node_name(n), wc[i].status); + } + else + { + // Release sample + smpl = (struct sample*)wc[i].wr_id; + sample_put(smpl); + } + } } void * ib_event_thread(void *n) @@ -232,7 +239,7 @@ static void ib_build_ibv(struct node *n) // Set pool size to maximum size of Receive Queue pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, - sizeof(double), + 64*sizeof(double), &memtype_heap); if(ret) { @@ -705,115 +712,108 @@ int ib_deinit() int ib_read(struct node *n, struct sample *smps[], unsigned cnt) { - //Create separate thread for polling! This impelemtation is just - //for testing purposes - struct infiniband *ib = (struct infiniband *) n->_vd; - int ret; - struct ibv_wc wc[cnt]; - union { - double f; - int64_t i; - } *data; + //Create separate thread for polling! This impelemtation is just + //for testing purposes + struct infiniband *ib = (struct infiniband *) n->_vd; + int ret; + struct ibv_wc wc[cnt]; + char *ptr; - ret = ibv_poll_cq(ib->ctx.cq, cnt, wc); + ret = ibv_poll_cq(ib->ctx.cq, cnt, wc); - if(ret) - { - ib->conn.used_recv_wrs += ret; + if(ret) + { + ib->conn.used_recv_wrs += ret; - data = malloc(ret*sizeof(double)); + for(int i=0; imem.p_recv, (double*)(wc[i].wr_id)); + } + smps[i]->length = wc[i].byte_len/sizeof(double); + smps[i]->capacity = 64; + memcpy(smps[i]->data, ptr, wc[i].byte_len); + } + } + else + { + //No data received? Put new receive Work Requests to Receive Queue + for(int i=0; iconn.used_recv_wrs; i++) + ib_post_recv_wrs(n); - //Release memory - pool_put(&ib->mem.p_recv, (double*)(wc[i].wr_id)); - } - } - smps[0]->length = ret; - smps[0]->capacity = cnt; - memcpy(smps[0]->data, data, ret*sizeof(double)); - } - else - { - //No data received? Put new receive Work Requests to Receive Queue - for(int i=0; iconn.used_recv_wrs; i++) - ib_post_recv_wrs(n); + ib->conn.used_recv_wrs = 0; + } - ib->conn.used_recv_wrs = 0; - } - - return ret; + return ret; } int ib_write(struct node *n, struct sample *smps[], unsigned cnt) { - /* Send pool is not used at this moment! */ - struct infiniband *ib = (struct infiniband *) n->_vd; - int ret; - struct ibv_send_wr wr[smps[0]->length], *bad_wr = NULL; - struct ibv_sge sge[smps[0]->length]; + struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_send_wr wr[cnt], *bad_wr = NULL; + struct ibv_sge sge[cnt]; + struct pool *p; + struct ibv_mr ** mr; + int ret; - memset(&wr, 0, sizeof(wr)); + memset(&wr, 0, sizeof(wr)); - //ToDo: Place this into configuration and create checks if settings are valid - int send_inline = 1; + //ToDo: Place this into configuration and create checks if settings are valid + int send_inline = 0; - for(int i=0; ilength; i++) - { - // If data is send inline, it is not necessary to copy data to protected - // memory region first. - if(send_inline) - { - sge[i].addr = (uint64_t)&smps[0]->data[i].f; - sge[i].length = sizeof(double); - } - else - { - //- copy value to send_region - //- give pointer to start of array - } + // Get Memory Region + p = sample_pool(smps[0]); + mr = (struct ibv_mr **)((char *)(p)+p->buffer_off-8); - // Set Send Work Request - wr[i].wr_id = 0; //ToDo: set this to a useful value - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended - //furthermore we should break the transaction up if inline mode - //is selected + for(int i=0; ilength-1)) - wr[i].next = NULL; - else - wr[i].next = &wr[i+1]; - wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3); - wr[i].imm_data = htonl(0); //ToDo: set this to a useful value - wr[i].opcode = IBV_WR_SEND_WITH_IMM; + //Set Scatter/Gather element to data of sample + sge[i].addr = (uint64_t)&smps[i]->data->f; + sge[i].length = smps[i]->length*sizeof(double); + sge[i].lkey = (*mr)->lkey; - } + // Set Send Work Request + wr[i].wr_id = (uint64_t)&smps[i]; //This way the sample can be release in WC + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; - //Send linked list of Work Requests - ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); - if(ret) - { - error("Failed to send message in node %s: %s", - node_name(n), gai_strerror(ret)); + if(i == (cnt-1)) + wr[i].next = NULL; + else + wr[i].next = &wr[i+1]; - return -ret; - } + wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3); + wr[i].imm_data = htonl(0); //ToDo: set this to a useful value + wr[i].opcode = IBV_WR_SEND_WITH_IMM; + } - return cnt; + //Send linked list of Work Requests + ret = ibv_post_send(ib->ctx.id->qp, wr, &bad_wr); + if(ret) + { + error("Failed to send message in node %s: %s", + node_name(n), gai_strerror(ret)); + + return -ret; + } + + return cnt; } int ib_fd(struct node *n)