diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index d68a3507d..8ff29cdba 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -111,8 +111,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) // Set pool size to maximum size of Receive Queue pool_init(&ib->mem.p_recv, - //ib->qp_init.cap.max_recv_wr, - 1, + ib->qp_init.cap.max_recv_wr, sizeof(struct payload_s), &memtype_heap); if(ret) { @@ -121,28 +120,19 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) } //ToDo: initialize r_addr_key struct if mode is RDMA - struct payload_s* test; - test = pool_get(&ib->mem.p_recv); - - printf("Address value: %p\n", test); - printf("Address pool: %p\n", &ib->mem.p_recv); - printf("Address calculated: %p\n", &ib->mem.p_recv+ib->mem.p_recv.buffer_off); - - printf("Offset: %li\n", ib->mem.p_recv.buffer_off); - printf("Size of struct: %lu\n", sizeof(struct payload_s)); - printf("Size of block: %lu\n", ib->mem.p_recv.blocksz); // Register memory for IB Device. Not necessary if data is send // exclusively inline ib->mem.mr_recv = ibv_reg_mr( ib->ctx.pd, - &ib->mem.p_recv+ib->mem.p_recv.buffer_off, - ib->mem.p_recv.len*ib->mem.p_recv.blocksz, + (char*)&ib->mem.p_recv+ib->mem.p_recv.buffer_off, + ib->mem.p_recv.len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); if(!ib->mem.mr_recv) { error("Failed to register mr_recv with ibv_reg_mr of node %s.", node_name(n)); } + info("Allocated receive memory."); if(ib->is_source) { @@ -165,13 +155,14 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) // exclusively inline ib->mem.mr_send = ibv_reg_mr( ib->ctx.pd, - &ib->mem.p_send+ib->mem.p_send.buffer_off, - ib->mem.p_send.len*ib->mem.p_send.blocksz, + (char*)&ib->mem.p_send+ib->mem.p_send.buffer_off, + ib->mem.p_send.len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE); if(!ib->mem.mr_send) { error("Failed to register mr_send with ibv_reg_mr of node %s.", node_name(n)); } + info("Allocated send memory."); } } @@ -472,17 +463,77 @@ int ib_deinit() int ib_read(struct node *n, struct sample *smps[], unsigned cnt) { - return 0; + //Create separate thread for polling! This impelemtation is just + //for testing purposes + struct infiniband *ib = (struct infiniband *) n->_vd; + int ret; + struct ibv_wc wc[100]; + + ret = ibv_poll_cq(ib->ctx.cq, 100, wc); + + + + + return ret; } int ib_write(struct node *n, struct sample *smps[], unsigned cnt) { + struct infiniband *ib = (struct infiniband *) n->_vd; + int ret; + struct ibv_send_wr wr; + struct ibv_send_wr *bad_wr = NULL; + struct ibv_sge sg_list; + + memset(&wr, 0, sizeof(wr)); + + struct payload_s *payl; + payl = pool_get(&ib->mem.p_send); + + payl->data = 1337; + + // If data is send inline, it is not necessary to copy data to protected + // memory region first. + if(1) + { + //sg_list.addr = (uint64_t)smps[0]->data; + //sg_list.length = smps[0]->length-1; + sg_list.addr = (uintptr_t)payl; + sg_list.length = 1; + // lkey not necessary + } + else + { + //- copy value to send_region + //- give pointer to start of array + } + + // Set Send Work Request + wr.wr_id = 123; //ToDo: set this to a useful value + wr.sg_list = &sg_list; + wr.num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended + //furthermore we should break the transaction up if inline mode + //is selected + wr.next = NULL; + wr.send_flags = IBV_SEND_SIGNALED; + wr.imm_data = htonl(0); //ToDo: set this to a useful value + wr.opcode = IBV_WR_SEND_WITH_IMM; for(int i = 0; i < smps[0]->length; i++) { printf("Sample %i: %f\n", i, smps[0]->data[i].f); } - return 0; + + ret = ibv_post_send(ib->id->qp, &wr, &bad_wr); + if(ret) + { + error("Failed to send message in node %s: %s", + node_name(n), gai_strerror(ret)); + + return -ret; + } + + return cnt; } int ib_fd(struct node *n)