diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index f072613d3..c0a8a825a 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -44,16 +44,13 @@ enum poll_mode_e BUSY }; -struct payload_s { - int data; -}; - struct r_addr_key_s { uint64_t remote_addr; uint32_t rkey; }; struct infiniband { + struct rdma_cm_id *listen_id; struct rdma_cm_id *id; struct rdma_event_channel *ec; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 8ff29cdba..7044d1eb7 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -31,6 +31,30 @@ #include +int ib_post_recv_wrs(struct node *n) +{ + struct infiniband *ib = (struct infiniband *) n->_vd; + struct ibv_recv_wr wr, *bad_wr = NULL; + int ret; + struct ibv_sge sge; + + // Prepare receive Scatter/Gather element + sge.addr = (uintptr_t)pool_get(&ib->mem.p_recv); + sge.length = ib->mem.p_recv.blocksz; + sge.lkey = ib->mem.mr_recv->lkey; + + // Prepare a receive Work Request + wr.wr_id = (uintptr_t)sge.addr; + wr.next = NULL; + wr.sg_list = &sge; + wr.num_sge = 1; + + // Post Work Request + ret = ibv_post_recv(ib->id->qp, &wr, &bad_wr); + + return ret; +} + static void ib_create_busy_poll(struct node *n, struct rdma_cm_id *id) { struct infiniband *ib = (struct infiniband *) n->_vd; @@ -103,7 +127,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) if(ret) error("Failed to create Queue Pair in node %s.", node_name(n)); - info("Successfully created Queue Pair."); + info("Created Queue Pair."); // Allocate memory ib->mem.p_recv.state = STATE_DESTROYED; @@ -112,7 +136,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) // Set pool size to maximum size of Receive Queue pool_init(&ib->mem.p_recv, ib->qp_init.cap.max_recv_wr, - sizeof(struct payload_s), + sizeof(double), &memtype_heap); if(ret) { error("Failed to init recv memory pool of node %s: %s", @@ -142,7 +166,7 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) // Set pool size to maximum size of Receive Queue pool_init(&ib->mem.p_send, ib->qp_init.cap.max_send_wr, - sizeof(struct payload_s), + sizeof(double), &memtype_heap); if(ret) { error("Failed to init send memory of node %s: %s", @@ -163,7 +187,20 @@ static void ib_build_ibv(struct node *n, struct rdma_cm_id *id) node_name(n)); } info("Allocated send memory."); + } + + // Post Receive Work Requests to be able to receive data + // Fill complete Receive Queue during initialization + for(int i=0; iqp_init.cap.max_recv_wr; i++) + { + ret = ib_post_recv_wrs(n); + if(ret) { + error("Failed to post initial receive Work Requests of node %s.", + node_name(n)); + } + } + info("Filled the complete Receive Queue."); } static int ib_addr_resolved(struct node *n, struct rdma_cm_id *id) @@ -209,9 +246,11 @@ static int ib_route_resolved(struct node *n, struct rdma_cm_id *id) static int ib_connect_request(struct node *n, struct rdma_cm_id *id) { + struct infiniband *ib = (struct infiniband *) n->_vd; int ret; info("Received a connection request!"); + ib->id = id; ib_build_ibv(n, id); //ToDo: Post receive WRs @@ -398,6 +437,9 @@ int ib_start(struct node *n) } info("Succesfully created rdma_cm_id."); + // The ID will be overwritten for the target + ib->listen_id = ib->id; + // Bind rdma_cm_id to the HCA ret = rdma_bind_addr(ib->id, ib->conn.src_addr->ai_addr); if(ret) { @@ -421,7 +463,7 @@ int ib_start(struct node *n) else { // Listen on rdma_cm_id for events - ret = rdma_listen(ib->id, 10); + ret = rdma_listen(ib->listen_id, 10); if(ret) { error("Failed to listen to rdma_cm_id on node %s", node_name(n)); } @@ -467,64 +509,80 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt) //for testing purposes struct infiniband *ib = (struct infiniband *) n->_vd; int ret; - struct ibv_wc wc[100]; + struct ibv_wc wc[cnt]; + union { + double f; + int64_t i; + } *data; - ret = ibv_poll_cq(ib->ctx.cq, 100, wc); + ret = ibv_poll_cq(ib->ctx.cq, cnt, wc); + if(ret) + { + data = malloc(ret*sizeof(double)); - + for(int i=0; ilength = ret; + smps[0]->capacity = cnt; + memcpy(smps[0]->data, data, ret*sizeof(double)); + } return ret; } int ib_write(struct node *n, struct sample *smps[], unsigned cnt) { + /* Send pool is not used at this moment! */ struct infiniband *ib = (struct infiniband *) n->_vd; int ret; - struct ibv_send_wr wr; - struct ibv_send_wr *bad_wr = NULL; - struct ibv_sge sg_list; + struct ibv_send_wr wr[smps[0]->length], *bad_wr = NULL; + struct ibv_sge sge[smps[0]->length]; memset(&wr, 0, sizeof(wr)); - struct payload_s *payl; - payl = pool_get(&ib->mem.p_send); + //ToDo: Place this into configuration and create checks if settings are valid + int send_inline = 1; - payl->data = 1337; - - // If data is send inline, it is not necessary to copy data to protected - // memory region first. - if(1) + for(int i=0; ilength; i++) { - //sg_list.addr = (uint64_t)smps[0]->data; - //sg_list.length = smps[0]->length-1; - sg_list.addr = (uintptr_t)payl; - sg_list.length = 1; - // lkey not necessary - } - else - { - //- copy value to send_region - //- give pointer to start of array - } + // If data is send inline, it is not necessary to copy data to protected + // memory region first. + if(send_inline) + { + sge[i].addr = (uint64_t)&smps[0]->data[i].f; + sge[i].length = sizeof(double); + } + else + { + //- copy value to send_region + //- give pointer to start of array + } - // Set Send Work Request - wr.wr_id = 123; //ToDo: set this to a useful value - wr.sg_list = &sg_list; - wr.num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended - //furthermore we should break the transaction up if inline mode - //is selected - wr.next = NULL; - wr.send_flags = IBV_SEND_SIGNALED; - wr.imm_data = htonl(0); //ToDo: set this to a useful value - wr.opcode = IBV_WR_SEND_WITH_IMM; + // Set Send Work Request + wr[i].wr_id = 0; //ToDo: set this to a useful value + wr[i].sg_list = &sge[i]; + wr[i].num_sge = 1; //ToDo: Right now only smps[0] is sg_list. This can be extended + //furthermore we should break the transaction up if inline mode + //is selected + + if(i == (smps[0]->length-1)) + wr[i].next = NULL; + else + wr[i].next = &wr[i+1]; + wr[i].send_flags = IBV_SEND_SIGNALED | (send_inline<<3); + wr[i].imm_data = htonl(0); //ToDo: set this to a useful value + wr[i].opcode = IBV_WR_SEND_WITH_IMM; - for(int i = 0; i < smps[0]->length; i++) - { - printf("Sample %i: %f\n", i, smps[0]->data[i].f); } - ret = ibv_post_send(ib->id->qp, &wr, &bad_wr); + //Send linked list of Work Requests + ret = ibv_post_send(ib->id->qp, wr, &bad_wr); if(ret) { error("Failed to send message in node %s: %s", @@ -532,6 +590,17 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt) return -ret; } + + /* Debugging */ + struct ibv_wc wc[5]; + int size; + while(1) + { + size = ibv_poll_cq(ib->ctx.cq, 5, wc); + if(size) + for(int j=0; j