diff --git a/lib/nodes/infiniband.c b/lib/nodes/infiniband.c index 255e9bcf3..886b262dc 100644 --- a/lib/nodes/infiniband.c +++ b/lib/nodes/infiniband.c @@ -29,6 +29,7 @@ #include #include #include +#include static int ib_disconnect(struct node *n) { @@ -324,8 +325,8 @@ int ib_parse(struct node *n, json_t *cfg) ib->conn.available_recv_wrs = 0; // Set remaining QP attributes - ib->qp_init.cap.max_send_sge = 1; - ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1; + ib->qp_init.cap.max_send_sge = 4; + ib->qp_init.cap.max_recv_sge = (ib->conn.port_space == RDMA_PS_UDP) ? 4 : 5; // Set number of bytes to be send inline ib->qp_init.cap.max_inline_data = max_inline_data; @@ -704,8 +705,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_wc wc[cnt]; struct ibv_recv_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt][(ib->conn.port_space == RDMA_PS_UDP) ? 2 : 1]; + struct ibv_sge sge[cnt][ib->qp_init.cap.max_recv_sge]; struct ibv_mr *mr; + struct timespec ts_receive; int ret = 0, wcs = 0, read_values = 0, max_wr_post; debug(LOG_IB | 15, "ib_read is called"); @@ -722,6 +724,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea wcs = ibv_poll_cq(ib->ctx.recv_cq, cnt, wc); if (wcs) { + // Get time directly after something arrived in Completion Queue + ts_receive = time_now(); + debug(LOG_IB | 10, "Received %i Work Completions", wcs); read_values = wcs; // Value to return @@ -745,9 +750,11 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea mr = memory_ib_get_mr(smps[0]); for (int i = 0; i < max_wr_post; i++) { - // Prepare receive Scatter/Gather element int j = 0; + // Prepare receive Scatter/Gather element + + // First 40 byte of UD data are GRH and unused in our case if (ib->conn.port_space == RDMA_PS_UDP) { sge[i][j].addr = (uint64_t) ib->conn.grh_ptr; sge[i][j].length = 40; @@ -756,6 +763,27 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea j++; } + // Sequence + sge[i][j].addr = (uint64_t) &smps[i]->sequence; + sge[i][j].length = sizeof(smps[i]->sequence); + sge[i][j].lkey = mr->lkey; + + j++; + + // Format + sge[i][j].addr = (uint64_t) &smps[i]->format; + sge[i][j].length = sizeof(smps[i]->format); + sge[i][j].lkey = mr->lkey; + + j++; + + // Timespec origin + sge[i][j].addr = (uint64_t) &smps[i]->ts.origin; + sge[i][j].length = sizeof(smps[i]->ts.origin); + sge[i][j].lkey = mr->lkey; + + j++; + sge[i][j].addr = (uint64_t) &smps[i]->data; sge[i][j].length = SAMPLE_DATA_LEN(DEFAULT_SAMPLELEN); sge[i][j].lkey = mr->lkey; @@ -799,6 +827,9 @@ int ib_read(struct node *n, struct sample *smps[], unsigned cnt, unsigned *relea smps[j] = (struct sample *) (wc[j].wr_id); smps[j]->length = (wc[j].byte_len - correction) / sizeof(double); + + smps[j]->ts.received = ts_receive; + smps[j]->flags = (SAMPLE_HAS_ORIGIN | SAMPLE_HAS_RECEIVED | SAMPLE_HAS_SEQUENCE | SAMPLE_HAS_FORMAT); } } return read_values; @@ -808,7 +839,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele { struct infiniband *ib = (struct infiniband *) n->_vd; struct ibv_send_wr wr[cnt], *bad_wr = NULL; - struct ibv_sge sge[cnt]; + struct ibv_sge sge[cnt][ib->qp_init.cap.max_recv_sge]; struct ibv_wc wc[cnt]; struct ibv_mr *mr; @@ -826,10 +857,36 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele mr = memory_ib_get_mr(smps[0]); for (sent = 0; sent < cnt; sent++) { + int j = 0; + // Set Scatter/Gather element to data of sample - sge[sent].addr = (uint64_t) &smps[sent]->data; - sge[sent].length = smps[sent]->length*sizeof(double); - sge[sent].lkey = mr->lkey; + // Sequence + sge[sent][j].addr = (uint64_t) &smps[sent]->sequence; + sge[sent][j].length = sizeof(smps[sent]->sequence); + sge[sent][j].lkey = mr->lkey; + + j++; + + // Format + sge[sent][j].addr = (uint64_t) &smps[sent]->format; + sge[sent][j].length = sizeof(smps[sent]->format); + sge[sent][j].lkey = mr->lkey; + + j++; + + // Timespec origin + sge[sent][j].addr = (uint64_t) &smps[sent]->ts.origin; + sge[sent][j].length = sizeof(smps[sent]->ts.origin); + sge[sent][j].lkey = mr->lkey; + + j++; + + // Actual Payload + sge[sent][j].addr = (uint64_t) &smps[sent]->data; + sge[sent][j].length = smps[sent]->length*sizeof(double); + sge[sent][j].lkey = mr->lkey; + + j++; // Check if connection is connected or unconnected and set appropriate values if (ib->conn.port_space == RDMA_PS_UDP) { @@ -839,7 +896,7 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele } // Check if data can be send inline - int send_inline = (sge[sent].length < ib->qp_init.cap.max_inline_data) ? + int send_inline = (sge[sent][j-1].length < ib->qp_init.cap.max_inline_data) ? ib->conn.send_inline : 0; @@ -847,8 +904,8 @@ int ib_write(struct node *n, struct sample *smps[], unsigned cnt, unsigned *rele // Set Send Work Request wr[sent].wr_id = send_inline ? 0 : (uintptr_t) smps[sent]; // This way the sample can be release in WC - wr[sent].sg_list = &sge[sent]; - wr[sent].num_sge = 1; + wr[sent].sg_list = sge[sent]; + wr[sent].num_sge = j; wr[sent].next = &wr[sent+1]; wr[sent].send_flags = IBV_SEND_SIGNALED | (send_inline << 3);