1
0
Fork 0
mirror of https://git.rwth-aachen.de/acs/public/villas/node/ synced 2025-03-09 00:00:00 +01:00

ib_read() now works for UDP

Node is now able to send data in RDMA_PS_UDP mode. Right now it creates
a new rdma_cm_id for every connection request. We could/should do this
differently
This commit is contained in:
Dennis Potter 2018-07-19 20:33:41 +02:00
parent 2b323c3781
commit 3acc3df7c4
2 changed files with 31 additions and 8 deletions

View file

@ -95,6 +95,8 @@ struct infiniband {
/* Unrealiable connectionless data */
struct rdma_ud_param ud;
void *grh_ptr;
struct ibv_mr *grh_mr;
} conn;

View file

@ -304,7 +304,7 @@ int ib_parse(struct node *n, json_t *cfg)
// Set remaining QP attributes
ib->qp_init.cap.max_send_sge = 1;
ib->qp_init.cap.max_recv_sge = 1;
ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1;
// Set number of bytes to be send inline
ib->qp_init.cap.max_inline_data = max_inline_data;
@ -507,6 +507,11 @@ void * ib_rdma_cm_event_thread(void *n)
debug(LOG_IB | 2, "Received RDMA_CM_EVENT_CONNECT_REQUEST");
ret = ib_connect_request(n, event->id);
//ToDo: Think about this. In this context, we say that the QP is initialized
//and at least one other node send data
if (ib->conn.port_space == RDMA_PS_UDP)
node->state = STATE_CONNECTED;
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
@ -611,6 +616,11 @@ int ib_start(struct node *n)
debug(LOG_IB | 3, "Allocated Protection Domain");
//ToDo: This is a temporary solution since we don't use the GRH at this moment
if (ib->conn.port_space == RDMA_PS_UDP) {
ib->conn.grh_ptr = alloc(40);
ib->conn.grh_mr = ibv_reg_mr(ib->ctx.pd, ib->conn.grh_ptr, 40, IBV_ACCESS_LOCAL_WRITE);
}
// Several events should occur on the event channel, to make
// sure the nodes are succesfully connected.
@ -685,7 +695,7 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
struct infiniband *ib = (struct infiniband *) n->_vd;
struct ibv_wc wc[cnt];
struct ibv_recv_wr wr[cnt], *bad_wr = NULL;
struct ibv_sge sge[cnt];
struct ibv_sge sge[cnt][(ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1];
struct ibv_mr *mr;
int ret = 0, wcs = 0, read_values = 0, max_wr_post;
@ -725,15 +735,23 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
for (int i = 0; i < max_wr_post; i++) {
// Prepare receive Scatter/Gather element
sge[i].addr = (uint64_t) &smps[i]->data;
sge[i].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge[i].lkey = mr->lkey;
if (ib->conn.port_space == RDMA_PS_UDP) {
sge[i][0].addr = (uint64_t) ib->conn.grh_ptr;
sge[i][0].length = 40;
sge[i][0].lkey = ib->conn.grh_mr->lkey;
}
// [0] if it is TCP, otherwise [1]
sge[i][(ib->conn.port_space == RDMA_PS_UDP)].addr = (uint64_t) &smps[i]->data;
sge[i][(ib->conn.port_space == RDMA_PS_UDP)].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN);
sge[i][(ib->conn.port_space == RDMA_PS_UDP)].lkey = mr->lkey;
// Prepare a receive Work Request
wr[i].wr_id = (uintptr_t) smps[i];
wr[i].next = &wr[i+1];
wr[i].sg_list = &sge[i];
wr[i].num_sge = 1;
wr[i].sg_list = sge[i];
wr[i].num_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1;
}
wr[max_wr_post-1].next = NULL;
@ -761,8 +779,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea
warn("Work Completion status was not IBV_WC_SUCCES in node %s: %i",
node_name(n), wc[j].status);
// Lenght includes a 40 byte global routing header. Substract it
int correction = (ib->conn.port_space == RDMA_PS_UDP) ? 40 : 0;
smps[j] = (struct sample *) (wc[j].wr_id);
smps[j]->length = wc[j].byte_len / sizeof(double);
smps[j]->length = (wc[j].byte_len - correction) / sizeof(double);
}
}
return read_values;