diff --git a/include/villas/nodes/infiniband.h b/include/villas/nodes/infiniband.h index 4a765ac5c..1ddd85eb3 100644 --- a/include/villas/nodes/infiniband.h +++ b/include/villas/nodes/infiniband.h @@ -95,6 +95,8 @@ struct infiniband { /* Unrealiable connectionless data */ struct rdma_ud_param ud; + void *grh_ptr; + struct ibv_mr *grh_mr; } conn; diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 6186ed6ff..faede6006 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -304,7 +304,7 @@ int ib_parse(struct node *n, json_t *cfg) // Set remaining QP attributes ib->qp_init.cap.max_send_sge = 1; - ib->qp_init.cap.max_recv_sge = 1; + ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1; // Set number of bytes to be send inline ib->qp_init.cap.max_inline_data = max_inline_data; @@ -507,6 +507,11 @@ void * ib_rdma_cm_event_thread(void *n) debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST"); ret = ib_connect_request(n, event->id); + + //ToDo: Think about this. In this context, we say that the QP is initialized + //and at least one other node send data + if (ib->conn.port_space == RDMA_PS_UDP) + node->state = STATE_CONNECTED; break; case RDMA_CM_EVENT_CONNECT_ERROR: @@ -611,6 +616,11 @@ int ib_start(struct node *n) debug(LOG_IB | 3, "Allocated Protection Domain"); + //ToDo: This is a temporary solution since we don't use the GRH at this moment + if (ib->conn.port_space == RDMA_PS_UDP) { + ib->conn.grh_ptr = alloc(40); + ib->conn.grh_mr = ibv_reg_mr(ib->ctx.pd, ib->conn.grh_ptr, 40, IBV_ACCESS_LOCAL_WRITE); + } // Several events should occur on the event channel, to make // sure the nodes are succesfully connected. @@ -685,7 +695,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_wc wc[cnt]; struct ibv_recv_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_sge sge[cnt][(ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1]; struct ibv_mr *mr; int ret = 0, wcs = 0, read_values = 0, max_wr_post; @@ -725,15 +735,23 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea for (int i = 0; i < max_wr_post; i++) { // Prepare receive Scatter/Gather element - sge[i].addr = (uint64_t) &smps[i]->data; - sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); - sge[i].lkey = mr->lkey; + + if (ib->conn.port_space == RDMA_PS_UDP) { + sge[i][0].addr = (uint64_t) ib->conn.grh_ptr; + sge[i][0].length = 40; + sge[i][0].lkey = ib->conn.grh_mr->lkey; + } + + // [0] if it is TCP, otherwise [1] + sge[i][(ib->conn.port_space == RDMA_PS_UDP)].addr = (uint64_t) &smps[i]->data; + sge[i][(ib->conn.port_space == RDMA_PS_UDP)].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); + sge[i][(ib->conn.port_space == RDMA_PS_UDP)].lkey = mr->lkey; // Prepare a receive Work Request wr[i].wr_id = (uintptr_t) smps[i]; wr[i].next = &wr[i+1]; - wr[i].sg_list = &sge[i]; - wr[i].num_sge = 1; + wr[i].sg_list = sge[i]; + wr[i].num_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1; } wr[max_wr_post-1].next = NULL; @@ -761,8 +779,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i", node_name(n), wc[j].status); + // Lenght includes a 40 byte global routing header. Substract it + int correction = (ib->conn.port_space == RDMA_PS_UDP) ? 40 : 0; + smps[j] = (struct sample *) (wc[j].wr_id); - smps[j]->length = wc[j].byte_len / sizeof(double); + smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); } } return read_values;